Skip to content

PostgreSQL 并行查询使用条件完全指南

概述

PostgreSQL 的并行查询功能虽然强大,但并非在所有情况下都能使用。了解并行查询的使用条件和限制,对于优化数据库性能和排查问题至关重要。本文将详细介绍何时可以使用并行查询,以及各种限制条件和解决方案。

基本配置要求

必需的系统配置

关键参数配置

sql
-- Show the current parallel-query settings.
-- `context` shows how a change takes effect: 'postmaster' = server restart
-- required, 'sighup' = a configuration reload is enough, 'user' = can also be
-- changed per session with SET.
SELECT name, setting, unit, context, short_desc
FROM pg_settings
WHERE name IN (
    'max_parallel_workers_per_gather',
    'max_parallel_workers',
    'max_worker_processes',
    'min_parallel_table_scan_size',
    'min_parallel_index_scan_size',
    'parallel_setup_cost',
    'parallel_tuple_cost'
)
ORDER BY name;

-- Change the parallel-query settings.
-- ALTER SYSTEM writes postgresql.auto.conf and cannot run inside a transaction block.

-- Maximum number of parallel workers a single Gather / Gather Merge node may use.
ALTER SYSTEM SET max_parallel_workers_per_gather = 4;

-- Instance-wide limit on parallel workers (values above max_worker_processes have no effect).
ALTER SYSTEM SET max_parallel_workers = 8;

-- Total background-worker slots (parallel workers plus other background workers
-- such as logical replication). This parameter has context 'postmaster':
-- a configuration reload does NOT apply it, the server must be restarted.
ALTER SYSTEM SET max_worker_processes = 16;

-- Reload the configuration. This applies the two settings above,
-- but not max_worker_processes.
SELECT pg_reload_conf();

-- Settings changed in the configuration files that are still waiting for a restart.
-- max_worker_processes should appear here until the server is restarted
-- (the reload is processed asynchronously; re-run this if the list is still empty).
SELECT name, setting, pending_restart
FROM pg_settings
WHERE pending_restart;

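参数层级关系

三个进程数参数之间是逐级受限的关系:单个 Gather 节点可用的工作进程数(max_parallel_workers_per_gather)受整个实例并行工作进程上限(max_parallel_workers)约束,而并行工作进程又从后台工作进程槽位(max_worker_processes)中分配。因此应保证 max_worker_processes ≥ max_parallel_workers ≥ max_parallel_workers_per_gather,并为逻辑复制、扩展等其他后台进程预留槽位。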

并行查询的限制条件

1. 数据修改操作限制

WARNING

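写操作和锁定操作会阻止并行查询计划的生成:只要查询在顶层或 CTE 中包含 INSERT/UPDATE/DELETE/MERGE 等数据修改操作,或使用 SELECT ... FOR UPDATE/FOR SHARE 锁定行,规划器就不会为该查询生成任何并行计划。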

sql
-- Create a test table.
CREATE TABLE test_parallel (
    id SERIAL PRIMARY KEY,
    category VARCHAR(50),
    value DECIMAL(10,2),
    created_at TIMESTAMP DEFAULT NOW()
);

-- Load 1,000,000 test rows.
-- floor() gives exactly Category_0 .. Category_9 with equal frequency; a plain
-- ::INT cast rounds instead, yielding 11 categories whose first and last are
-- only half as common as the others.
INSERT INTO test_parallel (category, value)
SELECT
    'Category_' || floor(random() * 10)::INT,
    (random() * 1000)::DECIMAL(10,2)
FROM generate_series(1, 1000000);

-- Refresh planner statistics so parallel cost estimates see the real table size.
ANALYZE test_parallel;

-- NOTE: EXPLAIN ANALYZE really executes the statement. The data-modifying and
-- row-locking examples therefore run inside a transaction that is rolled back,
-- so the test data used by the later examples stays unchanged.

-- ❌ No parallel plan: UPDATE writes data
BEGIN;
EXPLAIN (ANALYZE, BUFFERS)
UPDATE test_parallel
SET value = value * 1.1
WHERE category = 'Category_5';
ROLLBACK;

-- ❌ No parallel plan: DELETE writes data
BEGIN;
EXPLAIN (ANALYZE)
DELETE FROM test_parallel
WHERE created_at < NOW() - INTERVAL '30 days';
ROLLBACK;

-- ❌ No parallel plan: SELECT ... FOR UPDATE locks rows
BEGIN;
EXPLAIN (ANALYZE)
SELECT * FROM test_parallel
WHERE value > 500
FOR UPDATE;
ROLLBACK;

-- ✅ A parallel plan is possible: read-only query
EXPLAIN (ANALYZE)
SELECT category, COUNT(*), AVG(value)
FROM test_parallel
GROUP BY category;

2. 特殊情况下的并行查询

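作为例外,创建并填充新表的命令(CREATE TABLE ... AS、SELECT INTO、CREATE MATERIALIZED VIEW 和 REFRESH MATERIALIZED VIEW)可以在底层 SELECT 部分使用并行计划,写入新表的工作仍只由 leader 进程完成: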

sql
-- Commands that create AND populate a new table are the exception to the
-- "no writes" rule: their SELECT part may use a parallel plan.
-- (EXPLAIN ANALYZE executes them, so the objects below really get created.)

-- ✅ CREATE TABLE AS can use a parallel plan
EXPLAIN (ANALYZE, VERBOSE)
CREATE TABLE parallel_summary AS
SELECT
    category,
    COUNT(*) as count,
    SUM(value) as total_value,
    AVG(value) as avg_value
FROM test_parallel
GROUP BY category;

-- ✅ SELECT INTO can use a parallel plan
EXPLAIN (ANALYZE)
SELECT
    DATE_TRUNC('month', created_at) as month,
    COUNT(*) as record_count
INTO monthly_summary
FROM test_parallel
GROUP BY DATE_TRUNC('month', created_at);

-- ✅ CREATE MATERIALIZED VIEW can use a parallel plan.
-- PERCENTILE_CONT is an ordered-set aggregate, which does not support partial
-- aggregation, so the aggregation itself runs in the leader; the scan below it
-- can still be parallel.
EXPLAIN (ANALYZE)
CREATE MATERIALIZED VIEW mv_category_stats AS
SELECT
    category,
    COUNT(*) as total_records,
    MIN(value) as min_value,
    MAX(value) as max_value,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY value) as median_value
FROM test_parallel
GROUP BY category;

-- ✅ REFRESH MATERIALIZED VIEW can also use a parallel plan for its query.
-- EXPLAIN does not produce a plan for REFRESH, so run it directly; to inspect
-- the plan, EXPLAIN the view's defining SELECT instead.
-- REFRESH ... CONCURRENTLY requires a unique index on the materialized view
-- (plain column names, no WHERE clause); without one the refresh fails.
CREATE UNIQUE INDEX mv_category_stats_category_idx
    ON mv_category_stats (category);

REFRESH MATERIALIZED VIEW CONCURRENTLY mv_category_stats;

3. 游标和循环限制

sql
-- ❌ A cursor created with DECLARE CURSOR never uses a parallel plan: its query
-- may be suspended between FETCHes, which parallel query cannot handle.
BEGIN;

-- EXPLAIN accepts DECLARE (but not FETCH) and shows the plan the cursor would
-- run; expect no Gather node here.
EXPLAIN
DECLARE test_cursor CURSOR FOR
    SELECT category, COUNT(*)
    FROM test_parallel
    GROUP BY category;

-- The same query as a plain statement, for comparison (may show a Gather node).
EXPLAIN
SELECT category, COUNT(*)
FROM test_parallel
GROUP BY category;

-- Actually open, read and close the cursor.
DECLARE test_cursor CURSOR FOR
    SELECT category, COUNT(*)
    FROM test_parallel
    GROUP BY category;

FETCH ALL FROM test_cursor;

CLOSE test_cursor;
COMMIT;

-- ❌ The query driving a PL/pgSQL FOR ... IN query LOOP never uses a parallel
-- plan: the system cannot verify that the loop body is safe to execute while
-- a parallel query is active.
CREATE OR REPLACE FUNCTION process_categories()
RETURNS TABLE(category VARCHAR, total BIGINT) AS $$
BEGIN
    -- This query will not get a parallel plan
    FOR category, total IN
        SELECT t.category, COUNT(*) 
        FROM test_parallel t
        GROUP BY t.category
    LOOP
        -- per-row processing goes here
        RETURN NEXT;
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- Alternative: return the whole result set with RETURN QUERY.
-- Since PostgreSQL 14, the query of a RETURN QUERY statement can use a parallel plan.
CREATE OR REPLACE FUNCTION get_category_stats()
RETURNS TABLE(category VARCHAR, total BIGINT) AS $$
BEGIN
    -- This query can use a parallel plan (PostgreSQL 14+)
    RETURN QUERY
    SELECT t.category, COUNT(*)::BIGINT
    FROM test_parallel t
    GROUP BY t.category;
END;
$$ LANGUAGE plpgsql;

4. 函数并行安全性限制

sql
-- List the functions in schema public together with their parallel-safety marking.
SELECT
    p.proname AS function_name,
    p.prosrc  AS source_code,
    CASE
        WHEN p.proparallel = 's' THEN 'PARALLEL SAFE'
        WHEN p.proparallel = 'r' THEN 'PARALLEL RESTRICTED'
        WHEN p.proparallel = 'u' THEN 'PARALLEL UNSAFE'
    END AS parallel_safety
FROM pg_proc AS p
WHERE p.pronamespace = 'public'::regnamespace
ORDER BY p.proname;

-- ❌ Any PARALLEL UNSAFE function referenced by a query rules out a parallel
-- plan for the whole query. (The NOTICE merely stands in for unsafe work.)
CREATE OR REPLACE FUNCTION unsafe_function(val DECIMAL)
RETURNS DECIMAL
LANGUAGE plpgsql
PARALLEL UNSAFE
AS $fn$
BEGIN
    RAISE NOTICE 'Processing value: %', val;
    RETURN 2 * val;
END;
$fn$;

-- No parallel plan is generated for this query
EXPLAIN (ANALYZE)
SELECT
    t.category,
    unsafe_function(AVG(t.value))
FROM test_parallel AS t
GROUP BY t.category;

-- ✅ The same arithmetic as an IMMUTABLE SQL function marked PARALLEL SAFE
CREATE OR REPLACE FUNCTION safe_function(val DECIMAL)
RETURNS DECIMAL
LANGUAGE SQL
IMMUTABLE
PARALLEL SAFE
AS $fn$
    SELECT 2 * val;
$fn$;

-- A parallel plan is possible again
EXPLAIN (ANALYZE)
SELECT
    t.category,
    safe_function(AVG(t.value))
FROM test_parallel AS t
GROUP BY t.category;

5. 嵌套并行查询限制

sql
-- A helper function that runs a query of its own.
-- It only reads data, so it is declared STABLE (the default would be VOLATILE),
-- and PARALLEL SAFE so that parallel workers are allowed to call it.
CREATE OR REPLACE FUNCTION get_category_count(p_category VARCHAR)
RETURNS BIGINT AS $$
DECLARE
    v_count BIGINT;
BEGIN
    -- When this function is called from a query that is already running in
    -- parallel, this inner query never gets a parallel plan of its own.
    SELECT COUNT(*) INTO v_count
    FROM test_parallel
    WHERE category = p_category;
    
    RETURN v_count;
END;
$$ LANGUAGE plpgsql STABLE PARALLEL SAFE;

-- The outer scan may run in parallel. Because the function is PARALLEL SAFE,
-- the workers may evaluate it (see the Output lines in VERBOSE), but the
-- COUNT(*) inside each call runs serially.
-- Only a handful of rows is selected: every call scans test_parallel in full.
-- Writing SELECT DISTINCT category, get_category_count(category) instead would
-- call it for all 1,000,000 input rows, because the SELECT list is computed
-- before DISTINCT, which adds up to ~10^12 row reads.
-- Tip: auto_explain with log_nested_statements = on logs the inner plans.
EXPLAIN (ANALYZE, VERBOSE)
SELECT
    t.id,
    t.category,
    get_category_count(t.category) AS count
FROM test_parallel AS t
WHERE t.id % 100000 = 0;

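运行时并行执行限制

即使规划器选择了并行计划,执行时也可能只启动部分甚至零个工作进程:当 max_worker_processes 或 max_parallel_workers 的槽位已被占满时,Gather 节点会以更少的工作进程运行(EXPLAIN ANALYZE 中 Workers Launched 小于 Workers Planned),最坏情况下 Gather 以下的计划完全由 leader 进程独自执行。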

资源限制检查

sql
-- Compare current worker-process usage against the configured limits.
-- Returns one row for all background-worker slots and one for parallel workers.
CREATE OR REPLACE FUNCTION check_parallel_resources()
RETURNS TABLE(
    resource_type TEXT,
    current_value INT,
    max_value INT,
    available INT,
    status TEXT
) AS $$
BEGIN
    -- Background-worker slots (max_worker_processes) are shared by parallel
    -- workers, the logical replication launcher/workers and extension workers.
    -- pg_stat_activity reports a background worker's backend_type as its own
    -- registered type name, so count every process that is NOT one of the core
    -- non-background-worker process types. Autovacuum workers have their own
    -- pool (autovacuum_max_workers) and must not be counted; a LIKE '%worker%'
    -- match would include them and miss the logical replication launcher.
    -- NOTE(review): the list reflects process types up to PostgreSQL 18;
    -- extend it if later releases add new ones.
    RETURN QUERY
    WITH worker_stats AS (
        SELECT COUNT(*) AS all_workers
        FROM pg_stat_activity
        WHERE backend_type NOT IN (
            'client backend',
            'dead-end client backend',
            'standalone backend',
            'not initialized',
            'autovacuum launcher',
            'autovacuum worker',
            'background writer',
            'checkpointer',
            'walwriter',
            'walsender',
            'walreceiver',
            'walsummarizer',
            'startup',
            'archiver',
            'logger',
            'slotsync worker',
            'io worker'
        )
    )
    SELECT 
        'Total Worker Processes'::TEXT,
        all_workers::INT,
        current_setting('max_worker_processes')::INT,
        current_setting('max_worker_processes')::INT - all_workers::INT,
        CASE 
            WHEN all_workers < current_setting('max_worker_processes')::INT * 0.8 
            THEN 'OK'
            ELSE 'WARNING'
        END
    FROM worker_stats;
    
    -- Parallel workers currently running, against max_parallel_workers
    RETURN QUERY
    WITH parallel_stats AS (
        SELECT COUNT(*) as count
        FROM pg_stat_activity
        WHERE backend_type = 'parallel worker'
    )
    SELECT 
        'Parallel Workers'::TEXT,
        count::INT,
        current_setting('max_parallel_workers')::INT,
        current_setting('max_parallel_workers')::INT - count::INT,
        CASE 
            WHEN count < current_setting('max_parallel_workers')::INT * 0.8 
            THEN 'OK'
            ELSE 'WARNING'
        END
    FROM parallel_stats;
END;
$$ LANGUAGE plpgsql;

-- Run the resource check
SELECT * FROM check_parallel_resources();

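客户端协议限制

如果客户端通过扩展查询协议发送 fetch count 非零的 Execute 消息(例如 JDBC 在关闭自动提交时设置了 fetchSize),即使计划中含有 Gather 节点,执行时也不会启动工作进程,而是由 leader 进程独自执行。服务器端的显式游标和 PL/pgSQL 的 FOR 循环更进一步:规划器根本不会为它们生成并行计划。下面的函数演示了这一点。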

sql
-- Contrast how PL/pgSQL constructs interact with parallel query.
-- Parallel plans are not used when execution may be suspended part-way:
-- a cursor, or a FOR ... IN query LOOP (the server-side counterparts of a
-- client that fetches a result in batches).
CREATE OR REPLACE FUNCTION test_fetch_count_impact()
RETURNS VOID AS $$
DECLARE
    v_rec RECORD;
    v_total BIGINT;
    -- Bound cursor used for the row-at-a-time example below
    cur CURSOR FOR
        SELECT category, COUNT(*)
        FROM test_parallel
        GROUP BY category;
BEGIN
    -- A single statement that runs to completion (can use a parallel plan)
    SELECT COUNT(*) INTO v_total
    FROM test_parallel;

    -- FOR ... IN query LOOP: rows are consumed incrementally, so the query
    -- never gets a parallel plan
    FOR v_rec IN
        SELECT category, COUNT(*)
        FROM test_parallel
        GROUP BY category
    LOOP
        -- process each row
    END LOOP;

    -- Explicit cursor read one row at a time (never a parallel plan).
    -- PL/pgSQL FETCH ... INTO returns a single row; a count such as FETCH 10
    -- is rejected when the function is compiled.
    OPEN cur;
    LOOP
        FETCH cur INTO v_rec;
        EXIT WHEN NOT FOUND;
        -- process the row
    END LOOP;
    CLOSE cur;
END;
$$ LANGUAGE plpgsql;

实际应用案例

案例1:电商数据分析优化

sql
-- Orders table for the e-commerce example
CREATE TABLE orders (
    order_id BIGSERIAL PRIMARY KEY,
    customer_id INT,
    product_id INT,
    order_date DATE,
    quantity INT,
    unit_price DECIMAL(10,2),
    status VARCHAR(20)
);

-- Products table
CREATE TABLE products (
    product_id INT PRIMARY KEY,
    product_name VARCHAR(100),
    category VARCHAR(50),
    supplier_id INT
);

-- Test data: 1,000 products (product_id 1..1000) in 20 categories
INSERT INTO products (product_id, product_name, category, supplier_id)
SELECT 
    i,
    'Product_' || i,
    'Category_' || (i % 20),
    (i % 100)
FROM generate_series(1, 1000) i;

-- 5,000,000 orders.
-- floor() keeps every random value inside its intended range with a uniform
-- distribution. Rounding casts misbehave here:
-- (random() * 1000 + 1)::INT can produce 1001, i.e. orders whose product_id has
-- no matching product. (random() * 3)::INT made statuses 1/6 pending and 1/2 delivered.
INSERT INTO orders (customer_id, product_id, order_date, quantity, unit_price, status)
SELECT 
    floor(random() * 10000)::INT + 1,                  -- customer_id: 1..10000
    floor(random() * 1000)::INT + 1,                   -- product_id: 1..1000, always exists
    DATE '2020-01-01' + floor(random() * 1461)::INT,   -- 2020-01-01 .. 2023-12-31
    floor(random() * 10)::INT + 1,                     -- quantity: 1..10
    (random() * 100 + 10)::DECIMAL(10,2),              -- unit_price: 10.00 .. 110.00
    CASE floor(random() * 3)::INT
        WHEN 0 THEN 'pending'
        WHEN 1 THEN 'shipped'
        ELSE 'delivered'
    END
FROM generate_series(1, 5000000);

-- Refresh planner statistics
ANALYZE orders;
ANALYZE products;

-- Revenue helper. IMMUTABLE and PARALLEL SAFE, so it does not prevent parallel plans.
CREATE OR REPLACE FUNCTION calculate_revenue(
    p_quantity INT,
    p_unit_price DECIMAL,
    p_discount DECIMAL DEFAULT 0
)
RETURNS DECIMAL
LANGUAGE SQL
IMMUTABLE
PARALLEL SAFE
AS $$
    SELECT p_quantity * p_unit_price * (1 - p_discount);
$$;

-- ✅ Complex analytical query that can use a parallel plan.
-- The scan and join below the Gather node can run in parallel workers.
-- COUNT(DISTINCT ...) cannot be partially aggregated, so expect the final
-- aggregation in the leader rather than a Partial/Finalize Aggregate pair.
EXPLAIN (ANALYZE, BUFFERS, VERBOSE)
SELECT 
    p.category,
    DATE_TRUNC('quarter', o.order_date) as quarter,
    COUNT(DISTINCT o.customer_id) as unique_customers,
    COUNT(*) as total_orders,
    SUM(calculate_revenue(o.quantity, o.unit_price)) as total_revenue,
    AVG(calculate_revenue(o.quantity, o.unit_price)) as avg_order_value
FROM orders o
JOIN products p ON o.product_id = p.product_id
WHERE o.order_date >= '2023-01-01'
  AND o.status = 'delivered'
GROUP BY p.category, DATE_TRUNC('quarter', o.order_date)
HAVING COUNT(*) > 100
ORDER BY p.category, quarter;

案例2:实时报表生成优化

sql
-- Rebuild the per-category report for one day.
-- NOTE: INSERT ... SELECT writes data, so PostgreSQL never uses a parallel plan
-- for it. Only commands that create and fill a new table, such as
-- CREATE TABLE ... AS, are exempt. If the aggregation is expensive, compute it
-- with CREATE TEMP TABLE ... AS (parallel-capable) and insert from that.
-- Assumes the tables daily_reports and report_generation_log already exist
-- (they are not defined in this document).
-- Because of the COMMIT inside, CALL this outside an explicit transaction block.
CREATE OR REPLACE PROCEDURE generate_daily_report(p_date DATE)
LANGUAGE plpgsql
AS $$
DECLARE
    v_start_time TIMESTAMP;
    v_end_time TIMESTAMP;
BEGIN
    v_start_time := clock_timestamp();

    -- Remove any earlier report for this date so the procedure can be re-run
    DELETE FROM daily_reports WHERE report_date = p_date;

    -- Build the new report (serial plan: this statement writes data).
    -- The best-selling product per category comes from one pass over the day's
    -- orders with a window function. This replaces a correlated subquery that
    -- re-joined orders and products once per category.
    WITH day_orders AS (
        SELECT
            p.category,
            p.product_name,
            o.quantity * o.unit_price AS order_value
        FROM orders AS o
        INNER JOIN products AS p
            ON p.product_id = o.product_id
        WHERE o.order_date = p_date
    ),
    category_totals AS (
        SELECT
            category,
            COUNT(*) AS total_orders,
            SUM(order_value) AS total_revenue,
            AVG(order_value) AS avg_order_value
        FROM day_orders
        GROUP BY category
    ),
    ranked_products AS (
        -- product_name breaks revenue ties so the chosen top product is deterministic
        SELECT
            category,
            product_name,
            ROW_NUMBER() OVER (
                PARTITION BY category
                ORDER BY SUM(order_value) DESC, product_name
            ) AS revenue_rank
        FROM day_orders
        GROUP BY category, product_name
    )
    INSERT INTO daily_reports (
        report_date,
        category,
        total_orders,
        total_revenue,
        avg_order_value,
        top_product
    )
    SELECT
        p_date,
        ct.category,
        ct.total_orders,
        ct.total_revenue,
        ct.avg_order_value,
        rp.product_name
    FROM category_totals AS ct
    LEFT JOIN ranked_products AS rp
        ON rp.category = ct.category
       AND rp.revenue_rank = 1;

    v_end_time := clock_timestamp();

    -- Record how long the report generation took
    INSERT INTO report_generation_log (report_date, execution_time)
    VALUES (p_date, v_end_time - v_start_time);

    COMMIT;
END;
$$;

故障排除指南

诊断工具集

sql
-- Collect diagnostics that explain whether a query can use a parallel plan.
--   p_query  statement to analyse. It is planned with EXPLAIN (without ANALYZE,
--            so it is not executed) to see whether the planner picks a Gather node.
--            The text is run as dynamic SQL, so pass only trusted input.
--            Statements containing a semicolon (other than trailing ones) are
--            not analysed.
-- Returns one row per check: category, item, observed result, recommendation.
CREATE OR REPLACE FUNCTION diagnose_parallel_query(p_query TEXT)
RETURNS TABLE(
    check_category TEXT,
    check_item TEXT,
    result TEXT,
    recommendation TEXT
) AS $$
DECLARE
    v_query TEXT;
    v_plan JSON;
BEGIN
    -- 1. Configuration: a per-Gather limit of 0 disables parallel query
    RETURN QUERY
    SELECT 
        'Configuration'::TEXT,
        'max_parallel_workers_per_gather'::TEXT,
        current_setting('max_parallel_workers_per_gather'),
        CASE 
            WHEN current_setting('max_parallel_workers_per_gather')::INT = 0
            THEN 'Set to value > 0 to enable parallel queries'
            ELSE 'OK'
        END;
    
    -- 2. Resources: parallel workers currently running vs max_parallel_workers
    --    (guarded against max_parallel_workers = 0)
    RETURN QUERY
    WITH worker_count AS (
        SELECT
            COUNT(*) AS active_workers,
            current_setting('max_parallel_workers')::INT AS max_workers
        FROM pg_stat_activity
        WHERE backend_type = 'parallel worker'
    )
    SELECT 
        'Resources'::TEXT,
        'Available parallel workers'::TEXT,
        format('%s of %s in use', active_workers, max_workers),
        CASE 
            WHEN max_workers = 0
            THEN 'max_parallel_workers is 0: no parallel workers can be launched'
            WHEN active_workers::NUMERIC / NULLIF(max_workers, 0) > 0.8
            THEN 'Consider increasing max_parallel_workers'
            ELSE 'Sufficient workers available'
        END
    FROM worker_count;
    
    -- 3. Table size of regular tables in schema public.
    --    current_setting() returns the value with its unit (e.g. '8MB'), so it is
    --    converted with pg_size_bytes() before comparing with a byte count.
    RETURN QUERY
    SELECT 
        'Table Size'::TEXT,
        c.relname::TEXT,
        pg_size_pretty(pg_relation_size(c.oid)),
        CASE 
            WHEN pg_relation_size(c.oid)
                 < pg_size_bytes(current_setting('min_parallel_table_scan_size'))
            THEN format('Table too small for parallel scan (min: %s)', 
                        current_setting('min_parallel_table_scan_size'))
            ELSE 'Table size sufficient for parallel scan'
        END
    FROM pg_class AS c
    WHERE c.relnamespace = 'public'::regnamespace
      AND c.relkind = 'r'
    ORDER BY c.relname;
    
    -- 4. PARALLEL UNSAFE functions in schema public (listed alphabetically)
    RETURN QUERY
    WITH unsafe_functions AS (
        SELECT proname
        FROM pg_proc
        WHERE pronamespace = 'public'::regnamespace
          AND proparallel = 'u'
    )
    SELECT 
        'Functions'::TEXT,
        'Unsafe functions'::TEXT,
        COALESCE(string_agg(proname::TEXT, ', ' ORDER BY proname), 'None'),
        CASE 
            WHEN COUNT(*) > 0
            THEN 'Mark functions as PARALLEL SAFE/RESTRICTED where appropriate'
            ELSE 'All functions are parallel-compatible'
        END
    FROM unsafe_functions;

    -- 5. Query plan: does the planner choose a Gather / Gather Merge node?
    check_category := 'Query Plan';
    check_item := 'Gather node in plan';
    v_query := rtrim(p_query, E'; \t\r\n');
    IF v_query IS NULL OR btrim(v_query) = '' THEN
        result := 'No query supplied';
        recommendation := 'Pass the query text to analyse';
    ELSIF position(';' IN v_query) > 0 THEN
        -- Refuse multi-statement input: dynamic SQL would run every statement
        result := 'Not checked';
        recommendation := 'Pass a single statement without embedded semicolons';
    ELSE
        BEGIN
            -- Plain EXPLAIN only plans the statement; it does not execute it
            EXECUTE 'EXPLAIN (FORMAT JSON) ' || v_query INTO v_plan;
            IF v_plan::TEXT LIKE '%"Node Type": "Gather%' THEN
                result := 'Parallel plan chosen';
                recommendation := 'OK';
            ELSE
                result := 'Serial plan chosen';
                recommendation := 'Check the items above and the restrictions on writes, cursors and PARALLEL UNSAFE functions';
            END IF;
        EXCEPTION WHEN OTHERS THEN
            result := 'EXPLAIN failed: ' || SQLERRM;
            recommendation := 'Check that the statement is valid and supported by EXPLAIN';
        END;
    END IF;
    RETURN NEXT;
END;
$$ LANGUAGE plpgsql;

-- Run the diagnostics
SELECT * FROM diagnose_parallel_query('SELECT * FROM orders');

性能对比测试

sql
-- Measure a query once with parallel query disabled and once with up to
-- p_parallel_workers workers per Gather node.
--   p_query             statement to measure. It is executed (also through
--                       EXPLAIN ANALYZE), so pass only trusted input.
--   p_parallel_workers  value of max_parallel_workers_per_gather for the
--                       parallel run (the planner may still choose fewer workers).
-- Returns per mode: the execution time reported by EXPLAIN ANALYZE and the
-- shared-buffer hits/reads of the whole plan (parallel workers included).
-- The caller's max_parallel_workers_per_gather is restored afterwards.
CREATE OR REPLACE FUNCTION compare_parallel_performance(
    p_query TEXT,
    p_parallel_workers INT DEFAULT 4
)
RETURNS TABLE(
    execution_mode TEXT,
    execution_time INTERVAL,
    buffers_hit BIGINT,
    buffers_read BIGINT
) AS $$
DECLARE
    v_saved_setting TEXT := current_setting('max_parallel_workers_per_gather');
    v_run INT;
    v_workers INT;
    v_plan JSON;
BEGIN
    IF p_parallel_workers IS NULL OR p_parallel_workers < 0 THEN
        RAISE EXCEPTION 'p_parallel_workers must be a non-negative integer, got %',
            p_parallel_workers;
    END IF;

    -- Warm-up run. Without it, the first measured run alone pays for reading
    -- cold pages from disk, which biases the comparison.
    EXECUTE p_query;

    FOR v_run IN 1..2 LOOP
        IF v_run = 1 THEN
            v_workers := 0;
            execution_mode := 'Serial';
        ELSE
            v_workers := p_parallel_workers;
            execution_mode := format('Parallel (%s workers)', p_parallel_workers);
        END IF;

        PERFORM set_config('max_parallel_workers_per_gather', v_workers::TEXT, false);

        -- TIMING OFF avoids per-node clock overhead (the total execution time is
        -- still measured); BUFFERS fills the Shared Hit/Read Blocks fields.
        EXECUTE 'EXPLAIN (ANALYZE, BUFFERS, TIMING OFF, FORMAT JSON) ' || p_query
            INTO v_plan;

        -- 'Execution Time' is reported in milliseconds
        execution_time := make_interval(
            secs => (v_plan -> 0 ->> 'Execution Time')::DOUBLE PRECISION / 1000
        );
        buffers_hit  := (v_plan -> 0 -> 'Plan' ->> 'Shared Hit Blocks')::BIGINT;
        buffers_read := (v_plan -> 0 -> 'Plan' ->> 'Shared Read Blocks')::BIGINT;
        RETURN NEXT;
    END LOOP;

    -- Restore the caller's value (RESET would fall back to the configured default
    -- instead). If an error occurs earlier, the transaction rollback undoes the change.
    PERFORM set_config('max_parallel_workers_per_gather', v_saved_setting, false);
END;
$$ LANGUAGE plpgsql;

最佳实践建议

1. 查询设计原则

2. 配置优化建议

sql
-- Suggest starting values for the parallel-query worker settings.
--   p_cpu_count  number of CPU cores on the database server. SQL cannot read
--                it, so pass it in; the default of 8 matches the earlier example.
-- Returns one row per parameter: current value, suggested value and reason.
-- Drop the former zero-argument version first, otherwise a call without
-- arguments would be ambiguous between it and this one.
DROP FUNCTION IF EXISTS recommend_parallel_settings();

CREATE OR REPLACE FUNCTION recommend_parallel_settings(p_cpu_count INT DEFAULT 8)
RETURNS TABLE(
    parameter TEXT,
    current_value TEXT,
    recommended_value TEXT,
    reason TEXT
) AS $$
BEGIN
    IF p_cpu_count IS NULL OR p_cpu_count < 1 THEN
        RAISE EXCEPTION 'p_cpu_count must be a positive integer, got %', p_cpu_count;
    END IF;

    RETURN QUERY
    SELECT 
        'max_worker_processes'::TEXT,
        current_setting('max_worker_processes'),
        (p_cpu_count * 2)::TEXT,
        'Set to 2x CPU cores for optimal performance (requires server restart)'::TEXT;
    
    RETURN QUERY
    SELECT 
        'max_parallel_workers'::TEXT,
        current_setting('max_parallel_workers'),
        p_cpu_count::TEXT,
        'Set to number of CPU cores'::TEXT;
    
    -- Never suggest more workers per Gather than max_parallel_workers allows
    RETURN QUERY
    SELECT 
        'max_parallel_workers_per_gather'::TEXT,
        current_setting('max_parallel_workers_per_gather'),
        LEAST(GREATEST(2, p_cpu_count / 4), p_cpu_count)::TEXT,
        'Set to CPU cores / 4, minimum 2 (capped at max_parallel_workers)'::TEXT;
END;
$$ LANGUAGE plpgsql;

3. 监控和调优

sql
-- Top statements (by total execution time) that used parallel query.
-- pg_stat_statements stores the statement TEXT, not its plan, so matching the
-- text against plan node names such as 'Gather' does not find parallel queries.
-- PostgreSQL 18 (pg_stat_statements 1.12) records per-statement parallel worker
-- counters, used below. On older releases, use auto_explain to find parallel plans.
-- Requires pg_stat_statements in shared_preload_libraries and
-- CREATE EXTENSION pg_stat_statements.
CREATE VIEW parallel_query_performance AS
WITH query_stats AS (
    SELECT 
        queryid,
        query,
        calls,
        total_exec_time,
        mean_exec_time,
        stddev_exec_time,
        rows,
        parallel_workers_to_launch,
        parallel_workers_launched
    FROM pg_stat_statements
    WHERE parallel_workers_to_launch > 0
)
SELECT 
    queryid,
    CASE
        WHEN length(query) > 50 THEN LEFT(query, 50) || '...'
        ELSE query
    END as query_preview,
    calls,
    round(total_exec_time::numeric, 2) as total_time_ms,
    round(mean_exec_time::numeric, 2) as avg_time_ms,
    round(stddev_exec_time::numeric, 2) as stddev_time_ms,
    rows,
    round(rows::numeric / NULLIF(calls, 0), 2) as avg_rows,
    parallel_workers_to_launch,
    parallel_workers_launched,
    -- Share of planned workers that actually started; below 1 means worker
    -- slots (max_worker_processes / max_parallel_workers) ran out
    round(parallel_workers_launched::numeric
          / NULLIF(parallel_workers_to_launch, 0), 2) as worker_launch_ratio
FROM query_stats
ORDER BY total_exec_time DESC
LIMIT 20;

总结

掌握 PostgreSQL 并行查询的使用条件是优化数据库性能的关键:

  1. 配置正确:确保系统参数允许并行查询
  2. 了解限制:避免在不支持的场景中期望并行执行
  3. 合理设计:编写并行友好的查询和函数
  4. 持续监控:跟踪并行查询的实际效果
  5. 灵活调整:根据工作负载特征优化配置

记住

并行查询不是万能的,合理使用才能发挥最大效果

扩展阅读