Skip to content

PostgreSQL 逻辑复制列列表

概述

逻辑复制列列表是 PostgreSQL 提供的一项强大功能,允许发布者选择性地复制表中的特定列到订阅者,而不是复制整个表。这种精细化的控制可以显著提升复制性能,减少网络传输和存储开销。

核心概念

什么是列列表

列列表是在创建发布时指定的列名集合,用于确定哪些列将被复制到订阅者。通过列列表,可以实现:

  • 选择性复制:只复制业务需要的列
  • 性能优化:减少网络传输和存储空间
  • 数据隔离:在一定程度上控制数据访问范围

INFO

列列表功能从 PostgreSQL 15 版本开始全面支持,包括初始数据同步阶段。

架构图解

使用场景

1. 数据仓库同步

在企业数据仓库场景中,通常只需要同步核心业务字段,而不需要同步所有的审计字段或临时字段。

问题陈述:电商系统的订单表包含大量字段,但数据分析系统只需要核心的订单信息。

解决方案

sql
-- 发布者端:订单表包含完整信息
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    order_date TIMESTAMP NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    status VARCHAR(20) NOT NULL,
    -- 以下字段不需要同步到数据仓库
    internal_notes TEXT,
    created_by VARCHAR(50),
    updated_by VARCHAR(50),
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- 创建发布,只同步核心业务字段
CREATE PUBLICATION analytics_pub
FOR TABLE orders (order_id, customer_id, order_date, total_amount, status);
sql
-- 订阅者端:数据仓库只需要核心字段
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    order_date TIMESTAMP NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    status VARCHAR(20) NOT NULL
);

CREATE SUBSCRIPTION analytics_sub
CONNECTION 'host=prod-db port=5432 dbname=ecommerce user=repl_user'
PUBLICATION analytics_pub;

分析过程

  • 原表包含 10 个字段,但分析系统只需要 5 个核心字段
  • 通过列列表配置,网络传输量减少约 50%
  • 订阅者存储空间大幅节省
  • 敏感的内部备注和操作员信息不会传输到分析系统

2. 微服务数据同步

在微服务架构中,不同服务可能只需要主表的部分字段。

问题陈述:用户服务需要同步产品的基本信息,但不需要供应商和库存等敏感信息。

解决方案

sql
-- 产品主表
CREATE TABLE products (
    product_id BIGINT PRIMARY KEY,
    name VARCHAR(200) NOT NULL,
    description TEXT,
    price DECIMAL(10,2) NOT NULL,
    category_id INT NOT NULL,
    -- 敏感信息,不同步到用户服务
    cost_price DECIMAL(10,2),
    supplier_id BIGINT,
    stock_quantity INT,
    internal_code VARCHAR(50)
);

-- 为用户服务创建发布
CREATE PUBLICATION user_service_pub
FOR TABLE products (product_id, name, description, price, category_id);

语法详解

基本语法

sql
CREATE PUBLICATION publication_name
FOR TABLE table_name (column1, column2, column3, ...);

完整示例

sql
-- 创建测试表
CREATE TABLE customer_data (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(150) UNIQUE NOT NULL,
    phone VARCHAR(20),
    address TEXT,
    -- 敏感信息
    credit_score INT,
    internal_notes TEXT,
    created_at TIMESTAMP DEFAULT NOW()
);

-- 插入测试数据
INSERT INTO customer_data (name, email, phone, address, credit_score, internal_notes) VALUES
('张三', '[email protected]', '13800138000', '北京市朝阳区', 750, '优质客户'),
('李四', '[email protected]', '13900139000', '上海市浦东区', 680, '正常客户'),
('王五', '[email protected]', '13700137000', '广州市天河区', 720, '潜在VIP');

-- 创建发布,只包含公开信息
CREATE PUBLICATION customer_basic_pub
FOR TABLE customer_data (id, name, email, phone, address);

输入数据

sql
SELECT * FROM customer_data ORDER BY id;

输出结果

 id | name | email                | phone       | address      | credit_score | internal_notes | created_at
----+------+----------------------+-------------+--------------+--------------+----------------+------------
  1 | 张三 | [email protected] | 13800138000 | 北京市朝阳区 |          750 | 优质客户       | 2024-01-15...
  2 | 李四 | [email protected]     | 13900139000 | 上海市浦东区 |          680 | 正常客户       | 2024-01-15...
  3 | 王五 | [email protected]   | 13700137000 | 广州市天河区 |          720 | 潜在VIP        | 2024-01-15...

