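A Complete Guide to Monitoring PostgreSQL Logical Replication

Overview

Monitoring is essential to keeping PostgreSQL logical replication stable and performant. Because logical replication is built on an architecture similar to physical streaming replication, its monitoring shares many techniques with physical replication while also having characteristics of its own. This guide describes how to monitor a logical replication setup effectively on both the publisher and the subscriber.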

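Logical Replication Monitoring Architecture

Core Monitoring Views

The pg_stat_subscription View

pg_stat_subscription is the main view for monitoring a subscriber node. It shows one row per subscription worker; a subscription whose worker is not running still appears, with a NULL pid.

INFO

A subscription can have zero or more active workers, depending on its state, on whether tables are still being synchronized, and on the parallel apply configuration.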

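View Columns

 Column                | Type        | Description                                                          | What it tells you
-----------------------|-------------|----------------------------------------------------------------------|--------------------------------------
 subid                 | oid         | OID of the subscription                                              | Uniquely identifies the subscription
 subname               | name        | Name of the subscription                                             | Human-readable identifier
 pid                   | integer     | Process ID of the worker                                             | Handle for process-level monitoring
 relid                 | oid         | OID of the table being synchronized (NULL for the main apply worker) | Which table is being synchronized
 received_lsn          | pg_lsn      | Last WAL location received                                           | Receive progress
 last_msg_send_time    | timestamptz | Send time of the last message received from the publisher            | Connection liveness
 last_msg_receipt_time | timestamptz | Receipt time of the last message received from the publisher         | Network latency
 latest_end_lsn        | pg_lsn      | Last WAL location reported back to the publisher                     | Apply progress
 latest_end_time       | timestamptz | Time of the last WAL location reported back to the publisher         | Apply timeliness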

Basic Monitoring Queries

1. Viewing the Status of All Subscriptions

sql
-- Basic status of every subscription
SELECT
    subname AS subscription,
    pid,
    CASE
        WHEN pid IS NULL THEN 'not running'
        ELSE 'running'
    END AS status,
    received_lsn,
    latest_end_lsn,
    last_msg_receipt_time
FROM pg_stat_subscription
ORDER BY subname;

Example output:

 subscription | pid   | status  | received_lsn | latest_end_lsn | last_msg_receipt_time
--------------|-------|---------|--------------|----------------|-----------------------
 product_sync | 15234 | running | 0/3A2B4C8D   | 0/3A2B4C8D     | 2024-01-15 14:30:25
 order_sync   | 15567 | running | 0/3A2B5F12   | 0/3A2B5F10     | 2024-01-15 14:30:23

2. Monitoring Replication Lag

sql
-- Estimate replication lag
SELECT
    subname AS subscription,
    -- seconds since the last message arrived from the publisher
    EXTRACT(EPOCH FROM (now() - last_msg_receipt_time)) AS delay_seconds,
    -- WAL received but not yet reported back to the publisher
    pg_size_pretty(
        pg_wal_lsn_diff(received_lsn, latest_end_lsn)
    ) AS pending_size,
    CASE
        WHEN EXTRACT(EPOCH FROM (now() - last_msg_receipt_time)) > 60
        THEN '⚠️ lag warning'
        ELSE '✅ OK'
    END AS assessment
FROM pg_stat_subscription
WHERE pid IS NOT NULL;

Example output:

 subscription | delay_seconds | pending_size | assessment
--------------|---------------|--------------|----------------
 product_sync | 2.5           | 0 bytes      | ✅ OK
 order_sync   | 65.2          | 2048 bytes   | ⚠️ lag warning
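
The pending-data figure comes from pg_wal_lsn_diff, which subtracts two LSNs. An LSN is printed as two hexadecimal numbers separated by a slash: the high and the low 32 bits of a 64-bit WAL position. When LSNs have to be compared outside the database, for example in the output collected by a monitoring script, the same arithmetic can be sketched in bash. The function names lsn_to_bytes and lsn_diff are illustrative helpers, not part of PostgreSQL, and the sketch assumes positions that fit in a signed 64-bit integer.

```bash
#!/bin/bash
# Convert a textual LSN such as 0/3A2B5F12 into an absolute byte position.
# (hypothetical helper, not a PostgreSQL tool)
lsn_to_bytes() {
    local lsn=$1
    local hi=${lsn%%/*}   # hexadecimal digits before the slash (high 32 bits)
    local lo=${lsn##*/}   # hexadecimal digits after the slash (low 32 bits)
    echo $(( (16#$hi << 32) + 16#$lo ))
}

# Byte distance between two LSNs (first minus second), using the same
# argument order as pg_wal_lsn_diff(lsn1, lsn2).
lsn_diff() {
    echo $(( $(lsn_to_bytes "$1") - $(lsn_to_bytes "$2") ))
}

# The order_sync row from the first example output:
lsn_diff 0/3A2B5F12 0/3A2B5F10   # prints 2
```

Inside the database, pg_wal_lsn_diff remains the simpler choice; the helper is only useful when the LSN strings are already in a script.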

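Monitoring Workers in Different Scenarios

1. Normal Operation

Under normal conditions, each enabled subscription has one apply worker:

sql
-- Show the workers of running subscriptions
SELECT
    s.subname,
    s.pid,
    a.application_name,
    a.backend_type,
    a.state,
    a.wait_event
FROM pg_stat_subscription s
JOIN pg_stat_activity a ON s.pid = a.pid
WHERE s.pid IS NOT NULL;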

2. Initial Data Synchronization

While a table is going through its initial data copy, additional table synchronization workers are running:

sql
-- Track initial synchronization progress per table
SELECT
    s.subname AS subscription,
    sr.srrelid::regclass AS table_name,
    sr.srsubstate AS sync_state,
    CASE sr.srsubstate
        WHEN 'i' THEN 'initializing'
        WHEN 'd' THEN 'copying data'
        WHEN 'f' THEN 'table copy finished'
        WHEN 's' THEN 'synchronized'
        WHEN 'r' THEN 'ready (normal replication)'
    END AS description
FROM pg_subscription s
JOIN pg_subscription_rel sr ON s.oid = sr.srsubid
ORDER BY s.subname, sr.srrelid::regclass::text;

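3. Parallel Apply

When parallel apply of streamed transactions is enabled (streaming = parallel, available since PostgreSQL 16), a subscription may have several workers at once:

sql
-- Subscriptions with more than one worker; the count also includes
-- table synchronization workers (rows where relid IS NOT NULL)
SELECT
    subname,
    COUNT(*) AS worker_count,
    string_agg(pid::text, ', ') AS worker_pids
FROM pg_stat_subscription
WHERE pid IS NOT NULL
GROUP BY subname
HAVING COUNT(*) > 1;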

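Publisher-Side Monitoring

The pg_stat_replication View

Monitoring the publisher works much like monitoring the primary in physical replication:

sql
-- Replication status as seen from the publisher.
-- application_name defaults to the subscription name, so logical
-- connections are identified through their replication slot.
SELECT
    r.application_name AS subscriber,
    r.client_addr,
    r.state,
    r.sent_lsn,
    r.write_lsn,
    r.flush_lsn,
    r.replay_lsn,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), r.replay_lsn)) AS lag_size
FROM pg_stat_replication r
JOIN pg_replication_slots sl ON sl.active_pid = r.pid
WHERE sl.slot_type = 'logical';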

Replication Slot Monitoring

sql
-- Status of logical replication slots
SELECT
    slot_name,
    plugin,
    slot_type,
    database,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal,
    restart_lsn
FROM pg_replication_slots
WHERE slot_type = 'logical';

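Alerting

1. Lag Monitoring Script

bash
#!/bin/bash
# logical_replication_monitor.sh

# Configuration
DELAY_THRESHOLD=300            # alert after 5 minutes without a message
LAG_THRESHOLD_BYTES=104857600  # alert above 100 MB of pending data

# Check subscription lag. -X skips ~/.psqlrc, -At prints unaligned rows
# without headers, and -F sets a tab separator so read can split the columns.
psql -X -At -F $'\t' -d target_db -c "
SELECT
    subname,
    COALESCE(round(EXTRACT(EPOCH FROM (now() - last_msg_receipt_time))), 0)::bigint AS delay_seconds,
    COALESCE(pg_wal_lsn_diff(received_lsn, latest_end_lsn), 0)::bigint AS lag_bytes
FROM pg_stat_subscription
WHERE pid IS NOT NULL
" | while IFS=$'\t' read -r subname delay_seconds lag_bytes; do

    if (( delay_seconds > DELAY_THRESHOLD )); then
        echo "ALERT: subscription $subname is delayed by ${delay_seconds}s, above the threshold of ${DELAY_THRESHOLD}s"
    fi

    if (( lag_bytes > LAG_THRESHOLD_BYTES )); then
        lag_mb=$(echo "scale=2; $lag_bytes / 1024 / 1024" | bc)
        echo "ALERT: subscription $subname has ${lag_mb}MB of pending data, above the threshold"
    fi
done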
sql
-- Create an alert view
CREATE OR REPLACE VIEW v_subscription_alerts AS
SELECT
    subname,
    CASE
        WHEN pid IS NULL THEN 'CRITICAL: subscription worker not running'
        WHEN EXTRACT(EPOCH FROM (now() - last_msg_receipt_time)) > 300
        THEN 'WARNING: lag exceeds 5 minutes'
        WHEN pg_wal_lsn_diff(received_lsn, latest_end_lsn) > 104857600
        THEN 'WARNING: data lag exceeds 100 MB'
        ELSE 'OK'
    END AS alert_level,
    last_msg_receipt_time,
    pg_size_pretty(pg_wal_lsn_diff(received_lsn, latest_end_lsn)) AS lag_size
FROM pg_stat_subscription;

-- Query active alerts
SELECT * FROM v_subscription_alerts WHERE alert_level != 'OK';
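
The decision order used by the view (worker missing, then time lag, then data lag) can also be mirrored in a small shell function, which makes the thresholds easy to test without a database. This is a minimal sketch: classify_subscription and the labels it prints are hypothetical names, the defaults repeat the 5 minute and 100 MB thresholds of the view, and the delay and lag arguments are assumed to be whole numbers.

```bash
#!/bin/bash
# Thresholds; both can be overridden from the environment.
DELAY_THRESHOLD=${DELAY_THRESHOLD:-300}                 # seconds
LAG_THRESHOLD_BYTES=${LAG_THRESHOLD_BYTES:-104857600}   # 100 MB

# classify_subscription PID DELAY_SECONDS LAG_BYTES
# Prints one label, checking the conditions in the same order as the
# CASE expression of the alert view. An empty PID stands for NULL.
classify_subscription() {
    local pid=$1 delay=${2:-0} lag=${3:-0}
    if [ -z "$pid" ]; then
        echo "CRITICAL"
    elif (( delay > DELAY_THRESHOLD )); then
        echo "WARNING_DELAY"
    elif (( lag > LAG_THRESHOLD_BYTES )); then
        echo "WARNING_LAG"
    else
        echo "OK"
    fi
}

classify_subscription 15567 65 2048   # prints OK with the default thresholds
```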

2. Prometheus Metrics

sql
-- Query that emits metrics in the Prometheus text format
SELECT
    'postgresql_subscription_lag_seconds{subscription="' || subname || '"} ' ||
    COALESCE(EXTRACT(EPOCH FROM (now() - last_msg_receipt_time)), 0) AS metric
FROM pg_stat_subscription
UNION ALL
SELECT
    'postgresql_subscription_lag_bytes{subscription="' || subname || '"} ' ||
    COALESCE(pg_wal_lsn_diff(received_lsn, latest_end_lsn), 0) AS metric
FROM pg_stat_subscription;
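
The query above concatenates the subscription name into the label without escaping. In the Prometheus text exposition format, backslashes, double quotes and line feeds inside a label value must be escaped, which matters if a subscription was created with an unusual quoted name. The following is a hedged sketch of that escaping in bash, for the case where the metric line is assembled by a script rather than in SQL; prom_escape and prom_metric are hypothetical helper names.

```bash
#!/bin/bash
# prom_escape VALUE
# Escape backslash, double quote and newline in a label value.
prom_escape() {
    local in=$1 out="" c i
    for (( i = 0; i < ${#in}; i++ )); do
        c=${in:i:1}
        case "$c" in
            \\)    out+='\\' ;;   # backslash becomes two backslashes
            \")    out+='\"' ;;   # double quote becomes backslash + quote
            $'\n') out+='\n' ;;   # newline becomes backslash + n
            *)     out+=$c ;;
        esac
    done
    printf '%s' "$out"
}

# prom_metric NAME SUBSCRIPTION VALUE
# Print one sample line with a single "subscription" label.
prom_metric() {
    printf '%s{subscription="%s"} %s\n' "$1" "$(prom_escape "$2")" "$3"
}

prom_metric postgresql_subscription_lag_bytes order_sync 2048
# prints: postgresql_subscription_lag_bytes{subscription="order_sync"} 2048
```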

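Performance Monitoring

1. Network Performance

sql
-- Network transfer as seen from the publisher
SELECT
    r.application_name,
    r.client_addr,
    -- WAL sent to the subscriber but not yet confirmed as written
    pg_size_pretty(
        pg_wal_lsn_diff(r.sent_lsn, r.write_lsn)
    ) AS in_flight_size,
    -- reply_time is the send time of the subscriber's last reply message
    EXTRACT(EPOCH FROM (now() - r.reply_time)) AS seconds_since_last_reply
FROM pg_stat_replication r
JOIN pg_replication_slots sl ON sl.active_pid = r.pid
WHERE sl.slot_type = 'logical';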

2. Apply Performance

sql
-- Activity of the subscription workers
SELECT
    s.subname,
    a.query_start,
    a.state_change,
    a.query AS current_query,
    EXTRACT(EPOCH FROM (now() - a.query_start)) AS duration_seconds
FROM pg_stat_subscription s
JOIN pg_stat_activity a ON s.pid = a.pid
WHERE s.pid IS NOT NULL
ORDER BY duration_seconds DESC NULLS LAST;

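Troubleshooting Guide

Diagnosing Common Problems

1. Subscription Worker Not Running

When the `pid` of a subscription in `pg_stat_subscription` is NULL, its worker is not running.

Diagnostic steps:

sql
-- 1. Check whether the subscription is enabled
SELECT subname, subenabled, subconninfo
FROM pg_subscription
WHERE subname = 'your_subscription_name';

-- 2. Check the error counters (PostgreSQL 15 and later);
--    the error messages themselves are written to the server log
SELECT subname, apply_error_count, sync_error_count
FROM pg_stat_subscription_stats
WHERE subname = 'your_subscription_name';

-- 3. Enable the subscription manually
ALTER SUBSCRIPTION your_subscription_name ENABLE;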

2. High Replication Lag

Diagnostic queries:

sql
-- Look for long-running transactions in the replication workers
SELECT
    pid,
    now() - xact_start AS duration,
    query
FROM pg_stat_activity
WHERE (now() - xact_start) > interval '5 minutes'
AND backend_type LIKE 'logical replication%';

-- Check for lock waits
SELECT
    blocked_locks.pid AS blocked_pid,
    blocked_activity.usename AS blocked_user,
    blocking_locks.pid AS blocking_pid,
    blocking_activity.usename AS blocking_user,
    blocked_activity.query AS blocked_statement,
    blocking_activity.query AS current_statement_in_blocking_process
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity
    ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks
    ON blocking_locks.locktype = blocked_locks.locktype
    AND blocking_locks.DATABASE IS NOT DISTINCT FROM blocked_locks.DATABASE
    AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
    AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
    AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
    AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
    AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
    AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
    AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
    AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
    AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity
    ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;

3. Initial Synchronization Stuck

sql
-- Tables that have not finished their initial synchronization
SELECT
    s.subname,
    sr.srrelid::regclass AS table_name,
    sr.srsubstate,
    CASE sr.srsubstate
        WHEN 'i' THEN 'initializing'
        WHEN 'd' THEN 'copying data'
        WHEN 's' THEN 'synchronized'
        WHEN 'r' THEN 'ready'
    END AS sync_status
FROM pg_subscription s
JOIN pg_subscription_rel sr ON s.oid = sr.srsubid
WHERE sr.srsubstate IN ('i', 'd')
ORDER BY s.subname;

-- Activity of the table synchronization workers
-- (rows in pg_stat_subscription with a non-NULL relid)
SELECT
    a.pid,
    s.relid::regclass AS table_name,
    a.state,
    a.xact_start,
    now() - a.xact_start AS duration
FROM pg_stat_subscription s
JOIN pg_stat_activity a ON a.pid = s.pid
WHERE s.relid IS NOT NULL
ORDER BY duration DESC NULLS LAST;

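Best Practices

1. Monitoring Frequency

TIP

  • Real-time checks: check subscription status every 30 seconds
  • Lag alerts: send an alert when lag exceeds 5 minutes
  • Performance analysis: record performance metrics every hour
  • Daily report: summarize replication activity once a day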

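2. Alert Thresholds

 Metric        | Threshold      | Severity | Recommended action
---------------|----------------|----------|--------------------------------------
 Worker status | Worker stopped | Critical | Restart the subscription immediately
 Time lag      | > 5 minutes    | Warning  | Check network and performance
 Data lag      | > 100 MB       | Warning  | Check apply performance
 WAL retention | > 1 GB         | Warning  | Check the state of the subscriber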
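
Thresholds such as 100 MB and 1 GB are easier to read in a configuration file than raw byte counts, but comparisons against pg_wal_lsn_diff need bytes. Inside the database, pg_size_bytes performs this conversion. The sketch below does the same for a script, under the assumption that only whole numbers with a B, KB, MB or GB suffix (powers of 1024, no space before the unit) are used; size_to_bytes is a hypothetical helper name.

```bash
#!/bin/bash
# size_to_bytes SIZE
# Convert a threshold such as 100MB or 1GB into a byte count.
size_to_bytes() {
    local size=$1
    local num=${size%%[!0-9]*}   # leading digits
    local unit=${size#"$num"}    # whatever follows the digits
    if [ -z "$num" ]; then
        echo "not a size: $size" >&2
        return 1
    fi
    # 10# forces base 10 so that a leading zero is not read as octal.
    case "$unit" in
        ""|[bB])   echo $(( 10#$num )) ;;
        [kK][bB])  echo $(( 10#$num * 1024 )) ;;
        [mM][bB])  echo $(( 10#$num * 1024 * 1024 )) ;;
        [gG][bB])  echo $(( 10#$num * 1024 * 1024 * 1024 )) ;;
        *)         echo "unknown unit: $unit" >&2; return 1 ;;
    esac
}

size_to_bytes 100MB   # prints 104857600
size_to_bytes 1GB     # prints 1073741824
```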

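3. Automating the Monitoring

bash
# Scheduled jobs (crontab)
# Check subscription status every minute
* * * * * /opt/scripts/check_subscription_status.sh

# Check replication lag every 5 minutes
*/5 * * * * /opt/scripts/check_replication_lag.sh

# Generate a performance report every hour
0 * * * * /opt/scripts/generate_performance_report.sh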
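
Jobs scheduled every minute can overlap when a check takes longer than its interval, for instance while the database is slow to respond. One common way to guard against that is a lock taken at the start of the script. The sketch below uses mkdir as the lock, relying on the fact that creating a directory either succeeds or fails atomically; run_exclusive and the lock path in the example are hypothetical, and a lock directory left behind by a killed process would have to be removed by hand.

```bash
#!/bin/bash
# run_exclusive LOCKDIR COMMAND [ARGS...]
# Runs COMMAND only if LOCKDIR can be created, so two concurrent
# invocations cannot both proceed. Returns COMMAND's exit status,
# or 0 when the run is skipped because the lock is already held.
run_exclusive() {
    local lockdir=$1
    shift
    if ! mkdir "$lockdir" 2>/dev/null; then
        echo "previous run still active, skipping" >&2
        return 0
    fi
    "$@"
    local rc=$?
    rmdir "$lockdir"
    return "$rc"
}

# Example: wrap a check so that overlapping cron runs are skipped.
# run_exclusive /tmp/check_replication_lag.lock /opt/scripts/check_replication_lag.sh
```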

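Summary

Monitoring PostgreSQL logical replication means watching metrics on both the publisher and the subscriber. Combining system views such as pg_stat_subscription and pg_stat_replication with automated monitoring scripts makes it possible to detect and resolve replication problems early and to keep data synchronization stable and reliable.

IMPORTANT

Monitoring is more than watching numbers. What matters most is a complete alerting setup and an incident response process, so that problems can be located and resolved quickly when they occur.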