Skip to content

PostgreSQL 逻辑复制 - 发布(Publication)详解

概述

逻辑复制发布(Publication)是 PostgreSQL 逻辑复制系统的核心组件之一,它定义了哪些数据变更需要被复制到订阅者(Subscriber)。发布可以看作是一个"数据变更发布器",负责捕获和传播特定表的数据修改操作。

发布的本质

发布是从一个或一组表中生成的一系列更改,也可以被描述为更改集或复制集。它类似于一个"数据变更的广播频道",订阅者可以"收听"这个频道来获取数据更新。 :::

发布的核心概念

基本特性

> **发布的关键特性**

  • 每个发布只存在于一个数据库中
  • 发布与模式不同,不会影响表的访问方式
  • 每个表可以添加到多个发布中
  • 发布可以有多个订阅者

支持的操作类型

发布可以选择性地复制以下 DML 操作:

操作类型描述默认状态特殊说明
INSERT插入操作✅ 包含无需副本标识
UPDATE更新操作✅ 包含需要副本标识
DELETE删除操作✅ 包含需要副本标识
TRUNCATE截断操作✅ 包含行过滤器无效

DDL 操作不会被复制

发布规范仅适用于 DML 操作,它们不会影响初始数据同步副本。DDL 变更(如 ALTER TABLE、CREATE INDEX 等)不会通过逻辑复制传播。 :::

副本标识(Replica Identity)

概念解析

副本标识是逻辑复制中的关键概念,它决定了如何在订阅者端识别要更新或删除的行。

副本标识类型详解

1. PRIMARY KEY(默认)

示例场景:电商订单表

sql
-- 创建订单表(默认使用主键作为副本标识)
CREATE TABLE orders (
    order_id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL,
    product_id INTEGER NOT NULL,
    quantity INTEGER NOT NULL,
    order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    status VARCHAR(20) DEFAULT 'pending'
);

-- 查看副本标识设置
SELECT schemaname, tablename, replicaidentity
FROM pg_tables
WHERE tablename = 'orders';

输出分析

 schemaname | tablename | replicaidentity
------------+-----------+-----------------
 public     | orders    | d
  • replicaidentity = 'd' 表示使用默认设置(主键)

2. UNIQUE INDEX

示例场景:用户表使用邮箱作为副本标识

