Appearance
PostgreSQL 逻辑复制行过滤器
概述
逻辑复制行过滤器是 PostgreSQL 中一项强大的功能,允许在发布-订阅复制架构中选择性地复制表数据。通过定义 WHERE 条件,可以实现部分数据复制,满足业务安全、性能和数据隔离需求。
INFO
行过滤器是在发布级别定义的,只有满足过滤条件的行才会被复制到订阅者节点。
核心概念
行过滤器工作原理
业务应用场景
1. 数据安全隔离
在多租户系统中,可以基于租户 ID 进行数据隔离:
sql
-- 只复制特定租户的数据
CREATE PUBLICATION tenant_a_pub
FOR TABLE user_orders WHERE (tenant_id = 'tenant_a');
2. 地理区域分布
电商系统按地区分布数据:
sql
-- 只复制北方地区的订单
CREATE PUBLICATION north_region_pub
FOR TABLE orders WHERE (region IN ('beijing', 'shanghai', 'tianjin'));
3. 数据生命周期管理
只复制活跃或最新的数据:
sql
-- 只复制最近30天的日志
CREATE PUBLICATION recent_logs_pub
FOR TABLE application_logs WHERE (created_at >= NOW() - INTERVAL '30 days');
行过滤器规则详解
基本规则
TIP
核心规则
- 行过滤器在发布更改之前应用
- 表达式评估为
false
或NULL
时,该行不会被复制 - 使用复制连接的角色权限进行表达式评估
- 对
TRUNCATE
命令无效
表达式限制
行过滤器的 WHERE 子句有严格的限制:
允许的表达式 | 禁止的表达式 |
---|---|
简单比较运算符 (= , > , < , >= , <= ) | 用户定义函数 |
逻辑运算符 (AND , OR , NOT ) | 自定义运算符和类型 |
内置不可变函数 | 系统列引用 |
常量值和列引用 | 可变内置函数(如 NOW() ) |
副本标识约束
如果发布包含 UPDATE 或 DELETE 操作,行过滤器必须只包含副本标识(REPLICA IDENTITY)涵盖的列。
sql
-- 设置副本标识
ALTER TABLE orders REPLICA IDENTITY USING INDEX orders_pkey;
-- 正确:使用主键列进行过滤
CREATE PUBLICATION orders_pub
FOR TABLE orders WHERE (order_id > 1000);
-- 错误:使用非副本标识列
CREATE PUBLICATION orders_pub
FOR TABLE orders WHERE (description LIKE '%urgent%'); -- 如果description不在副本标识中
UPDATE 转换机制
转换规则矩阵
UPDATE 操作的处理取决于旧行和新行是否满足行过滤器条件:
旧行状态 | 新行状态 | 转换结果 | 说明 |
---|---|---|---|
不匹配 | 不匹配 | 不复制 | 两个状态都不满足条件 |
不匹配 | 匹配 | INSERT | 新行需要添加到订阅者 |
匹配 | 不匹配 | DELETE | 旧行需要从订阅者删除 |
匹配 | 匹配 | UPDATE | 正常的 UPDATE 操作 |
实际示例演示
sql
-- 创建测试表
CREATE TABLE products (
id SERIAL PRIMARY KEY,
category VARCHAR(50),
price DECIMAL(10,2),
status VARCHAR(20)
);
-- 创建带过滤器的发布
CREATE PUBLICATION active_products_pub
FOR TABLE products WHERE (status = 'active' AND price > 100);
-- 插入测试数据
INSERT INTO products VALUES
(1, 'electronics', 150.00, 'active'),
(2, 'books', 50.00, 'active'),
(3, 'electronics', 200.00, 'inactive');
sql
-- 创建相同结构的表
CREATE TABLE products (
id SERIAL PRIMARY KEY,
category VARCHAR(50),
price DECIMAL(10,2),
status VARCHAR(20)
);
-- 创建订阅
CREATE SUBSCRIPTION products_sub
CONNECTION 'host=publisher dbname=ecommerce'
PUBLICATION active_products_pub;
UPDATE 转换示例分析
sql
-- 初始状态:只有id=1的记录满足条件(status='active' AND price > 100)
-- 订阅者表中只有:(1, 'electronics', 150.00, 'active')
-- 场景1:不匹配 → 匹配 = INSERT
UPDATE products SET price = 120.00 WHERE id = 2;
-- 结果:在订阅者中INSERT新记录 (2, 'books', 120.00, 'active')
-- 场景2:匹配 → 不匹配 = DELETE
UPDATE products SET status = 'inactive' WHERE id = 1;
-- 结果:在订阅者中DELETE记录 id=1
-- 场景3:匹配 → 匹配 = UPDATE
UPDATE products SET price = 180.00 WHERE id = 1 AND status = 'active';
-- 结果:在订阅者中UPDATE记录价格为180.00
-- 场景4:不匹配 → 不匹配 = 不复制
UPDATE products SET category = 'furniture' WHERE id = 3;
-- 结果:无操作,因为记录不满足过滤条件
分区表处理
publish_via_partition_root 参数影响
分区表示例
sql
-- 创建分区表
CREATE TABLE sales_data (
sale_id SERIAL,
sale_date DATE,
amount DECIMAL(10,2),
region VARCHAR(20),
PRIMARY KEY (sale_id, sale_date)
) PARTITION BY RANGE (sale_date);
-- 创建分区
CREATE TABLE sales_2023 PARTITION OF sales_data
FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');
CREATE TABLE sales_2024 PARTITION OF sales_data
FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');
sql
-- 策略1:使用根表过滤器
CREATE PUBLICATION sales_root_pub
FOR TABLE sales_data WHERE (amount > 1000)
WITH (publish_via_partition_root = true);
-- 策略2:使用分区独立过滤器
CREATE PUBLICATION sales_partition_pub
FOR TABLE
sales_2023 WHERE (amount > 500),
sales_2024 WHERE (amount > 2000)
WITH (publish_via_partition_root = false);
分区过滤器对比
配置 | 优势 | 劣势 | 适用场景 |
---|---|---|---|
publish_via_partition_root = true | 统一管理,简化配置 | 无法针对分区定制 | 过滤条件统一的场景 |
publish_via_partition_root = false | 灵活的分区级控制 | 配置复杂,维护成本高 | 不同分区需要不同过滤条件 |
初始数据同步
同步机制
初始数据同步不考虑 `publish` 参数,可能会复制一些在正常 DML 操作中不会复制的行。
多发布合并示例
sql
-- 发布者端:创建多个发布
CREATE PUBLICATION pub_high_value
FOR TABLE orders WHERE (total_amount > 1000);
CREATE PUBLICATION pub_vip_customers
FOR TABLE orders WHERE (customer_type = 'VIP');
CREATE PUBLICATION pub_recent
FOR TABLE orders WHERE (order_date >= '2024-01-01');
INFO
合并逻辑如果订阅者订阅多个发布,行过滤器会进行 OR 运算。满足任一条件的行都会被复制。
sql
-- 订阅者端:订阅多个发布
CREATE SUBSCRIPTION multi_pub_sub
CONNECTION 'host=publisher dbname=ecommerce'
PUBLICATION pub_high_value, pub_vip_customers, pub_recent;
-- 实际复制条件:
-- (total_amount > 1000) OR (customer_type = 'VIP') OR (order_date >= '2024-01-01')
实际应用案例
案例 1:电商数据分发系统
业务需求:将订单数据按地区分发到不同的数据中心
sql
-- 发布者:主数据中心
CREATE TABLE orders (
order_id BIGSERIAL PRIMARY KEY,
user_id BIGINT,
region VARCHAR(20),
order_amount DECIMAL(12,2),
order_status VARCHAR(20),
created_at TIMESTAMP DEFAULT NOW()
);
-- 为不同地区创建发布
CREATE PUBLICATION asia_orders_pub
FOR TABLE orders WHERE (region IN ('china', 'japan', 'korea'));
CREATE PUBLICATION europe_orders_pub
FOR TABLE orders WHERE (region IN ('germany', 'france', 'uk'));
CREATE PUBLICATION americas_orders_pub
FOR TABLE orders WHERE (region IN ('usa', 'canada', 'mexico'));
订阅者配置:
sql
CREATE SUBSCRIPTION asia_orders_sub
CONNECTION 'host=main-dc dbname=ecommerce user=repl_user'
PUBLICATION asia_orders_pub;
sql
CREATE SUBSCRIPTION europe_orders_sub
CONNECTION 'host=main-dc dbname=ecommerce user=repl_user'
PUBLICATION europe_orders_pub;
案例 2:数据生命周期管理
业务需求:将活跃用户数据复制到分析数据库,历史数据保留在主库
sql
-- 主库:用户活动表
CREATE TABLE user_activities (
activity_id BIGSERIAL PRIMARY KEY,
user_id BIGINT,
activity_type VARCHAR(50),
activity_data JSONB,
last_active_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW()
);
-- 只复制最近活跃的用户数据
CREATE PUBLICATION active_users_pub
FOR TABLE user_activities
WHERE (last_active_at >= NOW() - INTERVAL '30 days');
分析库配置:
sql
-- 分析数据库
CREATE SUBSCRIPTION analytics_sub
CONNECTION 'host=production-db dbname=mainapp user=analytics_user'
PUBLICATION active_users_pub;
-- 定期清理旧数据的策略
CREATE OR REPLACE FUNCTION cleanup_old_activities()
RETURNS void AS $$
BEGIN
DELETE FROM user_activities
WHERE last_active_at < NOW() - INTERVAL '90 days';
END;
$$ LANGUAGE plpgsql;
-- 设置定时任务
SELECT cron.schedule('cleanup-old-activities', '0 2 * * *', 'SELECT cleanup_old_activities();');
性能优化建议
索引策略
为行过滤器涉及的列创建合适的索引,提升过滤性能。
sql
-- 为过滤条件创建索引
CREATE INDEX idx_orders_region_status
ON orders (region, order_status)
WHERE order_status IN ('active', 'processing');
-- 为时间范围过滤创建索引
CREATE INDEX idx_activities_active_time
ON user_activities (last_active_at)
WHERE last_active_at >= '2024-01-01';
监控和诊断
sql
-- 监控复制状态
SELECT
subscription_name,
pid,
received_lsn,
last_msg_send_time,
last_msg_receipt_time,
latest_end_lsn,
latest_end_time
FROM pg_stat_subscription;
-- 检查复制延迟
SELECT
client_addr,
application_name,
state,
sent_lsn,
write_lsn,
flush_lsn,
replay_lsn,
write_lag,
flush_lag,
replay_lag
FROM pg_stat_replication;
性能测试脚本
sql
-- 生成测试数据
INSERT INTO orders (user_id, region, order_amount, order_status)
SELECT
generate_series(1, 100000) % 10000 + 1,
(ARRAY['china', 'usa', 'germany', 'japan'])[ceil(random() * 4)],
random() * 1000 + 10,
(ARRAY['active', 'completed', 'cancelled'])[ceil(random() * 3)];
sql
-- 测试过滤器性能
EXPLAIN (ANALYZE, BUFFERS)
SELECT COUNT(*)
FROM orders
WHERE region = 'china' AND order_status = 'active';
-- 测试复制性能
SELECT
schemaname,
tablename,
n_tup_ins,
n_tup_upd,
n_tup_del
FROM pg_stat_user_tables
WHERE tablename = 'orders';
故障排查指南
常见问题及解决方案
1. 行过滤器表达式错误
问题:创建发布时出现表达式错误
bash
ERROR: row filter expression contains mutable functions
解决方案:
sql
-- 错误:使用了可变函数
CREATE PUBLICATION recent_orders_pub
FOR TABLE orders WHERE (created_at >= NOW() - INTERVAL '1 day');
sql
-- 正确:使用固定时间点
CREATE PUBLICATION recent_orders_pub
FOR TABLE orders WHERE (created_at >= '2024-01-01');
-- 或者使用应用层动态更新
CREATE OR REPLACE FUNCTION update_publication_filter()
RETURNS void AS $$
DECLARE
cutoff_date DATE := CURRENT_DATE - INTERVAL '30 days';
BEGIN
EXECUTE format('ALTER PUBLICATION recent_orders_pub SET TABLE orders WHERE (created_at >= %L)', cutoff_date);
END;
$$ LANGUAGE plpgsql;
2. 副本标识冲突
问题:UPDATE/DELETE 操作中使用了非副本标识列
bash
ERROR: row filter for relation "orders" contains non-replica identity columns
解决方案:
sql
-- 检查当前副本标识
SELECT
schemaname,
tablename,
attname,
format_type(atttypid, atttypmod) as data_type
FROM pg_attribute a
JOIN pg_class c ON a.attrelid = c.oid
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE c.relreplident != 'd'
AND n.nspname = 'public'
AND c.relname = 'orders';
-- 修改副本标识以包含需要的列
CREATE UNIQUE INDEX orders_replica_idx ON orders (order_id, region);
ALTER TABLE orders REPLICA IDENTITY USING INDEX orders_replica_idx;
3. 初始同步数据不一致
问题:初始同步的数据与预期不符
Details
诊断步骤
- 检查发布定义
- 验证过滤条件
- 确认订阅状态
- 手动验证数据一致性
sql
-- 检查发布配置
SELECT
pubname,
pubowner,
puballtables,
pubinsert,
pubupdate,
pubdelete,
pubtruncate,
pubviaroot
FROM pg_publication
WHERE pubname = 'your_publication_name';
-- 检查表级过滤器
SELECT
schemaname,
tablename,
pubname,
rowfilter
FROM pg_publication_tables
WHERE pubname = 'your_publication_name';
-- 手动验证数据一致性
SELECT
'publisher' as source,
COUNT(*) as row_count,
SUM(order_amount) as total_amount
FROM orders
WHERE region = 'china' AND order_status = 'active'
UNION ALL
SELECT
'subscriber' as source,
COUNT(*) as row_count,
SUM(order_amount) as total_amount
FROM orders
WHERE region = 'china' AND order_status = 'active';
最佳实践总结
设计原则
- 明确过滤需求:在设计阶段就确定数据分发策略
- 简化过滤条件:避免复杂的表达式,优先使用索引友好的条件
- 考虑数据一致性:确保过滤逻辑不会导致数据关系破坏
- 规划副本标识:为涉及 UPDATE/DELETE 的表合理设置副本标识
运维建议
- 监控复制延迟:定期检查复制状态和延迟情况
- 测试故障恢复:定期演练订阅重建和数据重同步
- 版本兼容性:注意 PostgreSQL 版本间的行过滤器兼容性
- 文档维护:及时更新发布订阅的配置文档
WARNING
注意事项
- PostgreSQL 15 以下版本在初始数据同步时不支持行过滤器
- 行过滤器不影响 TRUNCATE 操作
- 多个发布的过滤器会进行 OR 运算合并
- 表达式中不能使用用户定义的函数或可变函数
通过合理使用 PostgreSQL 的逻辑复制行过滤器,可以构建灵活、高效的数据分发系统,满足现代应用对数据隔离、安全性和性能的需求。