订阅者接收到的数据

sql
-- 订阅者端只会收到指定的列
SELECT * FROM customer_data ORDER BY id;
 id | name | email                | phone       | address
----+------+----------------------+-------------+--------------
  1 | 张三 | [email protected] | 13800138000 | 北京市朝阳区
  2 | 李四 | [email protected]     | 13900139000 | 上海市浦东区
  3 | 王五 | [email protected]   | 13700137000 | 广州市天河区

重要限制和注意事项

1. 副本标识要求

如果发布包含 UPDATE 或 DELETE 操作,列列表必须包含表的副本标识列。

sql
-- 查看表的副本标识
SELECT schemaname, tablename, attname, attnum
FROM pg_publication_tables pt
JOIN pg_attribute pa ON pa.attrelid = pt.schemaname::regnamespace::oid
WHERE pt.pubname = 'your_publication'
AND pa.attname IN (
    SELECT a.attname
    FROM pg_index i
    JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
    WHERE i.indrelid = 'your_table'::regclass AND i.indisreplident
);

错误示例

sql
-- 错误:主键列未包含在列列表中
CREATE PUBLICATION bad_pub
FOR TABLE orders (customer_id, order_date, total_amount);  -- 缺少 order_id (主键)

正确示例

sql
-- 正确:包含主键列
CREATE PUBLICATION good_pub
FOR TABLE orders (order_id, customer_id, order_date, total_amount);

2. 模式发布限制

当使用 `FOR TABLES IN SCHEMA` 时,不支持指定列列表。

sql
-- 不支持的语法
CREATE PUBLICATION schema_pub
FOR TABLES IN SCHEMA public (col1, col2);  -- 错误!

-- 正确的方式:分别为每个表指定
CREATE PUBLICATION schema_pub
FOR TABLE table1 (col1, col2),
         table2 (col3, col4);

3. 多发布冲突

DANGER

当同一个表在多个发布中使用不同的列列表时,会导致订阅失败。

问题场景

sql
-- 发布1
CREATE PUBLICATION pub1 FOR TABLE users (id, name, email);

-- 发布2(与pub1冲突)
CREATE PUBLICATION pub2 FOR TABLE users (id, name, phone);

-- 创建包含两个发布的订阅会失败
CREATE SUBSCRIPTION multi_sub
CONNECTION '...'
PUBLICATION pub1, pub2;  -- 错误!

解决方案

sql
-- 方案1:统一列列表
ALTER PUBLICATION pub2 SET TABLE users (id, name, email);

-- 方案2:移除其中一个发布
ALTER SUBSCRIPTION multi_sub DROP PUBLICATION pub2;

分区表处理

publish_via_partition_root 参数

分区表的列列表行为由 publish_via_partition_root 参数控制:

示例配置

sql
-- 创建分区表
CREATE TABLE sales (
    id BIGINT,
    sale_date DATE NOT NULL,
    amount DECIMAL(10,2),
    region VARCHAR(50),
    details TEXT
) PARTITION BY RANGE (sale_date);

-- 创建分区
CREATE TABLE sales_2024_q1 PARTITION OF sales
FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');

CREATE TABLE sales_2024_q2 PARTITION OF sales
FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');

-- 方式1:通过根分区发布(推荐)
CREATE PUBLICATION sales_pub
FOR TABLE sales (id, sale_date, amount, region)
WITH (publish_via_partition_root = true);

-- 方式2:为每个分区单独配置
CREATE PUBLICATION sales_partition_pub
FOR TABLE sales_2024_q1 (id, sale_date, amount),
         sales_2024_q2 (id, sale_date, amount, region)
WITH (publish_via_partition_root = false);

性能优化

1. 网络传输优化

通过选择性复制可以显著减少网络传输量:

场景原始大小优化后大小节省比例
用户表(去除大文本字段)2KB/行500B/行75%
订单表(去除审计字段)1.5KB/行800B/行47%
产品表(去除内部字段)3KB/行1.2KB/行60%

2. 存储空间优化

分析过程

sql
-- 查看列列表发布的存储占用
SELECT
    schemaname,
    tablename,
    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as table_size,
    array_length(string_to_array(columns, ','), 1) as published_columns,
    (SELECT count(*) FROM information_schema.columns
     WHERE table_schema = pt.schemaname AND table_name = pt.tablename) as total_columns
