Skip to content

PostgreSQL 逻辑复制行过滤器

概述

逻辑复制行过滤器是 PostgreSQL 中一项强大的功能,允许在发布-订阅复制架构中选择性地复制表数据。通过定义 WHERE 条件,可以实现部分数据复制,满足业务安全、性能和数据隔离需求。

INFO

行过滤器是在发布级别定义的,只有满足过滤条件的行才会被复制到订阅者节点。

核心概念

行过滤器工作原理

业务应用场景

1. 数据安全隔离

在多租户系统中,可以基于租户 ID 进行数据隔离:

sql
-- 只复制特定租户的数据
CREATE PUBLICATION tenant_a_pub
FOR TABLE user_orders WHERE (tenant_id = 'tenant_a');

2. 地理区域分布

电商系统按地区分布数据:

sql
-- 只复制北方地区的订单
CREATE PUBLICATION north_region_pub
FOR TABLE orders WHERE (region IN ('beijing', 'shanghai', 'tianjin'));

3. 数据生命周期管理

只复制活跃或最新的数据:

sql
-- 只复制最近30天的日志
CREATE PUBLICATION recent_logs_pub
FOR TABLE application_logs WHERE (created_at >= NOW() - INTERVAL '30 days');

行过滤器规则详解

基本规则

TIP

核心规则

  • 行过滤器在发布更改之前应用
  • 表达式评估为 falseNULL 时,该行不会被复制
  • 使用复制连接的角色权限进行表达式评估
  • TRUNCATE 命令无效

表达式限制

行过滤器的 WHERE 子句有严格的限制:

允许的表达式禁止的表达式
简单比较运算符 (=, >, <, >=, <=)用户定义函数
逻辑运算符 (AND, OR, NOT)自定义运算符和类型
内置不可变函数系统列引用
常量值和列引用可变内置函数(如 NOW()

副本标识约束

如果发布包含 UPDATE 或 DELETE 操作,行过滤器必须只包含副本标识(REPLICA IDENTITY)涵盖的列。

sql
-- 设置副本标识
ALTER TABLE orders REPLICA IDENTITY USING INDEX orders_pkey;

-- 正确:使用主键列进行过滤
CREATE PUBLICATION orders_pub
FOR TABLE orders WHERE (order_id > 1000);

-- 错误:使用非副本标识列
CREATE PUBLICATION orders_pub
FOR TABLE orders WHERE (description LIKE '%urgent%'); -- 如果description不在副本标识中

UPDATE 转换机制

转换规则矩阵

UPDATE 操作的处理取决于旧行和新行是否满足行过滤器条件:

旧行状态新行状态转换结果说明
不匹配不匹配不复制两个状态都不满足条件
不匹配匹配INSERT新行需要添加到订阅者
匹配不匹配DELETE旧行需要从订阅者删除
匹配匹配UPDATE正常的 UPDATE 操作

实际示例演示

sql
-- 创建测试表
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    category VARCHAR(50),
    price DECIMAL(10,2),
    status VARCHAR(20)
);

-- 创建带过滤器的发布
CREATE PUBLICATION active_products_pub
FOR TABLE products WHERE (status = 'active' AND price > 100);

-- 插入测试数据
INSERT INTO products VALUES
(1, 'electronics', 150.00, 'active'),
(2, 'books', 50.00, 'active'),
(3, 'electronics', 200.00, 'inactive');
sql
-- 创建相同结构的表
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    category VARCHAR(50),
    price DECIMAL(10,2),
    status VARCHAR(20)
);

-- 创建订阅
CREATE SUBSCRIPTION products_sub
CONNECTION 'host=publisher dbname=ecommerce'
PUBLICATION active_products_pub;

UPDATE 转换示例分析

sql
-- 初始状态:只有id=1的记录满足条件(status='active' AND price > 100)
-- 订阅者表中只有:(1, 'electronics', 150.00, 'active')

-- 场景1:不匹配 → 匹配 = INSERT
UPDATE products SET price = 120.00 WHERE id = 2;
-- 结果:在订阅者中INSERT新记录 (2, 'books', 120.00, 'active')

-- 场景2:匹配 → 不匹配 = DELETE
UPDATE products SET status = 'inactive' WHERE id = 1;
-- 结果:在订阅者中DELETE记录 id=1

-- 场景3:匹配 → 匹配 = UPDATE
UPDATE products SET price = 180.00 WHERE id = 1 AND status = 'active';
-- 结果:在订阅者中UPDATE记录价格为180.00

-- 场景4:不匹配 → 不匹配 = 不复制
UPDATE products SET category = 'furniture' WHERE id = 3;
-- 结果:无操作,因为记录不满足过滤条件

分区表处理

publish_via_partition_root 参数影响

分区表示例

sql
-- 创建分区表
CREATE TABLE sales_data (
    sale_id SERIAL,
    sale_date DATE,
    amount DECIMAL(10,2),
    region VARCHAR(20),
    PRIMARY KEY (sale_id, sale_date)
) PARTITION BY RANGE (sale_date);

