Skip to content

PostgreSQL 并行查询安全性完全指南

概述

PostgreSQL 的并行查询功能可以显著提升查询性能,但并非所有操作都适合并行执行。理解并行查询的安全性分类对于编写高效且正确的 SQL 查询至关重要。本文将深入探讨 PostgreSQL 中的并行安全性概念,帮助您掌握如何正确使用并行查询功能。

并行安全性分类

PostgreSQL 将查询中的操作分为三个安全级别:

1. 并行安全(Parallel Safe)

定义:可以在并行工作进程中安全执行的操作,与并行查询完全兼容。

特点

  • ✅ 可以在领导进程执行
  • ✅ 可以在工作进程执行
  • ✅ 不会产生副作用
  • ✅ 不依赖进程本地状态(在领导进程或工作进程中执行的结果一致)

2. 并行受限(Parallel Restricted)

定义:只能在领导进程中执行,但不妨碍查询使用并行计划的操作。

特点

  • ✅ 可以在领导进程执行
  • ❌ 不能在工作进程执行
  • ⚠️ 不会出现在 Gather 节点下方
  • ✅ 可以出现在包含 Gather 节点的计划中

3. 并行不安全(Parallel Unsafe)

定义:完全不能与并行查询一起使用的操作。

特点

  • ❌ 查询中存在任何并行不安全操作时,整个查询禁用并行
  • ❌ 在并行查询启用期间,即使在领导进程中也不能执行
  • ❌ 会导致整个查询计划退化为串行执行

内置的并行受限操作

PostgreSQL 中以下操作始终被标记为并行受限:

INFO

这些操作由系统自动识别,无需手动标记

1. CTE(公共表表达式)扫描

sql
-- Example: a CTE scan is parallel restricted.
-- AS MATERIALIZED forces the CTE to be evaluated as a separate CTE Scan node.
-- Since PostgreSQL 12, a side-effect-free CTE that is referenced only once is
-- otherwise inlined into the outer query, and no CTE Scan shows up in the plan.
WITH sales_summary AS MATERIALIZED (
    SELECT
        product_id,
        SUM(quantity) AS total_quantity,
        AVG(price) AS avg_price
    FROM sales
    GROUP BY product_id
)
SELECT
    product_id,
    total_quantity,
    avg_price
FROM sales_summary
WHERE total_quantity > 1000;

-- Inspect the execution plan: the filter is applied on top of the CTE Scan.
EXPLAIN (ANALYZE, BUFFERS)
WITH sales_summary AS MATERIALIZED (
    SELECT
        product_id,
        SUM(quantity) AS total_quantity
    FROM sales
    GROUP BY product_id
)
SELECT
    product_id,
    total_quantity
FROM sales_summary
WHERE total_quantity > 1000;

输出示例

CTE Scan on sales_summary  (cost=... rows=... width=...)
  Filter: (total_quantity > 1000)
  CTE sales_summary
    ->  HashAggregate  (cost=... rows=... width=...)
          Group Key: sales.product_id
          ->  Seq Scan on sales  (cost=... rows=... width=...)

2. 临时表扫描

sql
-- Build a temporary table holding every customer whose summed order
-- amount exceeds 10000.
CREATE TEMPORARY TABLE temp_high_value_customers AS
SELECT
    customer_id,
    SUM(amount) AS total_spent
FROM orders
GROUP BY customer_id
HAVING SUM(amount) > 10000;

-- Scans of temporary tables are parallel restricted.
EXPLAIN (ANALYZE, VERBOSE)
SELECT
    t.*,
    c.customer_name
FROM temp_high_value_customers AS t
INNER JOIN customers AS c
    ON c.id = t.customer_id;

3. 外部表扫描

sql
-- Set up a foreign table, using postgres_fdw as the example wrapper.
CREATE EXTENSION IF NOT EXISTS postgres_fdw;

-- Host and database name are placeholders for the remote instance.
CREATE SERVER IF NOT EXISTS foreign_server
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'remote_host', dbname 'remote_db');

