|
|
@ -0,0 +1,159 @@
|
|
|
|
|
|
|
|
你是ETL专家,精通使用seatunnel工具实现实时数据同步,我将告诉你一个任务,然后你会帮我生成seatunnel配置。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# seatunnel 配置格式示例
|
|
|
|
|
|
|
|
seatunnel 使用hocon格式,实时同步配置必须包含3个部分env,source,sink。其中env表示全局环境变量,source表示数据来源,sink表示数据去向
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
例如以下是MySQL-CDC同步到doris的配置
|
|
|
|
|
|
|
|
```text
|
|
|
|
|
|
|
|
env {
|
|
|
|
|
|
|
|
parallelism = 1
|
|
|
|
|
|
|
|
job.mode = "STREAMING"
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
source {
|
|
|
|
|
|
|
|
MySQL-CDC {
|
|
|
|
|
|
|
|
base-url = "jdbc:mysql://127.0.0.1:3306/test_db"
|
|
|
|
|
|
|
|
username = "root"
|
|
|
|
|
|
|
|
password = "root@123"
|
|
|
|
|
|
|
|
database-names = ["test_db"]
|
|
|
|
|
|
|
|
table-names = ["test_db.test_t_crjry","test_db.test_t_crjjtgj","test_db.test_t_crjry_jhw"]
|
|
|
|
|
|
|
|
startup.mode = "initial"
|
|
|
|
|
|
|
|
schema-changes.enabled = true
|
|
|
|
|
|
|
|
server-id = "6500-7500"
|
|
|
|
|
|
|
|
table-names-config = [
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
table = "test_db.test_t_crjry"
|
|
|
|
|
|
|
|
primaryKeys = ["ID"]
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
table = "test_db.test_t_crjjtgj"
|
|
|
|
|
|
|
|
primaryKeys = ["ID"]
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
table = "test_db.test_t_crjry_jhw"
|
|
|
|
|
|
|
|
primaryKeys = ["ID"]
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
]
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
sink {
|
|
|
|
|
|
|
|
Doris {
|
|
|
|
|
|
|
|
fenodes = "127.0.0.1:8030"
|
|
|
|
|
|
|
|
query-port = 9030
|
|
|
|
|
|
|
|
username = "root"
|
|
|
|
|
|
|
|
password = "root@123"
|
|
|
|
|
|
|
|
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
|
|
|
|
|
|
|
|
database = "test_db_1"
|
|
|
|
|
|
|
|
table = "${table_name}_cdc_test"
|
|
|
|
|
|
|
|
sink.enable-2pc = "true"
|
|
|
|
|
|
|
|
sink.enable-delete = "true"
|
|
|
|
|
|
|
|
sink.label-prefix = "cdc_test_ms"
|
|
|
|
|
|
|
|
doris.config = {
|
|
|
|
|
|
|
|
format="json"
|
|
|
|
|
|
|
|
read_json_by_line="true"
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# seatunnel 配置说明
|
|
|
|
|
|
|
|
## env 示例
|
|
|
|
|
|
|
|
用途:表示全局环境变量
|
|
|
|
|
|
|
|
配置项:
|
|
|
|
|
|
|
|
- parallelism:并行度,默认为1
|
|
|
|
|
|
|
|
- job.mode:任务模式,默认为 STREAMING
|
|
|
|
|
|
|
|
env {
|
|
|
|
|
|
|
|
parallelism = 1
|
|
|
|
|
|
|
|
job.mode = "STREAMING"
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
## source
|
|
|
|
|
|
|
|
用途:数据来源配置,包含MySQL-CDC或者Postgres-CDC,如果没有特别说明MySQL-CDC和Postgres-CDC的配置项相同且均为必填项
|
|
|
|
|
|
|
|
配置项:
|
|
|
|
|
|
|
|
- base-url:数据库的JDBC连接地址,例如:MySQL为 jdbc:mysql://host:port/database,PostgreSQL为 jdbc:postgresql://host:port/database
|
|
|
|
|
|
|
|
- username:数据库账号
|
|
|
|
|
|
|
|
- password:数据库密码
|
|
|
|
|
|
|
|
- database-names:数据库名称,有多个
|
|
|
|
|
|
|
|
- table-names: 表名,MySQL-CDC格式为:数据库名.表名,Postgres-CDC格式为:数据库名.模式名称.表名
|
|
|
|
|
|
|
|
- startup.mode: 同步模式,默认 initial
|
|
|
|
|
|
|
|
- schema-changes.enabled: schema变更同步,默认 true 。注:MySQL-CDC特有配置
|
|
|
|
|
|
|
|
- server-id: server-id范围,默认 6500-7500。注:MySQL-CDC特有配置
|
|
|
|
|
|
|
|
- table:table-names中的表名,单个,MySQL-CDC格式为:数据库名.表名,Postgres-CDC格式为:数据库名.模式名.表名
|
|
|
|
|
|
|
|
- primaryKeys:表主键,有多个
|
|
|
|
|
|
|
|
- slot.name:同步槽名称,格式为:字母+下划线,长度不超过16位,可以随机生成。注:Postgres-CDC特有配置
|
|
|
|
|
|
|
|
- schema-names:模式名称,默认public,包含多个。注:Postgres-CDC特有配置
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
source {
|
|
|
|
|
|
|
|
MySQL-CDC {
|
|
|
|
|
|
|
|
base-url = ""
|
|
|
|
|
|
|
|
username = ""
|
|
|
|
|
|
|
|
password = ""
|
|
|
|
|
|
|
|
database-names = [""]
|
|
|
|
|
|
|
|
table-names = [""]
|
|
|
|
|
|
|
|
startup.mode = "initial"
|
|
|
|
|
|
|
|
schema-changes.enabled = true
|
|
|
|
|
|
|
|
server-id = "6500-7500"
|
|
|
|
|
|
|
|
table-names-config = [
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
table = ""
|
|
|
|
|
|
|
|
primaryKeys = [""]
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
table = ""
|
|
|
|
|
|
|
|
primaryKeys = [""]
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
]
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
## sink
|
|
|
|
|
|
|
|
用途: 数据去向配置
|
|
|
|
|
|
|
|
配置项:
|
|
|
|
|
|
|
|
- fenodes:doris FE节点URL,端口号为8030,例如:127.0.0.1:8030
|
|
|
|
|
|
|
|
- query-port:doris 查询端口号,默认9030
|
|
|
|
|
|
|
|
- username:doris用户名
|
|
|
|
|
|
|
|
- password:doris密码
|
|
|
|
|
|
|
|
- schema_save_mode:schema创建策略,默认CREATE_SCHEMA_WHEN_NOT_EXIST
|
|
|
|
|
|
|
|
- database:数据库名
|
|
|
|
|
|
|
|
- table:目标表名,可以使用${table_name}占位符
|
|
|
|
|
|
|
|
- sink.enable-2pc:开启2阶段提交,默认true
|
|
|
|
|
|
|
|
- sink.enable-delete:允许删除,默认true
|
|
|
|
|
|
|
|
- sink.label-prefix:导入使用的标签前缀,具有唯一性。格式为:字母+下划线,长度不超过16位,可以随机生成
|
|
|
|
|
|
|
|
- doris.config:doris解析配置,采用默认
|
|
|
|
|
|
|
|
sink {
|
|
|
|
|
|
|
|
Doris {
|
|
|
|
|
|
|
|
fenodes = ""
|
|
|
|
|
|
|
|
query-port = 9030
|
|
|
|
|
|
|
|
username = ""
|
|
|
|
|
|
|
|
password = ""
|
|
|
|
|
|
|
|
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
|
|
|
|
|
|
|
|
database = ""
|
|
|
|
|
|
|
|
table = ""
|
|
|
|
|
|
|
|
sink.enable-2pc = "true"
|
|
|
|
|
|
|
|
sink.enable-delete = "true"
|
|
|
|
|
|
|
|
sink.label-prefix = ""
|
|
|
|
|
|
|
|
doris.config = {
|
|
|
|
|
|
|
|
format="json"
|
|
|
|
|
|
|
|
read_json_by_line="true"
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 规则
|
|
|
|
|
|
|
|
1.禁止使用示例和配置说明以外的配置项
|
|
|
|
|
|
|
|
2.直接输出结果,不需要配置说明或其他信息
|
|
|
|
|
|
|
|
3.必须为Hocon格式
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
我的任务是:
|
|
|
|
|
|
|
|
帮我把postgresql的test_t_crjry(主键:ID)、test_t_crjjtgj(主键:PKID)、test_t_crjry_jhw(主键:WYBS)三张表同步到doris,数据库配置信息如下:
|
|
|
|
|
|
|
|
postgresql连接信息:
|
|
|
|
|
|
|
|
host=172.31.51.244
|
|
|
|
|
|
|
|
端口=5432
|
|
|
|
|
|
|
|
数据库名=test_db
|
|
|
|
|
|
|
|
用户名=manager
|
|
|
|
|
|
|
|
密码=manager2!@#
|
|
|
|
|
|
|
|
模式名称=public
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
doris连接信息:
|
|
|
|
|
|
|
|
FE节点host=172.31.51.142
|
|
|
|
|
|
|
|
数据库名=bjbj
|
|
|
|
|
|
|
|
用户名=admin
|
|
|
|
|
|
|
|
密码=6G_FahdUxAh@K
|