-- 创建分区
CREATE TABLE sales_2023 PARTITION OF sales_data
FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');

CREATE TABLE sales_2024 PARTITION OF sales_data
FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');
sql
-- 策略1:使用根表过滤器
CREATE PUBLICATION sales_root_pub
FOR TABLE sales_data WHERE (amount > 1000)
WITH (publish_via_partition_root = true);

-- 策略2:使用分区独立过滤器
CREATE PUBLICATION sales_partition_pub
FOR TABLE
    sales_2023 WHERE (amount > 500),
    sales_2024 WHERE (amount > 2000)
WITH (publish_via_partition_root = false);

分区过滤器对比

配置优势劣势适用场景
publish_via_partition_root = true统一管理,简化配置无法针对分区定制过滤条件统一的场景
publish_via_partition_root = false灵活的分区级控制配置复杂,维护成本高不同分区需要不同过滤条件

初始数据同步

同步机制

初始数据同步不考虑 `publish` 参数,可能会复制一些在正常 DML 操作中不会复制的行。

多发布合并示例

sql
-- 发布者端:创建多个发布
CREATE PUBLICATION pub_high_value
FOR TABLE orders WHERE (total_amount > 1000);

CREATE PUBLICATION pub_vip_customers
FOR TABLE orders WHERE (customer_type = 'VIP');

CREATE PUBLICATION pub_recent
FOR TABLE orders WHERE (order_date >= '2024-01-01');

INFO

合并逻辑如果订阅者订阅多个发布,行过滤器会进行 OR 运算。满足任一条件的行都会被复制。

sql
-- 订阅者端:订阅多个发布
CREATE SUBSCRIPTION multi_pub_sub
CONNECTION 'host=publisher dbname=ecommerce'
PUBLICATION pub_high_value, pub_vip_customers, pub_recent;

-- 实际复制条件:
-- (total_amount > 1000) OR (customer_type = 'VIP') OR (order_date >= '2024-01-01')

实际应用案例

案例 1:电商数据分发系统

业务需求:将订单数据按地区分发到不同的数据中心

sql
-- 发布者:主数据中心
CREATE TABLE orders (
    order_id BIGSERIAL PRIMARY KEY,
    user_id BIGINT,
    region VARCHAR(20),
    order_amount DECIMAL(12,2),
    order_status VARCHAR(20),
    created_at TIMESTAMP DEFAULT NOW()
);

-- 为不同地区创建发布
CREATE PUBLICATION asia_orders_pub
FOR TABLE orders WHERE (region IN ('china', 'japan', 'korea'));

CREATE PUBLICATION europe_orders_pub
FOR TABLE orders WHERE (region IN ('germany', 'france', 'uk'));

CREATE PUBLICATION americas_orders_pub
FOR TABLE orders WHERE (region IN ('usa', 'canada', 'mexico'));

订阅者配置

sql
CREATE SUBSCRIPTION asia_orders_sub
CONNECTION 'host=main-dc dbname=ecommerce user=repl_user'
PUBLICATION asia_orders_pub;
sql
CREATE SUBSCRIPTION europe_orders_sub
CONNECTION 'host=main-dc dbname=ecommerce user=repl_user'
PUBLICATION europe_orders_pub;

案例 2:数据生命周期管理

业务需求:将活跃用户数据复制到分析数据库,历史数据保留在主库

sql
-- 主库:用户活动表
CREATE TABLE user_activities (
    activity_id BIGSERIAL PRIMARY KEY,
    user_id BIGINT,
    activity_type VARCHAR(50),
    activity_data JSONB,
    last_active_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT NOW()
);

-- 只复制最近活跃的用户数据
CREATE PUBLICATION active_users_pub
FOR TABLE user_activities
WHERE (last_active_at >= NOW() - INTERVAL '30 days');

分析库配置

sql
-- 分析数据库
CREATE SUBSCRIPTION analytics_sub
CONNECTION 'host=production-db dbname=mainapp user=analytics_user'
PUBLICATION active_users_pub;

-- 定期清理旧数据的策略
CREATE OR REPLACE FUNCTION cleanup_old_activities()
RETURNS void AS $$
BEGIN
    DELETE FROM user_activities
    WHERE last_active_at < NOW() - INTERVAL '90 days';
END;
$$ LANGUAGE plpgsql;

-- 设置定时任务
SELECT cron.schedule('cleanup-old-activities', '0 2 * * *', 'SELECT cleanup_old_activities();');

性能优化建议

索引策略

为行过滤器涉及的列创建合适的索引,提升过滤性能。

sql
-- 为过滤条件创建索引
CREATE INDEX idx_orders_region_status
ON orders (region, order_status)
WHERE order_status IN ('active', 'processing');

-- 为时间范围过滤创建索引
CREATE INDEX idx_activities_active_time
ON user_activities (last_active_at)
WHERE last_active_at >= '2024-01-01';

监控和诊断

sql
-- 监控复制状态
SELECT
    subscription_name,
    pid,
    received_lsn,
    last_msg_send_time,
    last_msg_receipt_time,
    latest_end_lsn,
    latest_end_time