-- postgres_fdw needs a user mapping to know which remote credentials to use;
-- without one, any scan of the foreign table fails with
-- "user mapping not found". The credentials below are placeholders.
CREATE USER MAPPING IF NOT EXISTS FOR CURRENT_USER
SERVER foreign_server
OPTIONS (user 'remote_user', password 'remote_password');

CREATE FOREIGN TABLE IF NOT EXISTS foreign_orders (
    order_id INT,
    customer_id INT,
    order_date DATE,
    amount DECIMAL(10,2)
) SERVER foreign_server;

-- Foreign table scans are parallel restricted by default (a foreign data
-- wrapper can declare otherwise).
EXPLAIN (ANALYZE)
SELECT
    order_id,
    customer_id,
    order_date,
    amount
FROM foreign_orders
WHERE order_date >= '2024-01-01';

4. 相关子查询(Correlated SubPlan)

sql
-- Example query containing a correlated scalar subquery. It is intentionally
-- left correlated: the point is to show how such a SubPlan is planned.
EXPLAIN (ANALYZE, VERBOSE)
SELECT
    c.customer_id,
    c.customer_name,
    (
        SELECT COUNT(*)
        FROM orders AS o
        WHERE o.order_date >= '2024-01-01'
          AND o.customer_id = c.customer_id  -- correlation with the outer row
    ) AS recent_orders
FROM customers AS c
WHERE c.status = 'ACTIVE';

函数和聚合的并行标记

创建并行安全函数

sql
-- Parallel-safe function example.
-- Returns price multiplied by (1 - discount_rate).
CREATE OR REPLACE FUNCTION calculate_discount(
    price DECIMAL,
    discount_rate DECIMAL
)
RETURNS DECIMAL
LANGUAGE SQL
PARALLEL SAFE   -- may run in the leader or in any worker
IMMUTABLE       -- important: the result depends only on the arguments
AS $body$
    SELECT price * (1 - discount_rate);
$body$;

-- Usage: apply a fixed 0.1 discount rate to every product in one category.
EXPLAIN (ANALYZE, VERBOSE)
SELECT
    product_id,
    price,
    calculate_discount(price, 0.1) AS discounted_price
FROM products
WHERE category = 'Electronics';

创建并行受限函数

sql
-- Parallel-restricted function example: it draws a random number.
-- Returns TRUE when a fresh random() draw falls below the given probability.
CREATE OR REPLACE FUNCTION random_sample(probability FLOAT)
RETURNS BOOLEAN
PARALLEL RESTRICTED  -- must run in the leader only
LANGUAGE SQL
AS $body$
    SELECT probability > random();
$body$;

-- A function that calls setseed is parallel restricted as well.
-- Seeds the random generator with the argument, then returns the next
-- random() value.
CREATE OR REPLACE FUNCTION deterministic_random(seed FLOAT)
RETURNS FLOAT
PARALLEL RESTRICTED
LANGUAGE plpgsql
AS $body$
DECLARE
    v_draw FLOAT;
BEGIN
    PERFORM setseed(seed);
    v_draw := random();
    RETURN v_draw;
END;
$body$;

创建并行不安全函数

sql
-- Parallel-unsafe function example: it writes to the database.
-- Appends one row to access_log with the given table name, the current
-- time and the current user.
CREATE OR REPLACE FUNCTION log_access(table_name TEXT)
RETURNS VOID
PARALLEL UNSAFE  -- writing data rules out parallel query entirely
LANGUAGE plpgsql
AS $body$
BEGIN
    -- The parameter shares its name with the target column, so it is
    -- qualified with the function name to make the reference explicit.
    INSERT INTO access_log (table_name, access_time, user_name)
    SELECT log_access.table_name, NOW(), current_user;
END;
$body$;

-- Function that advances a sequence; sequence access makes it parallel unsafe.
-- Parameters:
--   sequence_name  name of the sequence to advance (optionally schema-qualified)
-- Returns: the next value of that sequence.
CREATE OR REPLACE FUNCTION get_next_id(sequence_name TEXT)
RETURNS BIGINT
LANGUAGE plpgsql
PARALLEL UNSAFE
AS $$
BEGIN
    -- Casting the name to regclass resolves the sequence directly, so no
    -- dynamic SQL string has to be built and executed on every call.
    RETURN nextval(sequence_name::regclass);
