From 838382d4dc94fa59a9e33524f300041ff4a9fde3 Mon Sep 17 00:00:00 2001 From: old-tom <892955278@qq.com> Date: Thu, 10 Apr 2025 19:06:50 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- work常用/中间件/seatunnel/MySql-CDC.md | 51 +++++- .../大模型/提示词/seatunnel-cdc.md | 159 ++++++++++++++++++ 2 files changed, 209 insertions(+), 1 deletion(-) create mode 100644 日常学习/大模型/提示词/seatunnel-cdc.md diff --git a/work常用/中间件/seatunnel/MySql-CDC.md b/work常用/中间件/seatunnel/MySql-CDC.md index c06c8b1..74d5790 100644 --- a/work常用/中间件/seatunnel/MySql-CDC.md +++ b/work常用/中间件/seatunnel/MySql-CDC.md @@ -95,4 +95,53 @@ sink { } ``` -### 四、原理 \ No newline at end of file +```text +env { + parallelism = 1 + job.mode = "STREAMING" +} +source { + MySQL-CDC { + base-url = "jdbc:mysql://172.31.51.244:3306/bjpt_hekou_v3" + username = "manager" + password = "manager2!@#" + database-names = ["bjpt_hekou_v3"] + table-names = ["bjpt_hekou_v3.ms_t_crjry","bjpt_hekou_v3.ms_t_crjjtgj","bjpt_hekou_v3.ms_t_crjry_jhw"] + startup.mode = "initial" + schema-changes.enabled = true + server-id = "6500-7500" + table-names-config = [ + { + table = "bjpt_hekou_v3.ms_t_crjry" + primaryKeys = ["WYBS"] + }, + { + table = "bjpt_hekou_v3.ms_t_crjjtgj" + primaryKeys = ["WYBS"] + }, + { + table = "bjpt_hekou_v3.ms_t_crjry_jhw" + primaryKeys = ["WYBS"] + } + ] + } +} +sink { + Doris { + fenodes = "172.31.51.142:8030" + query-port = 9030 + username = admin + password = "6G_FahdUxAh@K" + schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" + database = "bjbj" + table = "${table_name}_cdc_test" + sink.enable-2pc = "true" + sink.enable-delete = "true" + sink.label-prefix = "cdc_test_ms" + doris.config = { + format="json" + read_json_by_line="true" + } + } +} +``` \ No newline at end of file diff --git a/日常学习/大模型/提示词/seatunnel-cdc.md b/日常学习/大模型/提示词/seatunnel-cdc.md new file mode 100644 index 0000000..0d8a43f --- /dev/null +++ 
b/日常学习/大模型/提示词/seatunnel-cdc.md @@ -0,0 +1,159 @@ +你是ETL专家,精通使用seatunnel工具实现实时数据同步,我将告诉你一个任务,然后你会帮我生成seatunnel配置。 + +# seatunnel 配置格式示例 +seatunnel 使用hocon格式,实时同步配置必须包含3个部分env,source,sink。其中env表示全局环境变量,source表示数据来源,sink表示数据去向 + +例如以下是MySQL-CDC同步到doris的配置 +```text +env { + parallelism = 1 + job.mode = "STREAMING" +} +source { + MySQL-CDC { + base-url = "jdbc:mysql://127.0.0.1:3306/test_db" + username = "root" + password = "root@123" + database-names = ["test_db"] + table-names = ["test_db.test_t_crjry","test_db.test_t_crjjtgj","test_db.test_t_crjry_jhw"] + startup.mode = "initial" + schema-changes.enabled = true + server-id = "6500-7500" + table-names-config = [ + { + table = "test_db.test_t_crjry" + primaryKeys = ["ID"] + }, + { + table = "test_db.test_t_crjjtgj" + primaryKeys = ["ID"] + }, + { + table = "test_db.test_t_crjry_jhw" + primaryKeys = ["ID"] + } + ] + } +} +sink { + Doris { + fenodes = "127.0.0.1:8030" + query-port = 9030 + username = root + password = "root@123" + schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" + database = "test_db_1" + table = "${table_name}_cdc_test" + sink.enable-2pc = "true" + sink.enable-delete = "true" + sink.label-prefix = "cdc_test_ms" + doris.config = { + format="json" + read_json_by_line="true" + } + } +} +``` + +# seatunnel 配置说明 +## env 示例 +用途:表示全局环境变量 +配置项: + - parallelism:并行度,默认为1 + - job.mode:任务模式,默认为 STREAMING +env { + parallelism = 1 + job.mode = "STREAMING" +} + +## source +用途:数据来源配置,包含MySQL-CDC或者Postgres-CDC,如果没有特别说明MySQL-CDC和Postgres-CDC的配置项相同且均为必填项 +配置项: + - base-url:数据库的JDBC连接,例如:jdbc:mysql://host:port/database + - username:数据库账号 + - password:数据库密码 + - database-names:数据库名称,有多个 + - table-names: 表名,MySQL-CDC格式为:数据库名.表名,Postgres-CDC格式为:数据库名.模式名称.表名 + - startup.mode: 同步模式,默认 initial + - schema-changes.enabled: schema变更同步,默认 true 。注:MySQL-CDC特有配置 + - server-id: server-id范围,默认 6500-7500。注:MySQL-CDC特有配置 + - table:table-names中的表名,单个,MySQL-CDC格式为:数据库名.表名,Postgres-CDC格式为:数据库名.模式名.表名 + - primaryKeys:表主键,有多个 + - 
slot.name:同步槽名称,格式为:字母+下划线,长度不超过16位,可以随机生成。注:Postgres-CDC特有配置 + - schema-names:模式名称,默认public,包含多个。注:Postgres-CDC特有配置 + +source { + MySQL-CDC { + base-url = "" + username = "" + password = "" + database-names = [""] + table-names = [""] + startup.mode = "initial" + schema-changes.enabled = true + server-id = "6500-7500" + table-names-config = [ + { + table = "" + primaryKeys = [""] + }, + { + table = "" + primaryKeys = [""] + } + ] + } +} +## sink +用途: 数据去向配置 +配置项: + - fenodes:doris FE节点URL,端口号为8030,例如:127.0.0.1:8030 + - query-port:doris 查询端口号,默认9030 + - username:doris用户名 + - password:doris密码 + - schema_save_mode:schema创建策略,默认CREATE_SCHEMA_WHEN_NOT_EXIST + - database:数据库名 + - table:目标表名,可以使用${table_name}占位符 + - sink.enable-2pc:开启2阶段提交,默认true + - sink.enable-delete:允许删除,默认true + - sink.label-prefix:导入使用的标签前缀,具有唯一性。格式为:字母+下划线,长度不超过16位,可以随机生成 + - doris.config:doris解析配置,采用默认 +sink { + Doris { + fenodes = "" + query-port = 9030 + username = "" + password = "" + schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" + database = "" + table = "" + sink.enable-2pc = "true" + sink.enable-delete = "true" + sink.label-prefix = "" + doris.config = { + format="json" + read_json_by_line="true" + } + } +} + +# 规则 +1.禁止使用示例和配置说明以外的配置项 +2.直接输出结果,不需要配置说明或其他信息 +3.必须为Hocon格式 + +我的任务是: +帮我把postgresql的test_t_crjry(主键:ID)、test_t_crjjtgj(主键:PKID)、test_t_crjry_jhw(主键:WYBS)三张表同步到doris,数据库配置信息如下: +postgresql连接信息: +host=172.31.51.244 +端口=5432 +数据库名=test_db +用户名=manager +密码=manager2!@# +模式名称=public + +doris连接信息: +FE节点host=172.31.51.142 +数据库名=bjbj +用户名=admin +密码=6G_FahdUxAh@K \ No newline at end of file