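### I. Environment Preparation

PostgreSQL version >= 9.4 (publications, used in step 2, require PostgreSQL 10 or later).

##### 1. Enable logical replication in the WAL (requires a database restart)

Edit `postgresql.conf` as follows:

```text
# Set the WAL level to logical (options: minimal, replica, logical)
wal_level = logical
# Maximum number of replication slots (default 10 since PostgreSQL 10); by default flink-cdc uses one slot per table
max_replication_slots = 20
# Maximum number of WAL sender processes (default 10 since PostgreSQL 10); keep it equal to max_replication_slots
max_wal_senders = 20
# Terminate replication connections that have been inactive for longer than this; a larger value is safer (default 60s, 0 disables the timeout)
wal_sender_timeout = 180s
```

Run the following statement to check that the change took effect:

```sql
SHOW wal_level;
```

![[Pasted image 20250310182619.png]]

##### 2. Publish the tables

```sql
-- Mark existing publications as covering all tables (this edits a system catalog directly and requires superuser)
update pg_publication set puballtables=true where pubname is not null;
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- List the tables that are published
select * from pg_publication_tables;
```

##### 3. Change the table's replica identity so that updates and deletes include the old row values

```sql
-- Make the replica identity include the old values of updated and deleted rows.
-- This ensures that updates and deletes on t_user are captured and synchronized correctly in real time.
-- Without it, t_user keeps the default replica identity, which records at most the primary-key columns
-- of the old row (and nothing if the table has no primary key), so update and delete details may be lost.
ALTER TABLE t_user REPLICA IDENTITY FULL;
-- Check the replica identity: f (full) means the setting succeeded;
-- other values are d (default), n (nothing) or i (index)
select relreplident from pg_class where relname='t_user';
```

### II. Source Configuration Explained

```text
source {
  Postgres-CDC {
    # Name of the logical replication slot used by this job
    slot.name = "pgslot01"
    # JDBC URL of the source database
    base-url = "jdbc:postgresql://10.2.0.5:5432/bjpt_hekou_v3?loggerLevel=OFF"
    username = "zr"
    password = "root@123"
    database-names = ["bjpt_hekou_v3"]
    schema-names = ["public"]
    # Tables to capture, written as database.schema.table
    table-names = ["bjpt_hekou_v3.public.ms_t_crjry"]
    # initial: read a full snapshot first, then switch to incremental changes from the WAL
    startup.mode = "initial"
    # Per-table settings; here the primary-key column is declared explicitly
    table-names-config = [
      {
        table = "bjpt_hekou_v3.public.ms_t_crjry"
        primaryKeys = ["wybs"]
      }
    ]
  }
}
```

### III. Complete Configuration Example

```text
env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  Postgres-CDC {
    slot.name = "pgslot01"
    base-url = "jdbc:postgresql://10.2.0.5:5432/bjpt_hekou_v3?loggerLevel=OFF"
    username = "zr"
    password = "root@123"
    database-names = ["bjpt_hekou_v3"]
    schema-names = ["public"]
    table-names = ["bjpt_hekou_v3.public.ms_t_crjry"]
    startup.mode = "initial"
    table-names-config = [
      {
        table = "bjpt_hekou_v3.public.ms_t_crjry"
        primaryKeys = ["wybs"]
      }
    ]
  }
}

sink {
  Doris {
    fenodes = "172.31.51.142:8030"
    query-port = 9030
    username = "admin"
    password = "6G_FahdUxAh@K"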
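    # Create the target table if it does not exist yet
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    database = "bjbj"
    table = "cdc_test_ms_t_crjry_pg"
    # Two-phase commit, for exactly-once delivery
    sink.enable-2pc = "true"
    # Apply delete events (the Doris table must use the Unique Key model)
    sink.enable-delete = "true"
    # Prefix for the Stream Load labels
    sink.label-prefix = "cdc_test_ms_t_crjry_pg"
    # Stream Load properties: send rows as line-delimited JSON
    doris.config = {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
```

### IV. Notes

1. `slot.name` must be specified, and when restarting the job, check whether a slot with that name already exists.

![[Pasted image 20250310183058.png]]

2. If, on restart, the database reports `replication slot xxxx is active`, query `pg_replication_slots` and check the `active` column:

```sql
select * from pg_replication_slots;
```

![[Pasted image 20250310183255.png]]

Run the following statement to release the slot (this disconnects the process holding it; it does not drop the slot):

```sql
-- Terminate the backend that holds the slot; replace 'pgslot01' with the slot name from the error
-- (the active_pid column requires PostgreSQL 9.5 or later)
select pg_terminate_backend(active_pid) from pg_replication_slots where slot_name = 'pgslot01' and active;
```

### V. Principles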
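The setup relies on several PostgreSQL mechanisms working together: logical decoding needs `wal_level = logical`, the connector reads changes through a named replication slot that only one connection can hold at a time, and full before-images of updated and deleted rows need `REPLICA IDENTITY FULL`. As a rough illustration, the hypothetical Python helper below (`preflight`, `SlotRow` and `REPLIDENT_NAMES` are made-up names, not part of SeaTunnel or PostgreSQL) takes values you would read with the queries from sections I and IV and lists what still needs fixing. It is a sketch that assumes you have already fetched those query results; it does not connect to the database itself.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, List, Optional

# Meaning of the pg_class.relreplident codes
REPLIDENT_NAMES = {
    "d": "default (primary key only)",
    "n": "nothing",
    "f": "full",
    "i": "index",
}


@dataclass
class SlotRow:
    """Hypothetical holder for a few columns of pg_replication_slots."""
    slot_name: str
    active: bool
    active_pid: Optional[int] = None


def preflight(wal_level: str,
              replidents: Dict[str, str],
              slots: List[SlotRow],
              slot_name: str) -> List[str]:
    """Hypothetical check: return the problems found (empty list = ready)."""
    problems: List[str] = []
    # Result of SHOW wal_level; changing it needs a restart
    if wal_level != "logical":
        problems.append(
            f"wal_level is {wal_level!r}; set wal_level = logical and restart")
    # Results of SELECT relreplident FROM pg_class WHERE relname = ..., per table
    for table, code in sorted(replidents.items()):
        if code != "f":
            name = REPLIDENT_NAMES.get(code, code)
            problems.append(
                f"{table}: replica identity is {name}; "
                f"run ALTER TABLE {table} REPLICA IDENTITY FULL")
    # Rows of SELECT * FROM pg_replication_slots: a slot still held by an
    # old connection causes the "replication slot ... is active" error
    for slot in slots:
        if slot.slot_name == slot_name and slot.active:
            problems.append(
                f"slot {slot_name} is held by pid {slot.active_pid}; "
                f"SELECT pg_terminate_backend({slot.active_pid}) releases it")
    return problems


if __name__ == "__main__":
    report = preflight("replica", {"t_user": "d"},
                       [SlotRow("pgslot01", True, 4242)], "pgslot01")
    for line in report:
        print(line)
```

Running the file prints one problem per misconfigured item for the sample inputs. Keeping the check as a pure function, with no database driver, makes it easy to test; a real script would fill its arguments from the SQL queries in the sections above.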