END;
$$;

并行安全性决策流程图

实际应用示例

示例 1:电商订单分析(并行安全)

sql
-- Parallel-safe scalar scoring function (used inside AVG() by the analysis
-- query; it is not itself an aggregate).
-- Premium orders score amount * 1.2 + item_count * 10; all other orders,
-- including those with a NULL is_premium, score amount + item_count * 5.
CREATE OR REPLACE FUNCTION calculate_order_score(
    amount DECIMAL,
    item_count INT,
    is_premium BOOLEAN
)
RETURNS DECIMAL
LANGUAGE SQL
PARALLEL SAFE
IMMUTABLE
AS $body$
    SELECT CASE
        WHEN is_premium THEN amount * 1.2 + item_count * 10
        ELSE amount + item_count * 5
    END;
$body$;

-- Analyse orders with parallel query: allow up to 4 workers per Gather and
-- set the planner's parallel setup and per-tuple cost parameters.
SET max_parallel_workers_per_gather = 4;
SET parallel_setup_cost = 100;
SET parallel_tuple_cost = 0.01;

-- Monthly order count, revenue and average order score from 2024-01-01 on.
EXPLAIN (ANALYZE, VERBOSE, BUFFERS)
SELECT
    DATE_TRUNC('month', order_date) AS month,
    COUNT(*) AS order_count,
    SUM(amount) AS total_revenue,
    AVG(calculate_order_score(amount, item_count, is_premium)) AS avg_score
FROM orders
WHERE order_date >= '2024-01-01'
GROUP BY
    DATE_TRUNC('month', order_date)
ORDER BY
    month;

预期输出

Finalize GroupAggregate  (cost=... rows=12 width=48)
  Group Key: (date_trunc('month', order_date))
  ->  Gather Merge  (cost=... rows=48 width=48)
        Workers Planned: 4
        Workers Launched: 4
        ->  Sort  (cost=... rows=12 width=48)
              Sort Key: (date_trunc('month', order_date))
              ->  Partial HashAggregate  (cost=... rows=12 width=48)
                    Group Key: date_trunc('month', order_date)
                    ->  Parallel Seq Scan on orders
                          Filter: (order_date >= '2024-01-01')

示例 2:审计日志系统(并行不安全)

sql
-- Audit table: one row per recorded data access.
CREATE TABLE audit_log (
    id          SERIAL PRIMARY KEY,
    table_name  TEXT,
    operation   TEXT,
    user_name   TEXT,
    timestamp   TIMESTAMP DEFAULT NOW(),  -- filled in automatically on insert
    row_data    JSONB
);

-- Audit function (parallel unsafe).
-- Records one access in audit_log: table name, operation, the current user
-- and the row payload.
CREATE OR REPLACE FUNCTION audit_data_access(
    p_table_name TEXT,
    p_operation TEXT,
    p_row_data JSONB
)
RETURNS VOID
PARALLEL UNSAFE  -- because it writes to the database
LANGUAGE plpgsql
AS $body$
BEGIN
    INSERT INTO audit_log (table_name, operation, user_name, row_data)
    SELECT p_table_name, p_operation, current_user, p_row_data;
END;
$body$;

-- Any query that calls the audit function has parallelism disabled,
-- because the function is labelled PARALLEL UNSAFE.
EXPLAIN (ANALYZE)
SELECT
    customer_id,
    customer_name,
    audit_data_access(
        'customers',
        'SELECT',
        row_to_json(c)::jsonb
    )
FROM customers AS c
WHERE status = 'ACTIVE';

示例 3:缓存策略实现(并行受限)

sql
-- Cache table: one JSONB value per cache key, stamped with its creation time.
CREATE TABLE query_cache (
    cache_key    TEXT PRIMARY KEY,
    cache_value  JSONB,
    created_at   TIMESTAMP DEFAULT NOW()
);

-- Cache lookup function (parallel restricted).
-- Returns the cached value for p_cache_key when caching is switched on and
-- the entry is less than one hour old; returns NULL in every other case.
CREATE OR REPLACE FUNCTION get_cached_result(p_cache_key TEXT)
RETURNS JSONB
PARALLEL RESTRICTED  -- reads session-level state
LANGUAGE plpgsql
AS $body$
DECLARE
    v_cached JSONB;
