You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

152 lines
7.3 KiB

6 months ago
### 一、环境准备
PG版本 >= 9.4
##### 1.修改wal 开启逻辑复制 (需要重启数据库)
编辑 postgresql.conf 文件,配置如下
```text
# 更改wal日志方式为logical方式有minimal、replica 、logical
wal_level = logical
# 更改solts最大数量默认值为10flink-cdc默认一张表占用一个slots
max_replication_slots = 20
# 更改wal发送最大进程数默认值为10这个值和上面的solts设置一样
max_wal_senders = 20
# 中断那些停止活动超过指定毫秒数的复制连接可以适当设置大一点默认60s0表示禁用
wal_sender_timeout = 180s
```
执行语句检查是否修改成功
```sql
SHOW wal_level;
```
![[Pasted image 20250310182619.png]]
##### 2.发布表
```sql
-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;
-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- 查询哪些表已经发布
select * from pg_publication_tables;
```
##### 3.更改表的复制标识包含更新和删除的值
```sql
-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING这可能导致实时同步时丢失更新和删除的数据行信息从而影响同步的准确性
ALTER TABLE t_user REPLICA IDENTITY FULL;
-- 查看复制标识为f标识说明设置成功f表示 full否则为 n表示 nothing即复制标识未设置
select relreplident from pg_class where relname='t_user';
```
### 二、source配置详解
```text
source {
Postgres-CDC {
slot.name = "pgslot01"
base-url = "jdbc:postgresql://10.2.0.5:5432/bjpt_hekou_v3?loggerLevel=OFF"
username = "zr"
password = "root@123"
database-names = ["bjpt_hekou_v3"]
schema-names = ["public"]
table-names = ["bjpt_hekou_v3.public.ms_t_crjry"]
startup.mode = "initial"
table-names-config = [
{
table = "bjpt_hekou_v3.public.ms_t_crjry"
primaryKeys = ["wybs"]
}
]
}
}
```
### 三、完整配置示例
```text
env {
parallelism = 1
job.mode = "STREAMING"
}
source {
Postgres-CDC {
slot.name = "pgslot01"
base-url = "jdbc:postgresql://10.2.0.5:5432/bjpt_hekou_v3?loggerLevel=OFF"
username = "zr"
password = "root@123"
database-names = ["bjpt_hekou_v3"]
schema-names = ["public"]
table-names = ["bjpt_hekou_v3.public.ms_t_crjry"]
startup.mode = "initial"
table-names-config = [
{
table = "bjpt_hekou_v3.public.ms_t_crjry"
primaryKeys = ["wybs"]
}
]
}
}
sink {
Doris {
fenodes = "172.31.51.142:8030"
query-port = 9030
username = admin
password = "6G_FahdUxAh@K"
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
database = "bjbj"
table = "cdc_test_ms_t_crjry_pg"
sink.enable-2pc = "true"
sink.enable-delete = "true"
sink.label-prefix = "cdc_test_ms_t_crjry_pg"
doris.config = {
format="json"
read_json_by_line="true"
}
}
}
```
### 四、注意点
1. 必须指定slot.name名称,并且重启任务时需要注意slot名称是否存在
![[Pasted image 20250310183058.png]]
2. 重启任务时如果数据库提示 replication slot xxxx is active
6 months ago
查询表查看 active 字段需要等待一段时间pg会将active改为false状态
6 months ago
```sql
select * from pg_replication_slots
```
![[Pasted image 20250310183255.png]]
6 months ago
还可以选择删除复制槽,但是需要先将目标表数据清除(相当于重新消费)
执行一下语句删除slot
6 months ago
```sql
6 months ago
SELECT * FROM pg_drop_replication_slot('slot_name');
6 months ago
```
6 months ago
### 五、原理
参考文章:[剖析postgresql逻辑复制原理 - 知乎](https://zhuanlan.zhihu.com/p/163204827)
[POSTGRESQL 逻辑复制槽what when how ,check and monitor 这一篇-腾讯云开发者社区-腾讯云](https://cloud.tencent.com/developer/article/1972378)
[PostgreSQL复制槽与WAL文件保护深入解析与实战测试_flzhang_132的技术博客_51CTO博客](https://blog.51cto.com/u_14316134/12869031)
PG 有2种数据复制方式流复制物理复制、逻辑复制。流复制和逻辑复制是对wal日志的不同使用。PG-CDC 是基于逻辑复制实现。
#### 一、流复制
流复制承载了pg主备之间数据同步的功能它的实现方式是将[wal日志](https://zhida.zhihu.com/search?content_id=124401561&content_type=Article&match_order=1&q=wal%E6%97%A5%E5%BF%97&zhida_source=entity)中记录的内容按照确切的块地址逐字节的拷贝到备库因此主备之间数据分布是一致的这意味着在主备机器上同一条记录的ctid是相同的。
![[Pasted image 20250312111136.png]]
#### 二、逻辑复制
逻辑复制同步数据的原理是在wal日志产生的数据库上由逻辑解析模块对wal日志进行初步的解析,它的解析结果为[ReorderBufferChange](https://zhida.zhihu.com/search?content_id=124401561&content_type=Article&match_order=1&q=ReorderBufferChange&zhida_source=entity)可以简单理解为HeapTupleData再由[pgoutput plugin](https://zhida.zhihu.com/search?content_id=124401561&content_type=Article&match_order=1&q=pgoutput+plugin&zhida_source=entity)对中间结果进行过滤(过滤出insert、delete、update操作)和消息化拼接后然后将其发送到订阅端订阅端根据接收到的HeapTupleData重新对其执行insert、delete、update的操作。(**这里要注意流复制是将数据从walrecord拷贝到数据页逻辑复制是将数据重新执行一次insert、update或delete**)。从原理可以看出逻辑复制的效率比流复制低。
![[Pasted image 20250312113648.png]]
##### 2.1 逻辑复制槽 slot
复制槽的作用:
+ 保护WAL文件确保主库在所有的备库收到WAL之前不会移除。
+ 持久化状态信息:复制槽的状态信息是持久化的,即使从库断开或主库重启这些信息也不会丢失
+ 协调主备数据一致性复制槽通过restart_lsn 来标记备库还需要的WAL位置
在逻辑复制中复制槽代表着数据变更流。跟物理复制槽一样逻辑复制槽也可以保证复制异常中断后相关的wal日志不被删除以保证复制重连后仍然可以继续解析wal日志官方解释复制槽提供了一种办法确保主库不会“删除”还未发送到备库的WAL日志也不会删除备库需要的多版本即使备库掉线
一个数据库可以有多个复制槽一个复制槽只有一个outputplugin一个复制槽代表一条复制链路。复制槽本质上是用来管理复制链路的。不同于流复制可以没有复制槽逻辑复制是必须有复制槽的。
6 months ago
6 months ago
##### 2.2 outputplugin
test_decoding这是一个outputplugin样例相当于output plugin原始形态。官方文档说这是一个template但是它仍然可以解析。这个output plugin是pg自带的但需要在contrib中编译。
pgoutput发布订阅模式的默认outputplugin。在发布订阅中walsender进程使用该outputplugin逻辑解码wal日志。
decoder_raw:解析成sql文本格式。这个不是pg自带的。
wal2json这个outputplugin会将wal日志信息转化为json格式