You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

3.6 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

一、环境准备

PG版本 >= 9.4

1.修改wal 开启逻辑复制 (需要重启数据库)

编辑 postgresql.conf 文件,配置如下

# 更改wal日志方式为logical方式有minimal、replica 、logical  
wal_level = logical  

# 更改solts最大数量默认值为10flink-cdc默认一张表占用一个slots
max_replication_slots = 20

# 更改wal发送最大进程数默认值为10这个值和上面的solts设置一样
max_wal_senders = 20     

# 中断那些停止活动超过指定毫秒数的复制连接可以适当设置大一点默认60s0表示禁用
wal_sender_timeout = 180s

执行语句检查是否修改成功

SHOW wal_level;

!Pasted image 20250310182619.png

2.发布表
-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;

-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- 查询哪些表已经发布
select * from pg_publication_tables;
3.更改表的复制标识包含更新和删除的值
-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING这可能导致实时同步时丢失更新和删除的数据行信息从而影响同步的准确性
ALTER TABLE t_user REPLICA IDENTITY FULL;

-- 查看复制标识为f标识说明设置成功f表示 full否则为 n表示 nothing即复制标识未设置
select relreplident from pg_class where relname='t_user';

二、source配置详解

source {
  Postgres-CDC {
    slot.name = "pgslot01"
    base-url = "jdbc:postgresql://10.2.0.5:5432/bjpt_hekou_v3?loggerLevel=OFF"
    username = "zr"
    password = "root@123"
	database-names = ["bjpt_hekou_v3"]
	schema-names = ["public"]
    table-names = ["bjpt_hekou_v3.public.ms_t_crjry"]
    startup.mode = "initial"
	table-names-config = [
      {
        table = "bjpt_hekou_v3.public.ms_t_crjry"
        primaryKeys = ["wybs"]
      }
    ]
  }
}

三、完整配置示例

env {
  parallelism = 1
  job.mode = "STREAMING"
}
source {
  Postgres-CDC {
    slot.name = "pgslot01"
    base-url = "jdbc:postgresql://10.2.0.5:5432/bjpt_hekou_v3?loggerLevel=OFF"
    username = "zr"
    password = "root@123"
	database-names = ["bjpt_hekou_v3"]
	schema-names = ["public"]
    table-names = ["bjpt_hekou_v3.public.ms_t_crjry"]
    startup.mode = "initial"
	table-names-config = [
      {
        table = "bjpt_hekou_v3.public.ms_t_crjry"
        primaryKeys = ["wybs"]
      }
    ]
  }
}
sink {
	Doris {
        fenodes = "172.31.51.142:8030"
        query-port = 9030
        username = admin
        password = "6G_FahdUxAh@K"
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
        database = "bjbj"
        table = "cdc_test_ms_t_crjry_pg"
        sink.enable-2pc = "true"
        sink.enable-delete = "true"
        sink.label-prefix = "cdc_test_ms_t_crjry_pg"
        doris.config = {
            format="json"
            read_json_by_line="true"
        }
    }
}

四、注意点

  1. 必须指定slot.name名称,并且重启任务时需要注意slot名称是否存在 !Pasted image 20250310183058.png
  2. 重启任务时如果数据库提示 replication slot xxxx is active 查询表查看 active 字段
select * from pg_replication_slots

!Pasted image 20250310183255.png

执行一下语句关闭slot关闭不是删除


五、原理