BEGIN
    -- Application-level switch; a missing setting yields NULL (second
    -- argument is true), which is treated the same as "caching off".
    IF current_setting('app.use_cache', true) IS DISTINCT FROM 'true' THEN
        RETURN NULL;
    END IF;

    SELECT qc.cache_value
    INTO v_cached
    FROM query_cache AS qc
    WHERE qc.cache_key = p_cache_key
      AND qc.created_at > NOW() - INTERVAL '1 hour';

    RETURN v_cached;
END;
$body$;

最佳实践建议

1. 函数标记原则

TIP

遵循"宁可保守"的原则:如有疑问,选择更严格的安全级别

sql
-- Good practice: state the function's parallel safety explicitly.
-- Returns the sum of the two integer arguments.
CREATE FUNCTION safe_calculation(a INT, b INT)
RETURNS INT
LANGUAGE SQL
PARALLEL SAFE
IMMUTABLE
AS $body$
    SELECT a + b;
$body$;

-- Avoid: leaving parallel safety unlabelled (the default is PARALLEL UNSAFE).
-- The label is omitted here on purpose to show the anti-pattern.
CREATE FUNCTION unclear_function(a INT, b INT)
RETURNS INT
LANGUAGE SQL
AS $body$
    SELECT a + b;
$body$;

2. 性能优化策略

sql
-- Test table for the serial-versus-parallel comparison.
CREATE TABLE large_table (
    id          SERIAL PRIMARY KEY,
    category    TEXT,
    value       DECIMAL,
    created_at  TIMESTAMP
);

-- Load ten million rows of random test data: a random category label,
-- a value below 1000 and a timestamp within roughly the past year.
INSERT INTO large_table (category, value, created_at)
SELECT
    'Category_' || (random() * 10)::INT,
    random() * 1000,
    NOW() - (random() * 365)::INT * INTERVAL '1 day'
FROM generate_series(1, 10000000);

-- Indexes on the grouping column and on the filter column.
CREATE INDEX idx_large_table_category
    ON large_table (category);
CREATE INDEX idx_large_table_created_at
    ON large_table (created_at);

-- Tune the parallel query parameters: worker limit and the minimum
-- table / index sizes at which a parallel scan is considered.
SET max_parallel_workers_per_gather = 4;
SET min_parallel_table_scan_size = '8MB';
SET min_parallel_index_scan_size = '512kB';

-- Compare serial and parallel execution of the same aggregate query.

-- Serial run: zero workers per Gather turns parallel plans off.
SET max_parallel_workers_per_gather = 0;
EXPLAIN (ANALYZE, BUFFERS)
SELECT
    category,
    COUNT(*),
    AVG(value)
FROM large_table
WHERE created_at >= '2024-01-01'
GROUP BY category;

-- Parallel run: allow up to four workers again.
SET max_parallel_workers_per_gather = 4;
EXPLAIN (ANALYZE, BUFFERS)
SELECT
    category,
    COUNT(*),
    AVG(value)
FROM large_table
WHERE created_at >= '2024-01-01'
GROUP BY category;

3. 锁处理注意事项

WARNING

并行工作进程中获取的锁会在工作进程退出时释放,而不是事务结束时

sql
-- Example of a locking function and how it must be labelled.
-- A function that merely acquires extra locks (for example by reading a table
-- the query does not reference) can be PARALLEL RESTRICTED, so that it runs
-- only in the leader. This function goes further: SELECT ... FOR UPDATE takes
-- row-level locks, which modifies database state, so it has to be
-- PARALLEL UNSAFE. Labelled RESTRICTED, it could fail when executed in the
-- leader while a parallel operation is in progress.
--
-- Parameters:
--   p_product_id  product whose inventory rows are locked and checked
-- Returns: TRUE when the selected inventory row has quantity > 0, FALSE when
--          it does not, NULL when no row matches (or quantity is NULL).
CREATE OR REPLACE FUNCTION check_and_lock_inventory(p_product_id INT)
RETURNS BOOLEAN
LANGUAGE plpgsql
PARALLEL UNSAFE  -- important: row-level locking modifies database state
AS $$
DECLARE
    v_available BOOLEAN;
