Skip to content

PostgreSQL 逻辑复制订阅详解

概述

逻辑复制订阅(Logical Replication Subscription)是 PostgreSQL 逻辑复制架构中的下游组件。订阅者节点通过订阅来接收发布者的数据变更,实现数据的实时同步。

INFO

订阅是逻辑复制的消费端,负责从发布者拉取数据变更并应用到本地数据库。一个订阅者可以同时订阅多个发布,也可以作为其他订阅者的发布者,形成复杂的复制拓扑结构。

核心概念

订阅的工作原理

关键特性

特性说明业务价值
多订阅支持一个订阅者可以有多个订阅支持复杂的数据集成场景
双向复制订阅者也可以作为发布者实现双主或多主复制架构
表级粒度基于表进行复制控制灵活的数据同步策略
模式匹配按表名和列名匹配简化表结构管理

复制槽管理

复制槽类型和生命周期

复制槽命名规则

Details

复制槽命名详解 主复制槽: 通常与订阅名称相同,如订阅名为sub1,则复制槽名为sub1

表同步槽: 自动生成的临时槽,格式为:

pg_%u_sync_%u_%llu
  • %u: 订阅 OID
  • %u: 表 relid
  • %llu: 系统标识符 sysid

例如:pg_16384_sync_16385_12345678901234567890

复制槽管理场景

场景 1:使用现有复制槽

sql
-- 适用场景:复制槽已经存在,避免重复创建
CREATE SUBSCRIPTION existing_slot_sub
CONNECTION 'host=replica.example.com dbname=production'
PUBLICATION prod_pub
WITH (create_slot = false, slot_name = 'existing_slot');

场景 2:离线创建订阅

sql
-- 适用场景:主库暂时不可用,先创建订阅配置
CREATE SUBSCRIPTION offline_sub
CONNECTION 'host=unavailable.example.com dbname=production'
PUBLICATION prod_pub
WITH (connect = false);

场景 3:订阅迁移保留槽

sql
-- 步骤1:解除槽关联
ALTER SUBSCRIPTION migration_sub SET (slot_name = NONE);

-- 步骤2:删除订阅(保留槽)
DROP SUBSCRIPTION migration_sub;

-- 步骤3:在新环境重新创建订阅
CREATE SUBSCRIPTION migration_sub
CONNECTION 'host=newhost.example.com dbname=production'
PUBLICATION prod_pub
WITH (create_slot = false, slot_name = 'preserved_slot');

实战示例:设置逻辑复制

业务场景设计

假设我们有一个电商系统,需要将主库的核心业务表同步到数据仓库进行分析:

  • 主库(发布者): 生产环境 PostgreSQL
  • 数据仓库(订阅者): 分析环境 PostgreSQL
  • 同步表: 用户表、订单表、产品表

步骤 1:发布者端准备

创建测试表结构

sql
-- 连接到发布者数据库
\c ecommerce_prod

-- 用户表
CREATE TABLE users (
    user_id SERIAL PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    status VARCHAR(20) DEFAULT 'active'
);

