
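PostgreSQL Merge Support Functions

Overview

PostgreSQL (version 17 and later) provides a dedicated merge support function that reports, for each row processed by a MERGE command, which action was performed on it. This is useful for data synchronization, auditing, and debugging.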

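Core Function Reference

The merge_action() function

merge_action() is the only merge support function in PostgreSQL. It identifies the action that a MERGE command performed on each row.

Function signature: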

sql
merge_action() → text

Return values:

  • 'INSERT' - the row was inserted
  • 'UPDATE' - the row was updated
  • 'DELETE' - the row was deleted

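Use Cases and Restrictions

WARNING

Important restriction: merge_action() can only be used in the RETURNING clause of a MERGE command. Using it in any other part of a query raises an error.

Use cases

  1. Sync tracking - monitor which kind of action each row received during a data sync
  2. Audit logging - record the specific action behind each data change
  3. Performance analysis - count how many times each action was executed
  4. Debugging - verify that the MERGE logic behaves as expected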

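Worked Examples

Example 1: Product inventory sync

An inventory sync in an e-commerce system shows how merge_action() is used in practice.

Problem statement: The data in the inventory table (stock) must be synced into the product table (products), and the action applied to each product must be tracked.

Data preparation: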

sql
-- Product table
CREATE TABLE products (
    product_id INT PRIMARY KEY,
    in_stock BOOLEAN DEFAULT false,
    quantity INT DEFAULT 0
);

-- Inventory table
CREATE TABLE stock (
    product_id INT PRIMARY KEY,
    quantity INT
);

sql
-- Initial rows in the product table
INSERT INTO products VALUES
(1001, true, 30),   -- existing product, in stock
(1002, true, 5);    -- existing product, low stock

-- New rows in the inventory table
INSERT INTO stock VALUES
(1001, 50),  -- updates the stock of an existing product
(1002, 0),   -- sets the stock of an existing product to zero
(1003, 10);  -- new product

Solution:

sql
MERGE INTO products p
USING stock s ON p.product_id = s.product_id
WHEN MATCHED AND s.quantity > 0 THEN
    UPDATE SET in_stock = true, quantity = s.quantity
WHEN MATCHED THEN
    UPDATE SET in_stock = false, quantity = 0
WHEN NOT MATCHED THEN
    INSERT (product_id, in_stock, quantity)
    VALUES (s.product_id, true, s.quantity)
RETURNING merge_action(), p.*;

Output:

 merge_action | product_id | in_stock | quantity
--------------+------------+----------+----------
 UPDATE       |       1001 | t        |       50
 UPDATE       |       1002 | f        |        0
 INSERT       |       1003 | t        |       10

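Analysis:

  1. Product 1001: UPDATE. Because s.quantity > 0 (50 > 0), the first WHEN MATCHED branch applies and sets in_stock = true, quantity = 50
  2. Product 1002: UPDATE. Because s.quantity = 0, the first branch does not apply and the second WHEN MATCHED branch sets in_stock = false, quantity = 0
  3. Product 1003: INSERT. The product does not exist in the product table, so a new row is inserted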
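
To double-check the analysis, the final state of products can be inspected with a plain query. This is a minimal sketch; it assumes the tables and rows from the data preparation step and that the MERGE above has run exactly once.

```sql
-- Inspect the target table after the MERGE in Example 1 has run once.
SELECT product_id, in_stock, quantity
FROM products
ORDER BY product_id;

-- Expected rows under those assumptions:
--   1001 | t | 50
--   1002 | f |  0
--   1003 | t | 10
```

Running the MERGE a second time against the same stock rows would report three UPDATE actions, since product 1003 already exists by then.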

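Example 2: User sync with statistics

Problem statement: User records are synced from an external system, and the number of times each action was executed must be counted.

Data preparation: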

sql
-- User table
CREATE TABLE users (
    user_id INT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    last_login TIMESTAMP,
    status VARCHAR(10) DEFAULT 'active'
);

-- External user data
CREATE TABLE external_users (
    user_id INT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    last_login TIMESTAMP
);
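
The original example does not include sample rows. The following sketch fills both tables with hypothetical data so the sync can be tried end to end; the names, e-mail addresses, and timestamps are illustrative assumptions only.

```sql
-- Hypothetical sample rows; the values are illustrative only.
INSERT INTO users (user_id, username, email, last_login) VALUES
    (1, 'alice', 'alice@example.com', '2024-01-10 09:00:00'),
    (2, 'bob',   'bob@example.com',   '2024-01-11 10:30:00');

INSERT INTO external_users (user_id, username, email, last_login) VALUES
    (2, 'bob',   'bob.new@example.com', '2024-02-01 08:15:00'),  -- matches user 2
    (3, 'carol', 'carol@example.com',   '2024-02-02 12:00:00');  -- not yet in users
```

With these rows, a MERGE joined on user_id would update user 2 and insert user 3. User 1 is left untouched because it has no matching source row.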

Solution:

sql
-- Run the sync and collect per-action statistics
WITH merge_results AS (
    MERGE INTO users u
    USING external_users eu ON u.user_id = eu.user_id
    WHEN MATCHED THEN
        UPDATE SET
            username = eu.username,
            email = eu.email,
            last_login = eu.last_login
    WHEN NOT MATCHED THEN
        INSERT (user_id, username, email, last_login)
        VALUES (eu.user_id, eu.username, eu.email, eu.last_login)
    -- Alias the result: merge_action() itself cannot be called outside RETURNING
    RETURNING merge_action() AS operation
)
SELECT
    operation,
    COUNT(*) AS count
FROM merge_results
GROUP BY operation
ORDER BY operation;

Example 3: Full sync with deletes

Problem statement: Implement a full data sync that also deletes rows that no longer exist in the source table.

sql
-- Full sync example (including DELETE)
MERGE INTO products p
USING (
    SELECT product_id, quantity FROM stock
    UNION ALL
    -- Products missing from stock are passed in with a NULL quantity
    SELECT product_id, NULL AS quantity
    FROM products
    WHERE product_id NOT IN (SELECT product_id FROM stock)
) s ON p.product_id = s.product_id
WHEN MATCHED AND s.quantity IS NOT NULL THEN
    UPDATE SET quantity = s.quantity, in_stock = (s.quantity > 0)
WHEN MATCHED AND s.quantity IS NULL THEN
    DELETE
WHEN NOT MATCHED THEN
    INSERT (product_id, quantity, in_stock)
    VALUES (s.product_id, s.quantity, s.quantity > 0)
RETURNING
    merge_action(),
    -- p.product_id is available for every action; for DELETE it is the deleted row's value
    CASE
        WHEN merge_action() = 'DELETE' THEN 'Removed: ' || p.product_id
        ELSE 'Processed: ' || p.product_id
    END AS description;
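
PostgreSQL 17 also supports a WHEN NOT MATCHED BY SOURCE clause, which can express the same full sync without the UNION ALL subquery. The statement below is a sketch of that alternative rather than a drop-in replacement. It assumes stock.quantity is never NULL, because here a matched row is always updated instead of being deleted when its quantity is NULL.

```sql
-- Alternative full sync using WHEN NOT MATCHED BY SOURCE (PostgreSQL 17+).
MERGE INTO products p
USING stock s ON p.product_id = s.product_id
WHEN MATCHED THEN
    -- Present in both tables: refresh quantity and stock flag
    UPDATE SET quantity = s.quantity, in_stock = (s.quantity > 0)
WHEN NOT MATCHED BY SOURCE THEN
    -- Present in products only: remove it
    DELETE
WHEN NOT MATCHED BY TARGET THEN
    -- Present in stock only: add it
    INSERT (product_id, quantity, in_stock)
    VALUES (s.product_id, s.quantity, s.quantity > 0)
RETURNING merge_action() AS action, p.product_id;
```

For deleted rows, the source columns (s.*) are NULL in the RETURNING output, so only target columns carry useful information there.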

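Performance Tips

1. Reduce the amount of data in the RETURNING clause

sql
-- Return only what is needed (qualify the column: both tables have product_id)
RETURNING merge_action(), p.product_id;

-- rather than every source and target column
RETURNING merge_action(), *;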

2. Use a CTE for follow-up processing

sql
WITH merge_operations AS (
    MERGE INTO products p
    USING stock s ON p.product_id = s.product_id
    -- ... MERGE logic ...
    -- Alias merge_action() so the outer query can refer to it as a column
    RETURNING merge_action() AS action, p.product_id, p.quantity
)
-- Process the results differently depending on the action
SELECT
    action,
    COUNT(*) AS operation_count,
    SUM(quantity) AS total_quantity
FROM merge_operations
GROUP BY action;

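Error Handling and Debugging

Common errors

DANGER

Incorrect usage

sql
-- ❌ Wrong: cannot be used in a WHERE clause
SELECT * FROM products
WHERE merge_action() = 'INSERT';

-- ❌ Wrong: cannot be used in an ordinary SELECT
SELECT merge_action() FROM products;

TIP

Correct usage

sql
-- ✅ Correct: used only in the RETURNING clause of a MERGE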
MERGE INTO products p
USING stock s ON p.product_id = s.product_id
-- ... MERGE 逻辑 ...
RETURNING merge_action(), p.product_id;
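
When the action is needed outside the MERGE itself, for example to filter on it, one option is to alias it in RETURNING and read the alias from a CTE. The sketch below assumes the products and stock tables from Example 1; the CTE name merged and the alias action are arbitrary choices.

```sql
-- List only the products that the MERGE inserted.
WITH merged AS (
    MERGE INTO products p
    USING stock s ON p.product_id = s.product_id
    WHEN MATCHED THEN
        UPDATE SET quantity = s.quantity, in_stock = (s.quantity > 0)
    WHEN NOT MATCHED THEN
        INSERT (product_id, in_stock, quantity)
        VALUES (s.product_id, s.quantity > 0, s.quantity)
    -- merge_action() is evaluated here, inside RETURNING
    RETURNING merge_action() AS action, p.product_id
)
SELECT product_id
FROM merged
WHERE action = 'INSERT';  -- plain column reference, not a function call
```

The outer query never calls merge_action(); it only sees an ordinary text column, which is why filtering and grouping work there.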

Debugging tips

sql
-- Use a detailed RETURNING clause while debugging
MERGE INTO products p
USING stock s ON p.product_id = s.product_id
WHEN MATCHED AND s.quantity > 0 THEN
    UPDATE SET in_stock = true, quantity = s.quantity
WHEN MATCHED THEN
    UPDATE SET in_stock = false, quantity = 0
WHEN NOT MATCHED THEN
    INSERT (product_id, in_stock, quantity)
    VALUES (s.product_id, true, s.quantity)
RETURNING
    merge_action() AS action,
    p.product_id,
    s.quantity AS source_quantity,
    p.quantity AS target_quantity,
    CASE
        WHEN merge_action() = 'UPDATE' THEN 'Modified existing'
        WHEN merge_action() = 'INSERT' THEN 'Added new'
        WHEN merge_action() = 'DELETE' THEN 'Removed'
    END AS description;

Best Practices

1. Run the sync inside a transaction

sql
BEGIN;

-- Record the start of the sync.
-- clock_timestamp() is used because NOW() returns the transaction start time,
-- which would make start_time and end_time identical.
INSERT INTO sync_log (operation, start_time)
VALUES ('product_sync', clock_timestamp());

-- Run the sync
WITH sync_results AS (
    MERGE INTO products p
    USING stock s ON p.product_id = s.product_id
    -- ... MERGE logic ...
    RETURNING merge_action() AS action, p.product_id
)
-- Update the sync log
UPDATE sync_log
SET
    end_time = clock_timestamp(),
    records_processed = (SELECT COUNT(*) FROM sync_results),
    records_inserted = (SELECT COUNT(*) FROM sync_results WHERE action = 'INSERT'),
    records_updated = (SELECT COUNT(*) FROM sync_results WHERE action = 'UPDATE')
WHERE operation = 'product_sync'
  AND end_time IS NULL;

COMMIT;
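
The transaction above writes to a sync_log table that the original text does not define. One possible shape for it is sketched below; the column names are taken from the statements above, while the column types and the id column are assumptions.

```sql
-- Hypothetical definition of the sync_log table used in the transaction example.
CREATE TABLE sync_log (
    id                BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    operation         TEXT NOT NULL,         -- e.g. 'product_sync'
    start_time        TIMESTAMPTZ NOT NULL,  -- set when the sync begins
    end_time          TIMESTAMPTZ,           -- NULL while the sync is still running
    records_processed BIGINT,
    records_inserted  BIGINT,
    records_updated   BIGINT
);
```

Leaving end_time nullable is what lets the UPDATE find the open log row through its end_time IS NULL condition.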

2. Data quality monitoring

sql
-- Watch for unusual sync patterns
WITH sync_analysis AS (
    MERGE INTO products p
    USING stock s ON p.product_id = s.product_id
    -- ... MERGE logic ...
    RETURNING merge_action() AS action, p.product_id, p.quantity
)
SELECT
    CASE
        WHEN insert_ratio > 0.5 THEN 'High insert ratio detected'
        WHEN update_ratio > 0.8 THEN 'Mostly updates - consider optimization'
        ELSE 'Normal sync pattern'
    END AS analysis
FROM (
    SELECT
        -- NULLIF avoids a division-by-zero error when the MERGE touched no rows
        COUNT(CASE WHEN action = 'INSERT' THEN 1 END)::FLOAT / NULLIF(COUNT(*), 0) AS insert_ratio,
        COUNT(CASE WHEN action = 'UPDATE' THEN 1 END)::FLOAT / NULLIF(COUNT(*), 0) AS update_ratio
    FROM sync_analysis
) stats;

Advanced Usage

[Figure: flowchart of the MERGE execution flow]

Combining with audit logging

sql
-- Create the audit table
CREATE TABLE merge_audit (
    id SERIAL PRIMARY KEY,
    table_name VARCHAR(50),
    operation VARCHAR(10),
    record_id INT,
    timestamp TIMESTAMP DEFAULT NOW()
);

-- Record each MERGE action in the audit table
WITH merge_ops AS (
    MERGE INTO products p
    USING stock s ON p.product_id = s.product_id
    -- ... MERGE logic ...
    RETURNING merge_action() AS action, p.product_id
)
INSERT INTO merge_audit (table_name, operation, record_id)
SELECT 'products', action, product_id
FROM merge_ops;

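Used this way, merge_action() makes data synchronization easier to observe and maintain: each MERGE reports exactly what it did to every row, and that output can feed statistics, logs, and audit tables.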