Appearance
PostgreSQL 逻辑复制订阅详解
概述
逻辑复制订阅(Logical Replication Subscription)是 PostgreSQL 逻辑复制架构中的下游组件。订阅者节点通过订阅来接收发布者的数据变更,实现数据的实时同步。
INFO
订阅是逻辑复制的消费端,负责从发布者拉取数据变更并应用到本地数据库。一个订阅者可以同时订阅多个发布,也可以作为其他订阅者的发布者,形成复杂的复制拓扑结构。
核心概念
订阅的工作原理
关键特性
特性 | 说明 | 业务价值 |
---|---|---|
多订阅支持 | 一个订阅者可以有多个订阅 | 支持复杂的数据集成场景 |
双向复制 | 订阅者也可以作为发布者 | 实现双主或多主复制架构 |
表级粒度 | 基于表进行复制控制 | 灵活的数据同步策略 |
模式匹配 | 按表名和列名匹配 | 简化表结构管理 |
复制槽管理
复制槽类型和生命周期
复制槽命名规则
Details
复制槽命名详解 主复制槽: 通常与订阅名称相同,如订阅名为sub1
,则复制槽名为sub1
表同步槽: 自动生成的临时槽,格式为:
pg_%u_sync_%u_%llu
%u
: 订阅 OID%u
: 表 relid%llu
: 系统标识符 sysid
例如:pg_16384_sync_16385_12345678901234567890
复制槽管理场景
场景 1:使用现有复制槽
sql
-- 适用场景:复制槽已经存在,避免重复创建
CREATE SUBSCRIPTION existing_slot_sub
CONNECTION 'host=replica.example.com dbname=production'
PUBLICATION prod_pub
WITH (create_slot = false, slot_name = 'existing_slot');
场景 2:离线创建订阅
sql
-- 适用场景:主库暂时不可用,先创建订阅配置
CREATE SUBSCRIPTION offline_sub
CONNECTION 'host=unavailable.example.com dbname=production'
PUBLICATION prod_pub
WITH (connect = false);
场景 3:订阅迁移保留槽
sql
-- 步骤1:解除槽关联
ALTER SUBSCRIPTION migration_sub SET (slot_name = NONE);
-- 步骤2:删除订阅(保留槽)
DROP SUBSCRIPTION migration_sub;
-- 步骤3:在新环境重新创建订阅
CREATE SUBSCRIPTION migration_sub
CONNECTION 'host=newhost.example.com dbname=production'
PUBLICATION prod_pub
WITH (create_slot = false, slot_name = 'preserved_slot');
实战示例:设置逻辑复制
业务场景设计
假设我们有一个电商系统,需要将主库的核心业务表同步到数据仓库进行分析:
- 主库(发布者): 生产环境 PostgreSQL
- 数据仓库(订阅者): 分析环境 PostgreSQL
- 同步表: 用户表、订单表、产品表
步骤 1:发布者端准备
创建测试表结构
sql
-- 连接到发布者数据库
\c ecommerce_prod
-- 用户表
CREATE TABLE users (
user_id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status VARCHAR(20) DEFAULT 'active'
);
-- 订单表
CREATE TABLE orders (
order_id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(user_id),
total_amount DECIMAL(10,2) NOT NULL,
order_status VARCHAR(20) DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 产品表
CREATE TABLE products (
product_id SERIAL PRIMARY KEY,
product_name VARCHAR(100) NOT NULL,
price DECIMAL(10,2) NOT NULL,
category VARCHAR(50),
is_active BOOLEAN DEFAULT true
);
插入示例数据
sql
-- 插入用户数据
INSERT INTO users (username, email, status) VALUES
('alice', '[email protected]', 'active'),
('bob', '[email protected]', 'active'),
('charlie', '[email protected]', 'inactive');
-- 插入产品数据
INSERT INTO products (product_name, price, category, is_active) VALUES
('Laptop Pro', 1299.99, 'Electronics', true),
('Wireless Mouse', 29.99, 'Electronics', true),
('Office Chair', 199.99, 'Furniture', false);
-- 插入订单数据
INSERT INTO orders (user_id, total_amount, order_status) VALUES
(1, 1329.98, 'completed'),
(2, 29.99, 'pending'),
(1, 199.99, 'shipped');
创建发布
sql
-- 创建全表发布(用于完整数据同步)
CREATE PUBLICATION pub_all_tables
FOR TABLE users, orders, products;
-- 创建仅截断操作发布(用于结构同步)
CREATE PUBLICATION pub_ddl_only
FOR TABLE users, orders, products
WITH (publish = 'truncate');
-- 创建条件发布(仅同步活跃用户)
CREATE PUBLICATION pub_active_users
FOR TABLE users
WHERE (status = 'active');
-- 验证发布创建
SELECT pubname, puballtables, pubinsert, pubupdate, pubdelete, pubtruncate
FROM pg_publication;
步骤 2:订阅者端配置
创建相同表结构
sql
-- 连接到订阅者数据库
\c ecommerce_warehouse
-- 创建相同的表结构(可以有额外列)
CREATE TABLE users (
user_id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status VARCHAR(20) DEFAULT 'active',
-- 额外的分析字段
last_sync_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE orders (
order_id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(user_id),
total_amount DECIMAL(10,2) NOT NULL,
order_status VARCHAR(20) DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- 额外的分析字段
sync_batch_id INTEGER DEFAULT 1
);
CREATE TABLE products (
product_id SERIAL PRIMARY KEY,
product_name VARCHAR(100) NOT NULL,
price DECIMAL(10,2) NOT NULL,
category VARCHAR(50),
is_active BOOLEAN DEFAULT true
);
创建订阅
sql
-- 订阅1:全表同步
CREATE SUBSCRIPTION sub_all_data
CONNECTION 'host=prod-db.example.com port=5432 dbname=ecommerce_prod user=replicator'
PUBLICATION pub_all_tables
WITH (
copy_data = true, -- 复制初始数据
create_slot = true, -- 自动创建复制槽
enabled = true -- 立即启用
);
-- 订阅2:仅活跃用户
CREATE SUBSCRIPTION sub_active_users
CONNECTION 'host=prod-db.example.com port=5432 dbname=ecommerce_prod user=replicator'
PUBLICATION pub_active_users;
-- 验证订阅状态
SELECT subname, subenabled, subconninfo, subslotname
FROM pg_subscription;
步骤 3:验证数据同步
检查初始数据复制
sql
-- 检查数据是否成功复制
SELECT '用户表' as table_name, count(*) as row_count FROM users
UNION ALL
SELECT '订单表', count(*) FROM orders
UNION ALL
SELECT '产品表', count(*) FROM products;
-- 结果示例:
-- table_name | row_count
-- ------------+-----------
-- 用户表 | 3
-- 订单表 | 3
-- 产品表 | 3
测试实时同步
在发布者端执行:
sql
-- 发布者端:插入新数据
INSERT INTO users (username, email, status) VALUES
('david', '[email protected]', 'active');
INSERT INTO orders (user_id, total_amount, order_status) VALUES
(4, 99.99, 'pending');
-- 更新现有数据
UPDATE orders SET order_status = 'completed' WHERE order_id = 2;
在订阅者端验证:
sql
-- 订阅者端:检查数据是否同步
SELECT * FROM users WHERE username = 'david';
SELECT * FROM orders WHERE user_id = 4;
SELECT order_status FROM orders WHERE order_id = 2;
TIP
同步延迟说明逻辑复制通常在几毫秒到几秒内完成同步,具体延迟取决于:
- 网络延迟
- 系统负载
- 事务大小
- 复制槽的 wal_sender 进程性能
高级配置:延迟复制槽创建
业务场景
在生产环境中,可能需要在不影响主库的情况下先配置订阅,然后在合适的时机激活复制。
场景 1:离线配置模式
步骤详解
sql
-- 步骤1:创建离线订阅
CREATE SUBSCRIPTION maintenance_sub
CONNECTION 'host=maintenance-db.example.com dbname=production'
PUBLICATION maintenance_pub
WITH (connect = false); -- 不立即连接
-- 系统提示:
-- WARNING: subscription was created, but is not connected
-- HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
sql
-- 步骤2:在发布者端手动创建复制槽
-- (在maintenance-db.example.com上执行)
SELECT * FROM pg_create_logical_replication_slot('maintenance_sub', 'pgoutput');
-- 输出示例:
-- slot_name | lsn
-- ---------------+-----------
-- maintenance_sub| 0/19404D0
sql
-- 步骤3:激活订阅(在维护窗口期间)
ALTER SUBSCRIPTION maintenance_sub ENABLE;
ALTER SUBSCRIPTION maintenance_sub REFRESH PUBLICATION;
场景 2:自定义槽名模式
当需要更好的槽管理策略时:
sql
-- 步骤1:创建带自定义槽名的订阅
CREATE SUBSCRIPTION analytics_sub
CONNECTION 'host=prod-db.example.com dbname=ecommerce'
PUBLICATION analytics_pub
WITH (
connect = false,
slot_name = 'analytics_repl_slot_v2' -- 自定义槽名
);
sql
-- 步骤2:创建对应的复制槽
SELECT * FROM pg_create_logical_replication_slot('analytics_repl_slot_v2', 'pgoutput');
sql
-- 步骤3:激活订阅
ALTER SUBSCRIPTION analytics_sub ENABLE;
ALTER SUBSCRIPTION analytics_sub REFRESH PUBLICATION;
场景 3:槽预留模式
用于订阅迁移或故障恢复:
sql
-- 步骤1:创建无槽订阅
CREATE SUBSCRIPTION backup_sub
CONNECTION 'host=backup-db.example.com dbname=production'
PUBLICATION backup_pub
WITH (
slot_name = NONE, -- 无关联槽
enabled = false, -- 禁用状态
create_slot = false -- 不创建槽
);
sql
-- 步骤2:手动创建独立复制槽
SELECT * FROM pg_create_logical_replication_slot('backup_dedicated_slot', 'pgoutput');
sql
-- 步骤3:关联槽并激活
ALTER SUBSCRIPTION backup_sub SET (slot_name = 'backup_dedicated_slot');
ALTER SUBSCRIPTION backup_sub ENABLE;
ALTER SUBSCRIPTION backup_sub REFRESH PUBLICATION;
监控和故障排除
订阅状态监控
sql
-- 查看订阅详细状态
SELECT
s.subname AS subscription_name,
s.subenabled AS enabled,
s.subconninfo AS connection_info,
s.subslotname AS slot_name,
st.pid AS worker_pid,
st.relid AS table_oid,
st.received_lsn,
st.last_msg_send_time,
st.last_msg_receipt_time,
st.latest_end_lsn,
st.latest_end_time
FROM pg_subscription s
LEFT JOIN pg_stat_subscription st ON s.oid = st.subid;
复制槽状态检查
sql
-- 在发布者端检查复制槽
SELECT
slot_name,
plugin,
slot_type,
datoid,
database,
active,
active_pid,
xmin,
catalog_xmin,
restart_lsn,
confirmed_flush_lsn
FROM pg_replication_slots
WHERE slot_type = 'logical';
常见问题和解决方案
WARNING
常见故障场景
问题 1:订阅无法连接
- 症状:
could not connect to the publisher
- 解决: 检查网络连接、防火墙配置、用户权限
问题 2:复制槽堆积
- 症状: WAL 文件不断增长
- 解决: 检查订阅者状态,清理无用复制槽
问题 3:表结构不匹配
- 症状:
column "xxx" not found
- 解决: 确保订阅者表包含发布者的所有列
性能优化建议
优化项 | 建议 | 实现方法 |
---|---|---|
连接池 | 复用连接减少开销 | 配置合适的max_replication_slots |
批量提交 | 减少事务频率 | 调整max_sync_workers_per_subscription |
网络优化 | 压缩传输数据 | 在连接字符串中添加压缩参数 |
索引策略 | 优化目标表索引 | 在订阅者端创建适当索引 |
最佳实践总结
设计原则
- 命名规范: 使用有意义的订阅和槽名称
- 权限最小化: 为复制用户分配最小必要权限
- 监控完善: 建立完整的监控和告警机制
- 测试充分: 在生产环境前充分测试复制逻辑
运维要点
INFO
生产环境建议
- 定期监控复制延迟和槽状态
- 为复制槽设置合理的 WAL 保留策略
- 建立订阅故障恢复预案
- 考虑使用连接池优化复制性能
- 定期清理不需要的复制槽
架构考量
通过合理的订阅配置和监控,PostgreSQL 逻辑复制可以为企业提供可靠、灵活的数据同步解决方案,支持从简单的主从复制到复杂的多向同步架构。