FROM pg_publication_tables pt
WHERE pt.pubname = 'your_publication';

监控和维护

1. 查看发布配置

sql
-- 查看所有发布和其列列表
SELECT
    p.pubname,
    pt.schemaname,
    pt.tablename,
    pt.rowfilter,
    pt.columns
FROM pg_publication p
LEFT JOIN pg_publication_tables pt ON p.pubname = pt.pubname
ORDER BY p.pubname, pt.schemaname, pt.tablename;

2. 监控复制状态

sql
-- 监控复制延迟和状态
SELECT
    s.subname,
    s.subenabled,
    sr.pid,
    sr.received_lsn,
    sr.last_msg_send_time,
    sr.last_msg_receipt_time,
    sr.latest_end_lsn,
    sr.latest_end_time
FROM pg_subscription s
LEFT JOIN pg_stat_subscription sr ON s.oid = sr.subid;

3. 故障排查

常见问题和解决方案:

sql
-- 查找可能的列列表冲突
WITH pub_tables AS (
    SELECT pubname, schemaname, tablename, columns
    FROM pg_publication_tables
    WHERE columns IS NOT NULL
)
SELECT
    schemaname,
    tablename,
    array_agg(pubname) as publications,
    array_agg(columns) as column_lists
FROM pub_tables
GROUP BY schemaname, tablename
HAVING count(*) > 1;
sql
-- 验证副本标识列是否包含在列列表中
SELECT
    n.nspname as schema_name,
    c.relname as table_name,
    a.attname as replica_identity_column,
    pt.columns as published_columns,
    CASE
        WHEN pt.columns ~ a.attname THEN '✓ 包含'
        ELSE '✗ 缺失'
    END as status
FROM pg_class c
JOIN pg_namespace n ON c.relnamespace = n.oid
JOIN pg_index i ON c.oid = i.indrelid AND i.indisreplident
JOIN pg_attribute a ON a.attrelid = c.oid AND a.attnum = ANY(i.indkey)
LEFT JOIN pg_publication_tables pt ON pt.schemaname = n.nspname
    AND pt.tablename = c.relname
WHERE pt.columns IS NOT NULL;

最佳实践

1. 设计原则

> **最小权限原则**:只复制业务真正需要的列,避免过度暴露数据。

sql
-- 好的做法:明确业务需求
CREATE PUBLICATION customer_service_pub
FOR TABLE customers (customer_id, name, email, phone, address);

-- 避免的做法:复制所有列
CREATE PUBLICATION customer_all_pub FOR TABLE customers;  -- 不推荐

2. 命名规范

建议采用描述性的命名规范:

sql
-- 推荐的命名方式
CREATE PUBLICATION analytics_orders_basic_pub
FOR TABLE orders (order_id, customer_id, order_date, total_amount);

CREATE PUBLICATION crm_customers_contact_pub
FOR TABLE customers (customer_id, name, email, phone);

-- 避免的命名方式
CREATE PUBLICATION pub1 FOR TABLE orders (...);  -- 名称不明确
CREATE PUBLICATION orders_pub FOR TABLE orders (...);  -- 缺少用途说明

3. 版本兼容性

WARNING

在 PostgreSQL 15 之前的版本中,初始数据同步会忽略列列表,复制所有列。升级时需要注意这一行为变化。

sql
-- 检查 PostgreSQL 版本
SELECT version();

-- 如果需要兼容旧版本,可以在订阅者端处理
CREATE VIEW customer_basic_view AS
SELECT customer_id, name, email, phone
FROM customers;

4. 安全考虑

不要将列列表作为安全边界。恶意订阅者可能通过其他方式获取未发布的列数据。

安全最佳实践

sql
-- 在发布者端应用安全策略
CREATE POLICY customer_data_policy ON customers
FOR SELECT TO replication_role
USING (is_public_data = true);

-- 结合列列表使用
CREATE PUBLICATION secure_customer_pub
FOR TABLE customers (customer_id, name, email)
WHERE (is_public_data = true);

总结

PostgreSQL 逻辑复制的列列表功能为数据复制提供了精细化的控制能力。通过合理使用此功能,可以:

  • 提升性能:减少网络传输和存储开销
  • 增强安全:限制敏感数据的传播范围
  • 简化维护:减少订阅者端的数据处理复杂度

在实际应用中,需要仔细设计列列表配置,确保包含必要的副本标识列,避免多发布冲突,并结合业务需求制定合适的数据同步策略。