Appearance
PostgreSQL 逻辑复制快速设置指南
什么是逻辑复制?
逻辑复制是 PostgreSQL 10+ 版本引入的一种数据复制机制,它允许在数据库实例之间复制特定的表数据。与物理复制不同,逻辑复制基于数据的逻辑变化(如 INSERT、UPDATE、DELETE 操作),而不是复制底层的物理数据块。
逻辑复制 vs 物理复制对比
特性 | 逻辑复制 | 物理复制 |
---|---|---|
复制粒度 | 表级别 | 整个数据库集群 |
PostgreSQL 版本要求 | 可以跨版本 | 必须相同版本 |
复制内容 | 只复制数据变化 | 复制所有物理文件 |
性能开销 | 相对较高 | 相对较低 |
灵活性 | 高(可选择表) | 低(全部复制) |
适用场景 | 数据分发、版本升级 | 高可用、灾备 |
逻辑复制工作原理
快速设置步骤详解
步骤 1:配置发布者数据库
1.1 修改 postgresql.conf
bash
# 编辑 postgresql.conf 文件
sudo vim /etc/postgresql/14/main/postgresql.conf
# 设置 WAL 级别为逻辑模式
wal_level = logical
# 其他推荐设置(通常默认值已足够)
max_replication_slots = 10 # 最大复制槽数量
max_wal_senders = 10 # 最大 WAL 发送进程数
IMPORTANT
修改 wal_level
后需要重启 PostgreSQL 服务才能生效
1.2 配置 pg_hba.conf
bash
# 编辑 pg_hba.conf 文件
sudo vim /etc/postgresql/14/main/pg_hba.conf
# 添加复制用户的访问规则
# TYPE DATABASE USER ADDRESS METHOD
host all repuser 0.0.0.0/0 md5
host replication repuser 0.0.0.0/0 md5
WARNING
生产环境中,请根据实际网络环境配置更严格的 IP 地址范围,避免使用 0.0.0.0/0
1.3 重启 PostgreSQL 服务
bash
# 重启服务使配置生效
sudo systemctl restart postgresql
步骤 2:创建复制用户和准备数据
sql
-- 在发布者数据库上创建复制用户
CREATE USER repuser WITH REPLICATION LOGIN PASSWORD 'your_secure_password';
-- 授予必要的权限
GRANT CONNECT ON DATABASE your_database TO repuser;
GRANT USAGE ON SCHEMA public TO repuser;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO repuser;
-- 创建示例表(如果还没有)
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE departments (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
manager_id INTEGER REFERENCES users(id)
);
-- 插入一些测试数据
INSERT INTO users (username, email) VALUES
('alice', '[email protected]'),
('bob', '[email protected]'),
('charlie', '[email protected]');
INSERT INTO departments (name, manager_id) VALUES
('Engineering', 1),
('Sales', 2),
('HR', 3);
步骤 3:创建发布(Publication)
sql
-- 在发布者数据库上执行
-- 创建发布,包含指定的表
CREATE PUBLICATION mypub FOR TABLE users, departments;
-- 查看发布信息
SELECT * FROM pg_publication;
-- 查看发布包含的表
SELECT * FROM pg_publication_tables WHERE pubname = 'mypub';
TIP
也可以创建包含所有表的发布:
sql
CREATE PUBLICATION mypub_all FOR ALL TABLES;
步骤 4:设置订阅者数据库
4.1 准备订阅者数据库结构
sql
-- 在订阅者数据库上执行
-- 创建相同结构的表(不包含数据)
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE departments (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
manager_id INTEGER REFERENCES users(id)
);
NOTE
订阅者的表结构必须与发布者兼容,但可以有额外的列或不同的约束
4.2 创建订阅(Subscription)
sql
-- 在订阅者数据库上执行
-- 创建订阅并连接到发布者
CREATE SUBSCRIPTION mysub
CONNECTION 'host=publisher_host port=5432 dbname=source_db user=repuser password=your_secure_password'
PUBLICATION mypub;
-- 查看订阅状态
SELECT * FROM pg_subscription;
-- 查看订阅的详细信息
SELECT
subname AS subscription_name,
pg_stat_get_subscription(sub.oid).latest_end_lsn AS latest_lsn,
pg_stat_get_subscription(sub.oid).latest_end_time AS latest_sync_time
FROM pg_subscription sub;
实际业务场景示例
场景 1:多区域数据分发
假设一个电商公司需要将商品数据从中心数据库分发到各个区域的数据库:
实现代码:
sql
-- 在中心数据库创建商品表
CREATE TABLE products (
id SERIAL PRIMARY KEY,
sku VARCHAR(50) UNIQUE NOT NULL,
name VARCHAR(200) NOT NULL,
price DECIMAL(10, 2) NOT NULL,
stock INTEGER DEFAULT 0,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 创建发布
CREATE PUBLICATION product_pub FOR TABLE products;
-- 在各区域数据库创建订阅
-- 华北区域
CREATE SUBSCRIPTION north_sub
CONNECTION 'host=center.example.com dbname=center_db user=repuser password=pass'
PUBLICATION product_pub;
-- 华东区域
CREATE SUBSCRIPTION east_sub
CONNECTION 'host=center.example.com dbname=center_db user=repuser password=pass'
PUBLICATION product_pub;
场景 2:实时数据同步监控
创建监控逻辑复制状态的查询:
sql
-- 监控发布者端的复制状态
CREATE OR REPLACE VIEW v_replication_status AS
SELECT
slot_name,
plugin,
slot_type,
database,
active,
active_pid,
xmin,
catalog_xmin,
restart_lsn,
confirmed_flush_lsn,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS replication_lag_size
FROM pg_replication_slots
WHERE slot_type = 'logical';
-- 使用视图查看状态
SELECT * FROM v_replication_status;
-- 监控订阅者端的同步状态
SELECT
subname,
pid,
relid::regclass AS table_name,
received_lsn,
latest_end_lsn,
latest_end_time,
CASE
WHEN latest_end_time IS NOT NULL THEN
EXTRACT(EPOCH FROM (NOW() - latest_end_time))::INT
ELSE NULL
END AS lag_seconds
FROM pg_stat_subscription
JOIN pg_subscription ON pg_subscription.oid = suboid;
高级配置选项
1. 选择性列复制
sql
-- 只复制表的特定列
CREATE PUBLICATION selective_pub FOR TABLE users (id, username, email);
-- 订阅者可以有额外的列
ALTER TABLE users ADD COLUMN last_login TIMESTAMP;
2. 使用 WHERE 子句过滤数据
sql
-- 只复制活跃用户(PostgreSQL 15+)
CREATE PUBLICATION active_users_pub
FOR TABLE users
WHERE (is_active = true);
3. 处理冲突
sql
-- 设置冲突解决策略
ALTER SUBSCRIPTION mysub SET (conflict_resolution = 'apply_remote');
-- 跳过特定的错误事务
ALTER SUBSCRIPTION mysub SKIP (lsn = '0/1234567');
故障排查指南
常见问题和解决方案
1. 订阅创建失败
错误信息: ERROR: could not connect to the publisher
解决方案:
bash
# 检查网络连接
telnet publisher_host 5432
# 验证连接字符串
psql "host=publisher_host dbname=source_db user=repuser"
# 检查 pg_hba.conf 配置
grep repuser /etc/postgresql/14/main/pg_hba.conf
2. 初始数据同步卡住
症状: 表数据一直没有同步过来
诊断步骤:
sql
-- 检查订阅状态
SELECT * FROM pg_subscription_rel;
-- 查看后台工作进程
SELECT * FROM pg_stat_activity WHERE application_name LIKE '%logical replication%';
-- 检查错误日志
SELECT * FROM pg_stat_subscription WHERE subname = 'mysub';
3. 复制延迟过大
监控查询:
sql
-- 创建延迟监控函数
CREATE OR REPLACE FUNCTION check_replication_lag()
RETURNS TABLE(
slot_name TEXT,
lag_bytes BIGINT,
lag_pretty TEXT
) AS $$
BEGIN
RETURN QUERY
SELECT
s.slot_name::TEXT,
pg_wal_lsn_diff(pg_current_wal_lsn(), s.confirmed_flush_lsn)::BIGINT,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), s.confirmed_flush_lsn))::TEXT
FROM pg_replication_slots s
WHERE s.slot_type = 'logical' AND s.active;
END;
$$ LANGUAGE plpgsql;
-- 使用函数
SELECT * FROM check_replication_lag();
性能优化建议
1. 批量操作优化
sql
-- 发布者端:使用批量插入减少复制开销
BEGIN;
-- 临时禁用同步提交提高性能
SET LOCAL synchronous_commit = OFF;
-- 批量插入数据
INSERT INTO users (username, email)
SELECT
'user_' || generate_series(1, 10000),
'user_' || generate_series(1, 10000) || '@example.com';
COMMIT;
2. 并行应用优化
sql
-- PostgreSQL 16+ 支持并行应用
ALTER SUBSCRIPTION mysub SET (streaming = 'parallel');
ALTER SUBSCRIPTION mysub SET (parallel_apply_workers = 4);
3. 监控和告警设置
sql
-- 创建复制延迟告警函数
CREATE OR REPLACE FUNCTION alert_replication_lag(threshold_bytes BIGINT DEFAULT 100000000)
RETURNS TABLE(
alert_message TEXT,
slot_name TEXT,
lag_bytes BIGINT
) AS $$
BEGIN
RETURN QUERY
SELECT
'WARNING: Replication lag exceeds threshold'::TEXT,
s.slot_name::TEXT,
pg_wal_lsn_diff(pg_current_wal_lsn(), s.confirmed_flush_lsn)::BIGINT
FROM pg_replication_slots s
WHERE s.slot_type = 'logical'
AND s.active
AND pg_wal_lsn_diff(pg_current_wal_lsn(), s.confirmed_flush_lsn) > threshold_bytes;
END;
$$ LANGUAGE plpgsql;
-- 定期检查
SELECT * FROM alert_replication_lag(50 * 1024 * 1024); -- 50MB 阈值
安全最佳实践
1. 使用 SSL 加密连接
sql
-- 创建带 SSL 的订阅
CREATE SUBSCRIPTION secure_sub
CONNECTION 'host=publisher.example.com dbname=source_db user=repuser password=pass sslmode=require'
PUBLICATION mypub;
2. 最小权限原则
sql
-- 为复制用户创建专用角色
CREATE ROLE replication_role WITH LOGIN REPLICATION;
-- 只授予必要的表权限
GRANT SELECT ON users, departments TO replication_role;
-- 撤销不必要的权限
REVOKE ALL ON SCHEMA public FROM replication_role;
GRANT USAGE ON SCHEMA public TO replication_role;
3. 审计日志配置
sql
-- 启用复制相关的日志
ALTER SYSTEM SET log_replication_commands = ON;
ALTER SYSTEM SET log_statement = 'all';
SELECT pg_reload_conf();
CAUTION
在生产环境中,请务必:
- 使用强密码和 SSL 连接
- 限制复制用户的 IP 访问范围
- 定期监控复制状态和延迟
- 制定故障切换计划