-- 订单表
CREATE TABLE orders (
    order_id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(user_id),
    total_amount DECIMAL(10,2) NOT NULL,
    order_status VARCHAR(20) DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 产品表
CREATE TABLE products (
    product_id SERIAL PRIMARY KEY,
    product_name VARCHAR(100) NOT NULL,
    price DECIMAL(10,2) NOT NULL,
    category VARCHAR(50),
    is_active BOOLEAN DEFAULT true
);

插入示例数据

sql
-- 插入用户数据
INSERT INTO users (username, email, status) VALUES
('alice', '[email protected]', 'active'),
('bob', '[email protected]', 'active'),
('charlie', '[email protected]', 'inactive');

-- 插入产品数据
INSERT INTO products (product_name, price, category, is_active) VALUES
('Laptop Pro', 1299.99, 'Electronics', true),
('Wireless Mouse', 29.99, 'Electronics', true),
('Office Chair', 199.99, 'Furniture', false);

-- 插入订单数据
INSERT INTO orders (user_id, total_amount, order_status) VALUES
(1, 1329.98, 'completed'),
(2, 29.99, 'pending'),
(1, 199.99, 'shipped');

创建发布

sql
-- 创建全表发布(用于完整数据同步)
CREATE PUBLICATION pub_all_tables
FOR TABLE users, orders, products;

-- 创建仅截断操作发布(用于结构同步)
CREATE PUBLICATION pub_ddl_only
FOR TABLE users, orders, products
WITH (publish = 'truncate');

-- 创建条件发布(仅同步活跃用户)
CREATE PUBLICATION pub_active_users
FOR TABLE users
WHERE (status = 'active');

-- 验证发布创建
SELECT pubname, puballtables, pubinsert, pubupdate, pubdelete, pubtruncate
FROM pg_publication;

步骤 2:订阅者端配置

创建相同表结构

sql
-- 连接到订阅者数据库
\c ecommerce_warehouse

-- 创建相同的表结构(可以有额外列)
CREATE TABLE users (
    user_id SERIAL PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    status VARCHAR(20) DEFAULT 'active',
    -- 额外的分析字段
    last_sync_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE orders (
    order_id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(user_id),
    total_amount DECIMAL(10,2) NOT NULL,
    order_status VARCHAR(20) DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- 额外的分析字段
    sync_batch_id INTEGER DEFAULT 1
);

CREATE TABLE products (
    product_id SERIAL PRIMARY KEY,
    product_name VARCHAR(100) NOT NULL,
    price DECIMAL(10,2) NOT NULL,
    category VARCHAR(50),
    is_active BOOLEAN DEFAULT true
);

创建订阅

sql
-- 订阅1:全表同步
CREATE SUBSCRIPTION sub_all_data
CONNECTION 'host=prod-db.example.com port=5432 dbname=ecommerce_prod user=replicator'
PUBLICATION pub_all_tables
WITH (
    copy_data = true,  -- 复制初始数据
    create_slot = true, -- 自动创建复制槽
    enabled = true      -- 立即启用
);

-- 订阅2:仅活跃用户
CREATE SUBSCRIPTION sub_active_users
CONNECTION 'host=prod-db.example.com port=5432 dbname=ecommerce_prod user=replicator'
PUBLICATION pub_active_users;

-- 验证订阅状态
SELECT subname, subenabled, subconninfo, subslotname
FROM pg_subscription;

步骤 3:验证数据同步

检查初始数据复制

sql
-- 检查数据是否成功复制
SELECT '用户表' as table_name, count(*) as row_count FROM users
UNION ALL
SELECT '订单表', count(*) FROM orders
UNION ALL
SELECT '产品表', count(*) FROM products;

-- 结果示例:
--  table_name | row_count
-- ------------+-----------
--  用户表     |         3
--  订单表     |         3
--  产品表     |         3

测试实时同步

在发布者端执行:

sql
-- 发布者端:插入新数据
INSERT INTO users (username, email, status) VALUES
('david', '[email protected]', 'active');

INSERT INTO orders (user_id, total_amount, order_status) VALUES
(4, 99.99, 'pending');

-- 更新现有数据
UPDATE orders SET order_status = 'completed' WHERE order_id = 2;

在订阅者端验证:

sql
-- 订阅者端:检查数据是否同步
SELECT * FROM users WHERE username = 'david';
SELECT * FROM orders WHERE user_id = 4;
SELECT order_status FROM orders WHERE order_id = 2;

TIP

同步延迟说明逻辑复制通常在几毫秒到几秒内完成同步,具体延迟取决于:

  • 网络延迟
  • 系统负载
  • 事务大小
  • 复制槽的 wal_sender 进程性能

高级配置:延迟复制槽创建

业务场景

在生产环境中,可能需要在不影响主库的情况下先配置订阅,然后在合适的时机激活复制。

场景 1:离线配置模式

步骤详解

sql
-- 步骤1:创建离线订阅
CREATE SUBSCRIPTION maintenance_sub
CONNECTION 'host=maintenance-db.example.com dbname=production'
PUBLICATION maintenance_pub
WITH (connect = false);  -- 不立即连接

-- 系统提示:
-- WARNING: subscription was created, but is not connected
-- HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
sql
-- 步骤2:在发布者端手动创建复制槽
-- (在maintenance-db.example.com上执行)
SELECT * FROM pg_create_logical_replication_slot('maintenance_sub', 'pgoutput');

-- 输出示例:
--  slot_name     |    lsn
-- ---------------+-----------
--  maintenance_sub| 0/19404D0
sql
-- 步骤3:激活订阅(在维护窗口期间)
ALTER SUBSCRIPTION maintenance_sub ENABLE;
ALTER SUBSCRIPTION maintenance_sub REFRESH PUBLICATION;

场景 2:自定义槽名模式

当需要更好的槽管理策略时:

sql
-- 步骤1:创建带自定义槽名的订阅
CREATE SUBSCRIPTION analytics_sub
CONNECTION 'host=prod-db.example.com dbname=ecommerce'
PUBLICATION analytics_pub
WITH (
    connect = false,
    slot_name = 'analytics_repl_slot_v2'  -- 自定义槽名
);
sql
-- 步骤2:创建对应的复制槽
SELECT * FROM pg_create_logical_replication_slot('analytics_repl_slot_v2', 'pgoutput');
sql
-- 步骤3:激活订阅
ALTER SUBSCRIPTION analytics_sub ENABLE;
ALTER SUBSCRIPTION analytics_sub REFRESH PUBLICATION;

场景 3:槽预留模式

用于订阅迁移或故障恢复:

sql
-- 步骤1:创建无槽订阅
CREATE SUBSCRIPTION backup_sub
CONNECTION 'host=backup-db.example.com dbname=production'
PUBLICATION backup_pub
WITH (
    slot_name = NONE,      -- 无关联槽
    enabled = false,       -- 禁用状态
    create_slot = false    -- 不创建槽
);
sql
-- 步骤2:手动创建独立复制槽
SELECT * FROM pg_create_logical_replication_slot('backup_dedicated_slot', 'pgoutput');
sql
-- 步骤3:关联槽并激活
ALTER SUBSCRIPTION backup_sub SET (slot_name = 'backup_dedicated_slot');
ALTER SUBSCRIPTION backup_sub ENABLE;
ALTER SUBSCRIPTION backup_sub REFRESH PUBLICATION;

监控和故障排除

订阅状态监控

sql
-- 查看订阅详细状态
SELECT
    s.subname AS subscription_name,
    s.subenabled AS enabled,
    s.subconninfo AS connection_info,
    s.subslotname AS slot_name,
    st.pid AS worker_pid,
    st.relid AS table_oid,
    st.received_lsn,
    st.last_msg_send_time,
    st.last_msg_receipt_time,
    st.latest_end_lsn,
    st.latest_end_time
FROM pg_subscription s
LEFT JOIN pg_stat_subscription st ON s.oid = st.subid;

复制槽状态检查

sql
-- 在发布者端检查复制槽
SELECT
    slot_name,
    plugin,
    slot_type,
    datoid,
    database,
    active,
    active_pid,
    xmin,
    catalog_xmin,
    restart_lsn,
    confirmed_flush_lsn
FROM pg_replication_slots
WHERE slot_type = 'logical';

常见问题和解决方案

WARNING

常见故障场景

问题 1:订阅无法连接

  • 症状: could not connect to the publisher
  • 解决: 检查网络连接、防火墙配置、用户权限

问题 2:复制槽堆积

  • 症状: WAL 文件不断增长
  • 解决: 检查订阅者状态,清理无用复制槽

问题 3:表结构不匹配

  • 症状: column "xxx" not found
  • 解决: 确保订阅者表包含发布者的所有列

性能优化建议

优化项建议实现方法
连接池复用连接减少开销配置合适的max_replication_slots
批量提交减少事务频率调整max_sync_workers_per_subscription
网络优化压缩传输数据在连接字符串中添加压缩参数
索引策略优化目标表索引在订阅者端创建适当索引

最佳实践总结

设计原则

  1. 命名规范: 使用有意义的订阅和槽名称
  2. 权限最小化: 为复制用户分配最小必要权限
  3. 监控完善: 建立完整的监控和告警机制
  4. 测试充分: 在生产环境前充分测试复制逻辑

运维要点

INFO

生产环境建议

  • 定期监控复制延迟和槽状态
  • 为复制槽设置合理的 WAL 保留策略
  • 建立订阅故障恢复预案
  • 考虑使用连接池优化复制性能
  • 定期清理不需要的复制槽

架构考量

通过合理的订阅配置和监控,PostgreSQL 逻辑复制可以为企业提供可靠、灵活的数据同步解决方案,支持从简单的主从复制到复杂的多向同步架构。