diff --git a/Pasted image 20250310182619.png b/Pasted image 20250310182619.png new file mode 100644 index 0000000..7c04aea Binary files /dev/null and b/Pasted image 20250310182619.png differ diff --git a/Pasted image 20250310183058.png b/Pasted image 20250310183058.png new file mode 100644 index 0000000..fba57bd Binary files /dev/null and b/Pasted image 20250310183058.png differ diff --git a/Pasted image 20250310183255.png b/Pasted image 20250310183255.png new file mode 100644 index 0000000..6049de4 Binary files /dev/null and b/Pasted image 20250310183255.png differ diff --git a/work常用/中间件/seatunnel/MySql-CDC.md b/work常用/中间件/seatunnel/MySql-CDC.md index 6609221..c06c8b1 100644 --- a/work常用/中间件/seatunnel/MySql-CDC.md +++ b/work常用/中间件/seatunnel/MySql-CDC.md @@ -49,4 +49,50 @@ source {     ]   } } -``` \ No newline at end of file +``` + +### 三、示例 +```hocon +env { + parallelism = 1 + job.mode = "STREAMING" +} +source { + MySQL-CDC { + base-url = "jdbc:mysql://10.2.0.7:3306/bjpt_hekou_v3" + username = "root" + password = "root@123" + database-names = ["bjpt_hekou_v3"] + table-names = ["bjpt_hekou_v3.ms_t_crjry"] + startup.mode = "initial" + schema-changes.enabled = true + server-id = "6500-7500" + table-names-config = [ + { + table = "bjpt_hekou_v3.ms_t_crjry" + primaryKeys = ["WYBS"] + } + ] + } +} +sink { + Doris { + fenodes = "172.31.51.142:8030" + query-port = 9030 + username = admin + password = "6G_FahdUxAh@K" + schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" + database = "bjbj" + table = "cdc_test_ms_t_crjry" + sink.enable-2pc = "true" + sink.enable-delete = "true" + sink.label-prefix = "cdc_test_ms_t_crjry" + doris.config = { + format="json" + read_json_by_line="true" + } + } +} +``` + +### 四、原理 \ No newline at end of file diff --git a/work常用/中间件/seatunnel/Postgresql-CDC.md b/work常用/中间件/seatunnel/Postgresql-CDC.md new file mode 100644 index 0000000..1bda0f7 --- /dev/null +++ b/work常用/中间件/seatunnel/Postgresql-CDC.md @@ -0,0 +1,124 @@ +### 一、环境准备 +PG版本 >= 9.4 +##### 1.修改wal 开启逻辑复制 (需要重启数据库) +编辑 postgresql.conf 文件,配置如下 +```text +# 更改wal日志方式为logical(方式有:minimal、replica 、logical ) +wal_level = logical + +# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots +max_replication_slots = 20 + +# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样 +max_wal_senders = 20 + +# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用) +wal_sender_timeout = 180s +``` +执行语句检查是否修改成功 +```sql +SHOW wal_level; +``` +![[Pasted image 20250310182619.png]] + +##### 2.发布表 +```sql +-- 设置发布为true +update pg_publication set puballtables=true where pubname is not null; + +-- 把所有表进行发布 +CREATE PUBLICATION dbz_publication FOR ALL TABLES; + +-- 查询哪些表已经发布 +select * from pg_publication_tables; +``` + +##### 3.更改表的复制标识包含更新和删除的值 +```sql +-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性) +ALTER TABLE t_user REPLICA IDENTITY FULL; + +-- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置) +select relreplident from pg_class where relname='t_user'; +``` +### 二、source配置详解 +```text +source { + Postgres-CDC { + slot.name = "pgslot01" + base-url = "jdbc:postgresql://10.2.0.5:5432/bjpt_hekou_v3?loggerLevel=OFF" + username = "zr" + password = "root@123" + database-names = ["bjpt_hekou_v3"] + schema-names = ["public"] + table-names = ["bjpt_hekou_v3.public.ms_t_crjry"] + startup.mode = "initial" + table-names-config = [ + { + table = "bjpt_hekou_v3.public.ms_t_crjry" + primaryKeys = ["wybs"] + } + ] + } +} +``` +### 三、完整配置示例 +```text +env { + parallelism = 1 + job.mode = "STREAMING" +} +source { + Postgres-CDC { + slot.name = "pgslot01" + base-url = "jdbc:postgresql://10.2.0.5:5432/bjpt_hekou_v3?loggerLevel=OFF" + username = "zr" + password = "root@123" + database-names = ["bjpt_hekou_v3"] + schema-names = ["public"] + table-names = ["bjpt_hekou_v3.public.ms_t_crjry"] + startup.mode = "initial" + table-names-config = [ + { + table = "bjpt_hekou_v3.public.ms_t_crjry" + primaryKeys = ["wybs"] + } + ] + } +} +sink { + Doris { + fenodes = "172.31.51.142:8030" + query-port = 9030 + username = admin + password = "6G_FahdUxAh@K" + schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" + database = "bjbj" + table = "cdc_test_ms_t_crjry_pg" + sink.enable-2pc = "true" + sink.enable-delete = "true" + sink.label-prefix = "cdc_test_ms_t_crjry_pg" + doris.config = { + format="json" + read_json_by_line="true" + } + } +} +``` +### 四、注意点 +1. 必须指定slot.name名称,并且重启任务时需要注意slot名称是否存在 +![[Pasted image 20250310183058.png]] +2. 重启任务时如果数据库提示 ‘replication slot xxxx is active’ +查询表查看 active 字段 +```sql +select * from pg_replication_slots +``` +![[Pasted image 20250310183255.png]] + +执行一下语句关闭slot(关闭不是删除) +```sql + +``` + + +### 五、原理 \ No newline at end of file diff --git a/本机环境/开发环境.md b/本机环境/开发环境.md index e853ea7..2a902f0 100644 --- a/本机环境/开发环境.md +++ b/本机环境/开发环境.md @@ -73,6 +73,14 @@ enforce_gtid_consistency = on docker run --restart=unless-stopped -d --name mysql8 -v /g/data/mysql/conf/my.cnf:/etc/mysql/my.cnf -v /g/data/mysql/data:/var/lib/mysql -v /g/data/mysql/log:/var/log/mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root@123 mysql:8.0.41 ``` ++ posgresql 15 用户名:postgres 密码 root@123 +在G盘创建目录:G:\data\postgresql\data +```shell +docker run -d --name pg15 --restart=always -e POSTGRES_PASSWORD=root@123 -e PGDATA=/pgdata -p 5432:5432 -v /g/data/postgresql/data:/pgdata postgres:15.6 + +``` + + + rabbitmq http://localhost:15672 admin/admin123 ```shell //带管理界面的MQ @@ -184,7 +192,7 @@ docker-compose up -d docker run --name seatunnel_client --network seatunnel_seatunnel_network -e ST_DOCKER_MEMBER_LIST=172.16.0.2:5801 -v E:\\tmp\\st\\mysql_cdc_test_2.template:/opt/seatunnel/config/mysql_cdc_test_2.template --rm apache/seatunnel ./bin/seatunnel.sh -c config/mysql_cdc_test_2.template ``` -======= + + mac flink 安装 ```shell