FROM pg_stat_subscription;

-- 检查复制延迟
SELECT
    client_addr,
    application_name,
    state,
    sent_lsn,
    write_lsn,
    flush_lsn,
    replay_lsn,
    write_lag,
    flush_lag,
    replay_lag
FROM pg_stat_replication;

性能测试脚本

sql
-- 生成测试数据
INSERT INTO orders (user_id, region, order_amount, order_status)
SELECT
    generate_series(1, 100000) % 10000 + 1,
    (ARRAY['china', 'usa', 'germany', 'japan'])[ceil(random() * 4)],
    random() * 1000 + 10,
    (ARRAY['active', 'completed', 'cancelled'])[ceil(random() * 3)];
sql
-- 测试过滤器性能
EXPLAIN (ANALYZE, BUFFERS)
SELECT COUNT(*)
FROM orders
WHERE region = 'china' AND order_status = 'active';

-- 测试复制性能
SELECT
    schemaname,
    tablename,
    n_tup_ins,
    n_tup_upd,
    n_tup_del
FROM pg_stat_user_tables
WHERE tablename = 'orders';

故障排查指南

常见问题及解决方案

1. 行过滤器表达式错误

问题:创建发布时出现表达式错误

bash
ERROR: row filter expression contains mutable functions

解决方案

sql
-- 错误:使用了可变函数
CREATE PUBLICATION recent_orders_pub
FOR TABLE orders WHERE (created_at >= NOW() - INTERVAL '1 day');
sql
-- 正确:使用固定时间点
CREATE PUBLICATION recent_orders_pub
FOR TABLE orders WHERE (created_at >= '2024-01-01');

-- 或者使用应用层动态更新
CREATE OR REPLACE FUNCTION update_publication_filter()
RETURNS void AS $$
DECLARE
    cutoff_date DATE := CURRENT_DATE - INTERVAL '30 days';
BEGIN
    EXECUTE format('ALTER PUBLICATION recent_orders_pub SET TABLE orders WHERE (created_at >= %L)', cutoff_date);
END;
$$ LANGUAGE plpgsql;

2. 副本标识冲突

问题:UPDATE/DELETE 操作中使用了非副本标识列

bash
ERROR: row filter for relation "orders" contains non-replica identity columns

解决方案

sql
-- 检查当前副本标识
SELECT
    schemaname,
    tablename,
    attname,
    format_type(atttypid, atttypmod) as data_type
FROM pg_attribute a
JOIN pg_class c ON a.attrelid = c.oid
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE c.relreplident != 'd'
    AND n.nspname = 'public'
    AND c.relname = 'orders';

-- 修改副本标识以包含需要的列
CREATE UNIQUE INDEX orders_replica_idx ON orders (order_id, region);
ALTER TABLE orders REPLICA IDENTITY USING INDEX orders_replica_idx;

3. 初始同步数据不一致

问题:初始同步的数据与预期不符

Details

诊断步骤

  1. 检查发布定义
  2. 验证过滤条件
  3. 确认订阅状态
  4. 手动验证数据一致性
sql
-- 检查发布配置
SELECT
    pubname,
    pubowner,
    puballtables,
    pubinsert,
    pubupdate,
    pubdelete,
    pubtruncate,
    pubviaroot
FROM pg_publication
WHERE pubname = 'your_publication_name';

-- 检查表级过滤器
SELECT
    schemaname,
    tablename,
    pubname,
    rowfilter
FROM pg_publication_tables
WHERE pubname = 'your_publication_name';

-- 手动验证数据一致性
SELECT
    'publisher' as source,
    COUNT(*) as row_count,
    SUM(order_amount) as total_amount
FROM orders
WHERE region = 'china' AND order_status = 'active'

UNION ALL

SELECT
    'subscriber' as source,
    COUNT(*) as row_count,
    SUM(order_amount) as total_amount
FROM orders
WHERE region = 'china' AND order_status = 'active';

最佳实践总结

设计原则

  1. 明确过滤需求:在设计阶段就确定数据分发策略
  2. 简化过滤条件:避免复杂的表达式,优先使用索引友好的条件
  3. 考虑数据一致性:确保过滤逻辑不会导致数据关系破坏
  4. 规划副本标识:为涉及 UPDATE/DELETE 的表合理设置副本标识

运维建议

  1. 监控复制延迟:定期检查复制状态和延迟情况
  2. 测试故障恢复:定期演练订阅重建和数据重同步
  3. 版本兼容性:注意 PostgreSQL 版本间的行过滤器兼容性
  4. 文档维护:及时更新发布订阅的配置文档

WARNING

注意事项

  • PostgreSQL 15 以下版本在初始数据同步时不支持行过滤器
  • 行过滤器不影响 TRUNCATE 操作
  • 多个发布的过滤器会进行 OR 运算合并
  • 表达式中不能使用用户定义的函数或可变函数

通过合理使用 PostgreSQL 的逻辑复制行过滤器,可以构建灵活、高效的数据分发系统,满足现代应用对数据隔离、安全性和性能的需求。