Skip to content

PostgreSQL 逻辑复制快速设置指南

什么是逻辑复制?

逻辑复制是 PostgreSQL 10+ 版本引入的一种数据复制机制,它允许在数据库实例之间复制特定的表数据。与物理复制不同,逻辑复制基于数据的逻辑变化(如 INSERT、UPDATE、DELETE 操作),而不是复制底层的物理数据块。

逻辑复制 vs 物理复制对比

特性逻辑复制物理复制
复制粒度表级别整个数据库集群
PostgreSQL 版本要求可以跨版本必须相同版本
复制内容只复制数据变化复制所有物理文件
性能开销相对较高相对较低
灵活性高(可选择表)低(全部复制)
适用场景数据分发、版本升级高可用、灾备

逻辑复制工作原理

快速设置步骤详解

步骤 1:配置发布者数据库

1.1 修改 postgresql.conf

bash
# 编辑 postgresql.conf 文件
sudo vim /etc/postgresql/14/main/postgresql.conf

# 设置 WAL 级别为逻辑模式
wal_level = logical

# 其他推荐设置(通常默认值已足够)
max_replication_slots = 10     # 最大复制槽数量
max_wal_senders = 10          # 最大 WAL 发送进程数

IMPORTANT

修改 wal_level 后需要重启 PostgreSQL 服务才能生效

1.2 配置 pg_hba.conf

bash
# 编辑 pg_hba.conf 文件
sudo vim /etc/postgresql/14/main/pg_hba.conf

# 添加复制用户的访问规则
# TYPE  DATABASE    USER        ADDRESS           METHOD
host    all         repuser     0.0.0.0/0         md5
host    replication repuser     0.0.0.0/0         md5

WARNING

生产环境中,请根据实际网络环境配置更严格的 IP 地址范围,避免使用 0.0.0.0/0

1.3 重启 PostgreSQL 服务

bash
# 重启服务使配置生效
sudo systemctl restart postgresql

步骤 2:创建复制用户和准备数据

sql
-- 在发布者数据库上创建复制用户
CREATE USER repuser WITH REPLICATION LOGIN PASSWORD 'your_secure_password';

-- 授予必要的权限
GRANT CONNECT ON DATABASE your_database TO repuser;
GRANT USAGE ON SCHEMA public TO repuser;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO repuser;

-- 创建示例表(如果还没有)
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE departments (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    manager_id INTEGER REFERENCES users(id)
);

-- 插入一些测试数据
INSERT INTO users (username, email) VALUES 
    ('alice', '[email protected]'),
    ('bob', '[email protected]'),
    ('charlie', '[email protected]');

INSERT INTO departments (name, manager_id) VALUES 
    ('Engineering', 1),
    ('Sales', 2),
    ('HR', 3);

步骤 3:创建发布(Publication)

sql
-- 在发布者数据库上执行
-- 创建发布,包含指定的表
CREATE PUBLICATION mypub FOR TABLE users, departments;

-- 查看发布信息
SELECT * FROM pg_publication;

-- 查看发布包含的表
SELECT * FROM pg_publication_tables WHERE pubname = 'mypub';

TIP

也可以创建包含所有表的发布:

sql
CREATE PUBLICATION mypub_all FOR ALL TABLES;

步骤 4:设置订阅者数据库

4.1 准备订阅者数据库结构

sql
-- 在订阅者数据库上执行
-- 创建相同结构的表(不包含数据)
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE departments (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    manager_id INTEGER REFERENCES users(id)
);

NOTE

订阅者的表结构必须与发布者兼容,但可以有额外的列或不同的约束

4.2 创建订阅(Subscription)

sql
-- 在订阅者数据库上执行
-- 创建订阅并连接到发布者
CREATE SUBSCRIPTION mysub 
CONNECTION 'host=publisher_host port=5432 dbname=source_db user=repuser password=your_secure_password' 
PUBLICATION mypub;

-- 查看订阅状态
SELECT * FROM pg_subscription;

-- 查看订阅的详细信息
SELECT 
    subname AS subscription_name,
    pg_stat_get_subscription(sub.oid).latest_end_lsn AS latest_lsn,
    pg_stat_get_subscription(sub.oid).latest_end_time AS latest_sync_time
FROM pg_subscription sub;

实际业务场景示例

场景 1:多区域数据分发

假设一个电商公司需要将商品数据从中心数据库分发到各个区域的数据库:

实现代码:

sql
-- 在中心数据库创建商品表
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    sku VARCHAR(50) UNIQUE NOT NULL,
    name VARCHAR(200) NOT NULL,
    price DECIMAL(10, 2) NOT NULL,
    stock INTEGER DEFAULT 0,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 创建发布
CREATE PUBLICATION product_pub FOR TABLE products;

-- 在各区域数据库创建订阅
-- 华北区域
CREATE SUBSCRIPTION north_sub 
CONNECTION 'host=center.example.com dbname=center_db user=repuser password=pass' 
PUBLICATION product_pub;

-- 华东区域
CREATE SUBSCRIPTION east_sub 
CONNECTION 'host=center.example.com dbname=center_db user=repuser password=pass' 
PUBLICATION product_pub;

场景 2:实时数据同步监控

创建监控逻辑复制状态的查询:

sql
-- 监控发布者端的复制状态
CREATE OR REPLACE VIEW v_replication_status AS
SELECT 
    slot_name,
    plugin,
    slot_type,
    database,
    active,
    active_pid,
    xmin,
    catalog_xmin,
    restart_lsn,
    confirmed_flush_lsn,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS replication_lag_size
FROM pg_replication_slots
WHERE slot_type = 'logical';