BEGIN
    -- Take row-level locks on every matching inventory row; they are held
    -- until the surrounding transaction ends.
    PERFORM 1 FROM inventory
    WHERE product_id = p_product_id
    FOR UPDATE;

    -- Read the stock level now that the rows are locked.
    SELECT quantity > 0 INTO v_available
    FROM inventory
    WHERE product_id = p_product_id;

    RETURN v_available;
END;
$$;

故障排除指南

常见错误及解决方案

错误 1:并行查询意外禁用

sql
-- Diagnostic query: list every server setting whose name mentions "parallel".
SELECT
    name,
    setting
FROM pg_settings
WHERE name LIKE '%parallel%'
ORDER BY name;

-- Show the parallel-safety label of every function in the public schema.
SELECT
    p.proname AS function_name,
    p.proparallel AS parallel_safety,
    CASE
        WHEN p.proparallel = 's' THEN 'SAFE'
        WHEN p.proparallel = 'r' THEN 'RESTRICTED'
        WHEN p.proparallel = 'u' THEN 'UNSAFE'
    END AS safety_label
FROM pg_proc AS p
WHERE p.pronamespace = 'public'::regnamespace
ORDER BY p.proname;

错误 2:函数标记错误导致的问题

sql
-- Test function with a deliberately WRONG label, kept as an anti-example.
CREATE OR REPLACE FUNCTION bad_function()
RETURNS VOID
PARALLEL SAFE  -- wrong: the body is actually unsafe
LANGUAGE plpgsql
AS $body$
BEGIN
    -- Creating a table here makes parallel queries that call this fail.
    CREATE TEMPORARY TABLE temp_result AS
    SELECT 1;
END;
$body$;

-- Corrected version: same body, labelled honestly.
CREATE OR REPLACE FUNCTION corrected_function()
RETURNS VOID
PARALLEL UNSAFE  -- correct label for a function that creates a table
LANGUAGE plpgsql
AS $body$
BEGIN
    CREATE TEMPORARY TABLE temp_result AS
    SELECT 1;
END;
$body$;

性能监控和调优

监控并行查询执行

sql
-- Monitoring view over pg_stat_statements: call counts, execution times, row
-- counts and the shared-buffer hit ratio, ordered by total execution time.
-- NULLIF guards the hit-ratio division when no shared blocks were touched.
-- NOTE(review): pg_stat_statements.query stores the statement's SQL text, not
-- its plan, so the LIKE filters below only match statements whose text
-- literally contains "Parallel" or "Gather". They do not identify queries that
-- ran with a parallel plan. Newer PostgreSQL releases reportedly add
-- parallel-worker counters to pg_stat_statements; TODO confirm against the
-- target server version before relying on this view.
CREATE VIEW parallel_query_stats AS
SELECT
    queryid,
    query,
    calls,
    total_exec_time,
    mean_exec_time,
    max_exec_time,
    rows,
    100.0 * shared_blks_hit /
        NULLIF(shared_blks_hit + shared_blks_read, 0) AS hit_ratio
FROM pg_stat_statements
WHERE query LIKE '%Parallel%'
   OR query LIKE '%Gather%'
ORDER BY total_exec_time DESC;

-- List the backends whose type mentions "parallel" (the parallel worker
-- processes that are currently running), oldest first.
SELECT
    a.pid,
    a.usename,
    a.application_name,
    a.state,
    a.query,
    a.backend_type
FROM pg_stat_activity AS a
WHERE a.backend_type LIKE '%parallel%'
ORDER BY a.backend_start;

总结

理解和正确使用 PostgreSQL 的并行查询安全性分类是优化数据库性能的关键。记住以下要点:

  1. 默认保守:不确定时选择更严格的安全级别
  2. 充分测试:在生产环境部署前全面测试函数的并行行为
  3. 监控性能:定期检查并行查询的执行效果
  4. 文档化:为自定义函数添加清晰的并行安全性说明
Details

扩展阅读