sql
-- 创建用户表
CREATE TABLE users (
    user_id SERIAL,
    email VARCHAR(255) UNIQUE NOT NULL,
    username VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 设置唯一索引作为副本标识
ALTER TABLE users REPLICA IDENTITY USING INDEX users_email_key;

-- 验证设置
SELECT schemaname, tablename, replicaidentity
FROM pg_tables
WHERE tablename = 'users';

业务场景分析

  • 适用场景:当主键不存在但有其他唯一标识时
  • 优势:查询效率高,适合业务逻辑中的自然键
  • 注意事项:索引必须是唯一的、非部分的 btree 或 hash 索引

3. FULL(全行标识)

示例场景:日志表没有主键的情况

sql
-- 创建日志表(无主键)
CREATE TABLE access_logs (
    user_ip INET,
    access_time TIMESTAMP,
    requested_url TEXT,
    response_code INTEGER
);

-- 设置为全行副本标识
ALTER TABLE access_logs REPLICA IDENTITY FULL;

-- 验证设置
SELECT schemaname, tablename, replicaidentity
FROM pg_tables
WHERE tablename = 'access_logs';

输出分析

 schemaname | tablename    | replicaidentity
------------+--------------+-----------------
 public     | access_logs  | f

> **FULL 模式的性能风险**

  • 每次 UPDATE/DELETE 都需要扫描整行数据
  • 在订阅者端可能导致全表扫描
  • 仅应作为无其他选择时的后备方案

创建和管理发布

基础发布创建

创建包含特定表的发布

sql
-- 创建包含订单表的发布
CREATE PUBLICATION orders_pub FOR TABLE orders;

-- 创建包含多个表的发布
CREATE PUBLICATION ecommerce_pub FOR TABLE orders, products, users;

-- 查看发布信息
SELECT pubname, puballtables, pubinsert, pubupdate, pubdelete, pubtruncate
FROM pg_publication;

输出示例

  pubname     | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
--------------+--------------+-----------+-----------+-----------+-------------
 orders_pub   | f            | t         | t         | t         | t
 ecommerce_pub| f            | t         | t         | t         | t

创建全表发布

sql
-- 创建发布所有表的发布
CREATE PUBLICATION all_tables_pub FOR ALL TABLES;

-- 查看发布的表
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'all_tables_pub';

选择性操作发布

只复制插入操作

sql
-- 创建只复制 INSERT 的发布(适用于数据仓库场景)
CREATE PUBLICATION insert_only_pub
FOR TABLE sales_data
WITH (publish = 'insert');

业务场景

  • 数据仓库:只需要新增数据,不允许修改历史数据
  • 审计日志:只记录新事件,不允许修改历史记录

排除特定操作

sql
-- 创建排除删除操作的发布(适用于敏感数据)
CREATE PUBLICATION no_delete_pub
FOR TABLE customer_data
WITH (publish = 'insert, update');

动态管理发布表

添加和移除表

sql
-- 向现有发布添加表
ALTER PUBLICATION ecommerce_pub ADD TABLE inventory;

-- 从发布中移除表
ALTER PUBLICATION ecommerce_pub DROP TABLE products;

-- 查看发布包含的表
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'ecommerce_pub';

修改发布参数

sql
-- 修改发布的操作类型
ALTER PUBLICATION orders_pub SET (publish = 'insert, update');

-- 重新设置为所有操作
ALTER PUBLICATION orders_pub SET (publish = 'insert, update, delete, truncate');

实际业务场景应用

场景一:电商平台数据同步

需求描述

  • 主数据库:处理订单、用户、商品数据
  • 只读副本:为分析系统提供数据
  • 要求:实时同步订单和用户数据,但不同步敏感的支付信息

解决方案

sql
-- 1. 创建订单发布(排除敏感字段的表)
CREATE TABLE orders_public AS
SELECT order_id, user_id, product_id, quantity, order_date, status
FROM orders;

-- 2. 创建发布
CREATE PUBLICATION analytics_pub FOR TABLE orders_public, users, products;

-- 3. 设置适当的副本标识
ALTER TABLE orders_public REPLICA IDENTITY USING INDEX orders_public_pkey;
ALTER TABLE users REPLICA IDENTITY USING INDEX users_email_key;

分析过程

  1. 数据隔离:通过创建公共视图表排除敏感数据
  2. 性能优化:使用主键和唯一索引作为副本标识
  3. 操作控制:默认复制所有 DML 操作,保证数据一致性

场景二:微服务数据分发

需求描述

  • 用户服务需要订单创建事件
  • 库存服务需要订单状态变更事件
  • 不同服务需要不同的数据子集

解决方案

sql
-- 为用户服务创建发布(只需要新订单)
CREATE PUBLICATION user_service_pub
FOR TABLE orders
WITH (publish = 'insert');

-- 为库存服务创建发布(需要订单状态变更)
CREATE PUBLICATION inventory_service_pub
FOR TABLE orders
WITH (publish = 'insert, update');

-- 查看不同发布的配置
SELECT pubname, puballtables, pubinsert, pubupdate, pubdelete
FROM pg_publication
WHERE pubname LIKE '%service%';

输出分析

      pubname        | puballtables | pubinsert | pubupdate | pubdelete
---------------------+--------------+-----------+-----------+-----------
 user_service_pub    | f            | t         | f         | f
 inventory_service_pub| f            | t         | t         | f

场景三:数据迁移和升级

需求描述

  • 从旧系统迁移到新系统
  • 需要在迁移期间保持数据同步
  • 完成迁移后停止复制

解决方案

sql
-- 1. 创建迁移发布
CREATE PUBLICATION migration_pub FOR ALL TABLES;

-- 2. 监控复制状态
SELECT slot_name, plugin, slot_type, database, active
FROM pg_replication_slots;

-- 3. 迁移完成后清理
DROP PUBLICATION migration_pub;

监控和故障排除

发布状态监控

查看发布信息

sql
-- 查看所有发布
SELECT
    pubname,
    pubowner::regrole,
    puballtables,
    pubinsert,
    pubupdate,
    pubdelete,
    pubtruncate
FROM pg_publication;

-- 查看发布包含的表
SELECT
    p.pubname,
    t.schemaname,
    t.tablename,
    t.rowfilter
FROM pg_publication p
JOIN pg_publication_tables t ON p.pubname = t.pubname
ORDER BY p.pubname, t.schemaname, t.tablename;

复制槽状态监控

sql
-- 查看复制槽状态
SELECT
    slot_name,
    plugin,
    slot_type,
    database,
    active,
    active_pid,
    restart_lsn,
    confirmed_flush_lsn
FROM pg_replication_slots;

-- 查看复制延迟
SELECT
    client_addr,
    state,
    sent_lsn,
    write_lsn,
    flush_lsn,
    replay_lsn,
    write_lag,
    flush_lag,
    replay_lag
FROM pg_stat_replication;

常见问题和解决方案

问题一:副本标识缺失错误

错误信息

ERROR: cannot update table "tablename" because it does not have a replica identity and publishes updates

解决方案

sql
-- 方案1:添加主键
ALTER TABLE tablename ADD PRIMARY KEY (id);

-- 方案2:设置唯一索引为副本标识
CREATE UNIQUE INDEX idx_tablename_unique ON tablename (column1, column2);
ALTER TABLE tablename REPLICA IDENTITY USING INDEX idx_tablename_unique;

-- 方案3:设置为 FULL(谨慎使用)
ALTER TABLE tablename REPLICA IDENTITY FULL;

问题二:发布表不匹配

场景:订阅者端表结构与发布者不完全匹配

解决方案

sql
-- 检查表结构差异
SELECT
    column_name,
    data_type,
    is_nullable
FROM information_schema.columns
WHERE table_name = 'target_table'
ORDER BY ordinal_position;

-- 调整表结构或修改发布范围
ALTER PUBLICATION pub_name DROP TABLE problematic_table;

性能优化建议

选择合适的副本标识

性能优化原则

  1. 优先使用主键:查询效率最高
  2. 选择合适的唯一索引:避免使用复合索引作为副本标识
  3. 避免 FULL 模式:仅在无其他选择时使用
  4. 考虑订阅者端索引:确保订阅者端有相应的索引支持快速查找 :::

发布粒度控制

sql
-- 大表分批发布示例
CREATE PUBLICATION orders_recent_pub
FOR TABLE orders
WHERE (order_date >= CURRENT_DATE - INTERVAL '30 days');

-- 按业务逻辑分组发布
CREATE PUBLICATION critical_data_pub FOR TABLE orders, payments;
CREATE PUBLICATION analytics_data_pub FOR TABLE user_actions, page_views;

监控复制性能

sql
-- 创建性能监控视图
CREATE VIEW replication_performance AS
SELECT
    slot_name,
    database,
    active,
    restart_lsn,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag_size,
    CASE
        WHEN active THEN 'Connected'
        ELSE 'Disconnected'
    END as status
FROM pg_replication_slots
WHERE slot_type = 'logical';

-- 查看性能状态
SELECT * FROM replication_performance;

最佳实践总结

设计原则

  1. 明确复制需求:确定哪些表和操作需要复制
  2. 合理设置副本标识:平衡查询性能和存储成本
  3. 按业务分组:创建多个小发布而不是一个大发布
  4. 定期监控:建立监控机制跟踪复制状态

安全考虑

> **安全注意事项**

  • 敏感数据不应包含在发布中
  • 使用专门的复制用户,限制其权限
  • 确保网络连接的安全性
  • 定期审核发布内容和订阅者权限

通过本文的详细介绍,您应该能够理解 PostgreSQL 逻辑复制发布的核心概念,并能够在实际业务场景中正确配置和管理发布。记住,发布是逻辑复制的起点,合理的发布设计是整个复制系统成功的关键。