-- 使用视图查看状态
SELECT * FROM v_replication_status;

-- 监控订阅者端的同步状态
SELECT 
    subname,
    pid,
    relid::regclass AS table_name,
    received_lsn,
    latest_end_lsn,
    latest_end_time,
    CASE 
        WHEN latest_end_time IS NOT NULL THEN 
            EXTRACT(EPOCH FROM (NOW() - latest_end_time))::INT 
        ELSE NULL 
    END AS lag_seconds
FROM pg_stat_subscription
JOIN pg_subscription ON pg_subscription.oid = suboid;

高级配置选项

1. 选择性列复制

sql
-- 只复制表的特定列
CREATE PUBLICATION selective_pub FOR TABLE users (id, username, email);

-- 订阅者可以有额外的列
ALTER TABLE users ADD COLUMN last_login TIMESTAMP;

2. 使用 WHERE 子句过滤数据

sql
-- 只复制活跃用户(PostgreSQL 15+)
CREATE PUBLICATION active_users_pub 
FOR TABLE users 
WHERE (is_active = true);

3. 处理冲突

sql
-- 设置冲突解决策略
ALTER SUBSCRIPTION mysub SET (conflict_resolution = 'apply_remote');

-- 跳过特定的错误事务
ALTER SUBSCRIPTION mysub SKIP (lsn = '0/1234567');

故障排查指南

常见问题和解决方案

1. 订阅创建失败

错误信息: ERROR: could not connect to the publisher

解决方案:

bash
# 检查网络连接
telnet publisher_host 5432

# 验证连接字符串
psql "host=publisher_host dbname=source_db user=repuser"

# 检查 pg_hba.conf 配置
grep repuser /etc/postgresql/14/main/pg_hba.conf
2. 初始数据同步卡住

症状: 表数据一直没有同步过来

诊断步骤:

sql
-- 检查订阅状态
SELECT * FROM pg_subscription_rel;

-- 查看后台工作进程
SELECT * FROM pg_stat_activity WHERE application_name LIKE '%logical replication%';

-- 检查错误日志
SELECT * FROM pg_stat_subscription WHERE subname = 'mysub';
3. 复制延迟过大

监控查询:

sql
-- 创建延迟监控函数
CREATE OR REPLACE FUNCTION check_replication_lag()
RETURNS TABLE(
    slot_name TEXT,
    lag_bytes BIGINT,
    lag_pretty TEXT
) AS $$
BEGIN
    RETURN QUERY
    SELECT 
        s.slot_name::TEXT,
        pg_wal_lsn_diff(pg_current_wal_lsn(), s.confirmed_flush_lsn)::BIGINT,
        pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), s.confirmed_flush_lsn))::TEXT
    FROM pg_replication_slots s
    WHERE s.slot_type = 'logical' AND s.active;
END;
$$ LANGUAGE plpgsql;

-- 使用函数
SELECT * FROM check_replication_lag();

性能优化建议

1. 批量操作优化

sql
-- 发布者端:使用批量插入减少复制开销
BEGIN;
-- 临时禁用同步提交提高性能
SET LOCAL synchronous_commit = OFF;

-- 批量插入数据
INSERT INTO users (username, email) 
SELECT 
    'user_' || generate_series(1, 10000),
    'user_' || generate_series(1, 10000) || '@example.com';

COMMIT;

2. 并行应用优化

sql
-- PostgreSQL 16+ 支持并行应用
ALTER SUBSCRIPTION mysub SET (streaming = 'parallel');
ALTER SUBSCRIPTION mysub SET (parallel_apply_workers = 4);

3. 监控和告警设置

sql
-- 创建复制延迟告警函数
CREATE OR REPLACE FUNCTION alert_replication_lag(threshold_bytes BIGINT DEFAULT 100000000)
RETURNS TABLE(
    alert_message TEXT,
    slot_name TEXT,
    lag_bytes BIGINT
) AS $$
BEGIN
    RETURN QUERY
    SELECT 
        'WARNING: Replication lag exceeds threshold'::TEXT,
        s.slot_name::TEXT,
        pg_wal_lsn_diff(pg_current_wal_lsn(), s.confirmed_flush_lsn)::BIGINT
    FROM pg_replication_slots s
    WHERE s.slot_type = 'logical' 
        AND s.active
        AND pg_wal_lsn_diff(pg_current_wal_lsn(), s.confirmed_flush_lsn) > threshold_bytes;
END;
$$ LANGUAGE plpgsql;

-- 定期检查
SELECT * FROM alert_replication_lag(50 * 1024 * 1024); -- 50MB 阈值

安全最佳实践

1. 使用 SSL 加密连接

sql
-- 创建带 SSL 的订阅
CREATE SUBSCRIPTION secure_sub 
CONNECTION 'host=publisher.example.com dbname=source_db user=repuser password=pass sslmode=require' 
PUBLICATION mypub;

2. 最小权限原则

sql
-- 为复制用户创建专用角色
CREATE ROLE replication_role WITH LOGIN REPLICATION;

-- 只授予必要的表权限
GRANT SELECT ON users, departments TO replication_role;

-- 撤销不必要的权限
REVOKE ALL ON SCHEMA public FROM replication_role;
GRANT USAGE ON SCHEMA public TO replication_role;

3. 审计日志配置

sql
-- 启用复制相关的日志
ALTER SYSTEM SET log_replication_commands = ON;
ALTER SYSTEM SET log_statement = 'all';
SELECT pg_reload_conf();

CAUTION

在生产环境中,请务必:

  • 使用强密码和 SSL 连接
  • 限制复制用户的 IP 访问范围
  • 定期监控复制状态和延迟
  • 制定故障切换计划