Skip to content

PostgreSQL 并行执行计划深度解析

概述

PostgreSQL 的并行执行计划是提升查询性能的关键技术。与传统的串行执行不同,并行计划通过将工作分配给多个进程来加速查询执行。本文将深入探讨 PostgreSQL 如何生成和执行并行计划,帮助您理解并优化复杂查询的性能。

并行计划的核心概念

部分计划(Partial Plan)

并行计划的核心是"部分计划"概念:

关键原则

每个工作进程执行部分计划,生成结果集的一个子集,确保:

  • 没有重复的输出行
  • 每个必需的输出行恰好由一个进程生成
  • 所有部分结果合并后得到完整结果

并行扫描类型

1. 并行顺序扫描(Parallel Sequential Scan)

工作原理:表的数据块被动态分配给各个工作进程。

sql
-- Test table used by all the parallel-scan demos below.
CREATE TABLE large_orders (
    order_id BIGSERIAL PRIMARY KEY,
    customer_id INT,
    order_date DATE,
    amount DECIMAL(10,2),
    status VARCHAR(20)
);

-- Load 5M rows of random test data.
-- floor() replaces the original bare ::INT casts: casting rounds to the
-- nearest integer, which halves the probability of the end buckets and,
-- for CASE (random()*3)::INT, produces an out-of-range value 3 that
-- silently fell into the ELSE branch. floor() gives a uniform 0..n-1.
INSERT INTO large_orders (customer_id, order_date, amount, status)
SELECT 
    floor(random() * 10000)::INT,
    DATE '2020-01-01' + floor(random() * 1460)::INT * INTERVAL '1 day',
    (random() * 10000)::DECIMAL(10,2),
    CASE floor(random() * 3)::INT 
        WHEN 0 THEN 'pending'
        WHEN 1 THEN 'completed'
        ELSE 'cancelled'
    END
FROM generate_series(1, 5000000);

-- Refresh planner statistics so the cost estimates reflect the new data.
ANALYZE large_orders;

-- Encourage parallelism: allow up to 4 workers per Gather node and
-- lower the parallel cost knobs so a parallel plan wins the cost race.
SET max_parallel_workers_per_gather = 4;
SET parallel_tuple_cost = 0.01;
SET parallel_setup_cost = 100;

-- Aggregate over a large filtered scan: should produce a
-- Parallel Seq Scan under a Gather (Merge) node.
EXPLAIN (ANALYZE, BUFFERS, VERBOSE)
SELECT COUNT(*), AVG(amount), status
FROM large_orders
WHERE order_date >= '2023-01-01'
GROUP BY status;

预期执行计划

Finalize GroupAggregate  (cost=... rows=3 width=72)
  Output: count(*), avg(amount), status
  Group Key: status
  ->  Gather Merge  (cost=... rows=12 width=72)
        Output: status, (PARTIAL count(*)), (PARTIAL avg(amount))
        Workers Planned: 4
        Workers Launched: 4
        ->  Partial GroupAggregate  (cost=... rows=3 width=72)
              Output: status, PARTIAL count(*), PARTIAL avg(amount)
              Group Key: status
              ->  Sort  (cost=... rows=... width=28)
                    Output: status, amount
                    Sort Key: status
                    ->  Parallel Seq Scan on large_orders  (cost=... rows=... width=28)
                          Output: status, amount
                          Filter: (order_date >= '2023-01-01'::date)
                          Rows Removed by Filter: ...

2. 并行位图堆扫描(Parallel Bitmap Heap Scan)

工作原理:领导进程构建位图,工作进程并行扫描堆页面。

sql
-- Indexes that make a bitmap (index-driven) scan attractive.
CREATE INDEX idx_orders_date ON large_orders(order_date);
CREATE INDEX idx_orders_customer ON large_orders(customer_id);

-- Nudge the planner toward a bitmap scan.
-- NOTE(review): RESET enable_seqscan after this demo — leaving it off
-- affects every later query in the session.
SET enable_seqscan = off;
SET work_mem = '256MB';

-- Two range predicates on separately indexed columns: should produce a
-- Parallel Bitmap Heap Scan (leader builds the bitmap, workers share
-- the heap pages).
EXPLAIN (ANALYZE, BUFFERS)
SELECT customer_id, COUNT(*), SUM(amount)
FROM large_orders
WHERE order_date BETWEEN '2023-01-01' AND '2023-03-31'
  AND customer_id BETWEEN 1000 AND 5000
GROUP BY customer_id;

3. 并行索引扫描(Parallel Index Scan)

工作原理:工作进程协作扫描 B-tree 索引。

sql
-- Covering index: (status, order_date, amount) lets the query below be
-- answered from the index alone.
CREATE INDEX idx_orders_status_date_amount 
ON large_orders(status, order_date, amount);

-- Workers cooperatively walk the B-tree leaf pages; the ORDER BY can be
-- satisfied by the index order.
EXPLAIN (ANALYZE, BUFFERS, VERBOSE)
SELECT status, order_date, SUM(amount) as daily_total
FROM large_orders
WHERE status = 'completed'
  AND order_date >= '2023-01-01'
GROUP BY status, order_date
ORDER BY order_date;

索引扫描的并行化示意图

并行连接策略

1. 并行嵌套循环连接(Parallel Nested Loop Join)

sql
-- Dimension table joined against large_orders in the join demos.
CREATE TABLE customers (
    customer_id INT PRIMARY KEY,
    customer_name VARCHAR(100),
    country VARCHAR(50),
    credit_limit DECIMAL(10,2)
);

-- 10k customers spread evenly over five countries via i % 5.
INSERT INTO customers (customer_id, customer_name, country, credit_limit)
SELECT 
    i,
    'Customer_' || i,
    CASE (i % 5) 
        WHEN 0 THEN 'USA'
        WHEN 1 THEN 'UK'
        WHEN 2 THEN 'Germany'
        WHEN 3 THEN 'France'
        ELSE 'Japan'
    END,
    (random() * 100000)::DECIMAL(10,2)
FROM generate_series(1, 10000) i;

-- Supports the country filter in the query below.
CREATE INDEX idx_customers_country ON customers(country);

-- Nested-loop-friendly shape: a LIMIT plus an indexed inner side makes
-- a (possibly parallel) nested loop attractive.
EXPLAIN (ANALYZE, BUFFERS)
SELECT o.order_id, o.amount, c.customer_name, c.country
FROM large_orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE o.order_date >= '2023-01-01'
  AND c.country = 'USA'
LIMIT 10000;

2. 并行哈希连接(Parallel Hash Join)

sql
-- Disable the competing join methods so the planner picks a (parallel)
-- hash join; work_mem sizes the hash table each process may build.
-- NOTE(review): RESET enable_nestloop / enable_mergejoin after this
-- demo, or the session keeps running with those methods disabled.
SET work_mem = '512MB';
SET enable_nestloop = off;
SET enable_mergejoin = off;

-- Per-country revenue rollup driving a Parallel Hash Join.
-- NOTE(review): COUNT(DISTINCT ...) cannot use parallel aggregation
-- (see the limitations section of this article); the join itself may
-- still run in parallel below a serial aggregate.
EXPLAIN (ANALYZE, BUFFERS, VERBOSE)
SELECT 
    c.country,
    COUNT(DISTINCT o.customer_id) as unique_customers,
    COUNT(*) as total_orders,
    SUM(o.amount) as total_revenue,
    AVG(o.amount) as avg_order_value
FROM large_orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE o.order_date >= '2023-01-01'
GROUP BY c.country;

并行哈希连接执行流程

3. 连接策略对比

sql
-- Time the same join under a forced nested-loop vs. a forced hash join.
-- The buffers_hit / buffers_read columns are placeholders (always 0):
-- plain PL/pgSQL cannot read per-query buffer counters; use
-- EXPLAIN (ANALYZE, BUFFERS) or pg_stat_statements for real buffer
-- statistics.  (The original declared an unused plan_buffers RECORD
-- for this; the dead variable has been removed.)
CREATE OR REPLACE FUNCTION compare_join_strategies()
RETURNS TABLE(
    strategy TEXT,
    execution_time INTERVAL,
    buffers_hit BIGINT,
    buffers_read BIGINT
) AS $$
DECLARE
    start_time TIMESTAMP;
    end_time TIMESTAMP;
BEGIN
    -- Force a nested-loop join.
    SET enable_hashjoin = off;
    SET enable_mergejoin = off;
    SET enable_nestloop = on;

    start_time := clock_timestamp();
    PERFORM COUNT(*)
    FROM large_orders o
    JOIN customers c ON o.customer_id = c.customer_id
    WHERE o.amount > 1000;
    end_time := clock_timestamp();

    RETURN QUERY
    SELECT 'Nested Loop'::TEXT, 
           end_time - start_time,
           0::BIGINT, 0::BIGINT;

    -- Force a hash join on the identical query.
    SET enable_hashjoin = on;
    SET enable_mergejoin = off;
    SET enable_nestloop = off;

    start_time := clock_timestamp();
    PERFORM COUNT(*)
    FROM large_orders o
    JOIN customers c ON o.customer_id = c.customer_id
    WHERE o.amount > 1000;
    end_time := clock_timestamp();

    RETURN QUERY
    SELECT 'Hash Join'::TEXT,
           end_time - start_time,
           0::BIGINT, 0::BIGINT;

    -- Restore planner defaults.  (If an error aborts the function the
    -- SETs are rolled back with the transaction, so the session is not
    -- left with join methods disabled.)
    RESET enable_hashjoin;
    RESET enable_mergejoin;
    RESET enable_nestloop;
END;
$$ LANGUAGE plpgsql;

并行聚合实现

两阶段聚合架构

实际应用示例

sql
-- Sales fact table for the parallel-aggregation demo.
CREATE TABLE sales_data (
    sale_id BIGSERIAL PRIMARY KEY,
    product_id INT,
    store_id INT,
    sale_date DATE,
    quantity INT,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(10,2)
);

-- Load 10M rows.  total_amount is computed in the same statement: the
-- original inserted 0 and then ran an unfiltered UPDATE over all 10M
-- rows, doubling the write volume and bloating the table.
-- OFFSET 0 is an optimization fence: it keeps the subquery from being
-- flattened, so each volatile random() column is evaluated exactly once
-- per row and quantity * unit_price uses the same values being stored.
INSERT INTO sales_data (product_id, store_id, sale_date, quantity, unit_price, total_amount)
SELECT 
    product_id,
    store_id,
    sale_date,
    quantity,
    unit_price,
    quantity * unit_price
FROM (
    SELECT 
        (random() * 1000)::INT AS product_id,
        (random() * 100)::INT AS store_id,
        DATE '2020-01-01' + (random() * 1460)::INT * INTERVAL '1 day' AS sale_date,
        (random() * 10 + 1)::INT AS quantity,
        (random() * 100 + 10)::DECIMAL(10,2) AS unit_price
    FROM generate_series(1, 10000000)
    OFFSET 0
) AS raw;

-- Indexes supporting the date filter and product grouping.
CREATE INDEX idx_sales_date ON sales_data(sale_date);
CREATE INDEX idx_sales_product ON sales_data(product_id);

-- Parallel aggregate query.  PERCENTILE_CONT was removed from the
-- original example: it is an ordered-set aggregate, which (as this
-- article's own limitations section notes) disables parallel
-- aggregation for the whole query — defeating the point of the demo.
EXPLAIN (ANALYZE, BUFFERS, VERBOSE)
SELECT 
    DATE_TRUNC('month', sale_date) as month,
    product_id,
    COUNT(*) as transaction_count,
    SUM(quantity) as total_quantity,
    SUM(total_amount) as total_revenue,
    AVG(unit_price) as avg_price
FROM sales_data
WHERE sale_date >= '2023-01-01'
GROUP BY DATE_TRUNC('month', sale_date), product_id
HAVING SUM(total_amount) > 10000
ORDER BY month, total_revenue DESC;

并行聚合的限制

注意事项

以下情况不支持并行聚合:

  • 使用 DISTINCT 或 ORDER BY 的聚合函数
  • 有序集聚合(Ordered-Set Aggregates)
  • 使用 GROUPING SETS
  • 聚合函数不是并行安全的
sql
-- Examples of aggregate queries that can NOT use parallel aggregation.
-- 1. DISTINCT inside an aggregate: partial aggregates cannot
--    deduplicate across workers.
EXPLAIN (ANALYZE)
SELECT store_id, COUNT(DISTINCT product_id) as unique_products
FROM sales_data
GROUP BY store_id;

-- 2. GROUPING SETS are not supported by parallel aggregation.
EXPLAIN (ANALYZE)
SELECT 
    store_id,
    product_id,
    SUM(total_amount) as revenue
FROM sales_data
WHERE sale_date >= '2023-01-01'
GROUP BY GROUPING SETS ((store_id), (product_id), (store_id, product_id));

-- 3. A custom aggregate only participates in parallel aggregation when
--    it is declared PARALLEL SAFE and provides a COMBINEFUNC to merge
--    per-worker partial states.  Both support functions are trivial
--    null-tolerant additions, expressed here as single-statement
--    SQL-language functions.
CREATE OR REPLACE FUNCTION custom_sum_state(state NUMERIC, value NUMERIC)
RETURNS NUMERIC AS $$
    SELECT COALESCE(state, 0) + COALESCE(value, 0);
$$ LANGUAGE sql PARALLEL SAFE;

CREATE OR REPLACE FUNCTION custom_sum_combine(state1 NUMERIC, state2 NUMERIC)
RETURNS NUMERIC AS $$
    SELECT COALESCE(state1, 0) + COALESCE(state2, 0);
$$ LANGUAGE sql PARALLEL SAFE;

-- Transition and combine steps are the same operation for a plain sum.
CREATE AGGREGATE custom_sum(NUMERIC) (
    SFUNC = custom_sum_state,
    STYPE = NUMERIC,
    COMBINEFUNC = custom_sum_combine,
    PARALLEL = SAFE
);

并行追加(Parallel Append)

应用场景:分区表查询

sql
-- Range-partitioned sales table for the Parallel Append demo.
CREATE TABLE sales_partitioned (
    sale_id BIGSERIAL,
    sale_date DATE,
    product_id INT,
    amount DECIMAL(10,2)
) PARTITION BY RANGE (sale_date);

-- Monthly partitions; FROM is inclusive, TO is exclusive.
CREATE TABLE sales_2023_01 PARTITION OF sales_partitioned
    FOR VALUES FROM ('2023-01-01') TO ('2023-02-01');
CREATE TABLE sales_2023_02 PARTITION OF sales_partitioned
    FOR VALUES FROM ('2023-02-01') TO ('2023-03-01');
CREATE TABLE sales_2023_03 PARTITION OF sales_partitioned
    FOR VALUES FROM ('2023-03-01') TO ('2023-04-01');
CREATE TABLE sales_2023_04 PARTITION OF sales_partitioned
    FOR VALUES FROM ('2023-04-01') TO ('2023-05-01');

-- One index on the parent cascades to every partition (PostgreSQL 11+),
-- replacing the original four per-partition CREATE INDEX statements and
-- guaranteeing that future partitions are indexed automatically.
CREATE INDEX idx_sales_partitioned_product ON sales_partitioned(product_id);

-- Test data spread across the four months (0..119 days after Jan 1).
INSERT INTO sales_partitioned (sale_date, product_id, amount)
SELECT 
    DATE '2023-01-01' + (random() * 119)::INT * INTERVAL '1 day',
    (random() * 100)::INT,
    (random() * 1000)::DECIMAL(10,2)
FROM generate_series(1, 1000000);

-- Allow Parallel Append over the surviving partitions.
SET enable_parallel_append = on;
SET max_parallel_workers_per_gather = 4;

-- Partition pruning keeps only the Jan–Mar partitions; the survivors
-- are then scanned concurrently under a Parallel Append node.
EXPLAIN (ANALYZE, BUFFERS)
SELECT product_id, SUM(amount) as total_amount
FROM sales_partitioned
WHERE sale_date BETWEEN '2023-01-15' AND '2023-03-15'
GROUP BY product_id
ORDER BY total_amount DESC
LIMIT 20;

Parallel Append vs Regular Append

性能优化实战

1. 并行计划参数调优

sql
-- Time one probe query under a given parallel-cost configuration and
-- report whether the planner actually chose a parallel plan.
CREATE OR REPLACE FUNCTION test_parallel_params(
    p_parallel_workers INT,
    p_setup_cost FLOAT,
    p_tuple_cost FLOAT
)
RETURNS TABLE(
    workers INT,
    setup_cost FLOAT,
    tuple_cost FLOAT,
    execution_time INTERVAL,
    parallel_used BOOLEAN
) AS $$
DECLARE
    start_time TIMESTAMP;
    end_time TIMESTAMP;
    plan_text TEXT;
BEGIN
    -- set_config() replaces the original format()-built SET statements;
    -- is_local = false mirrors a plain session-level SET.
    PERFORM set_config('max_parallel_workers_per_gather', p_parallel_workers::TEXT, false);
    PERFORM set_config('parallel_setup_cost', p_setup_cost::TEXT, false);
    PERFORM set_config('parallel_tuple_cost', p_tuple_cost::TEXT, false);

    -- Capture the JSON plan of the probe query (first output row).
    EXECUTE 'EXPLAIN (FORMAT JSON) SELECT COUNT(*) FROM large_orders WHERE amount > 100'
    INTO plan_text;

    -- Time the actual execution.
    start_time := clock_timestamp();
    PERFORM COUNT(*) FROM large_orders WHERE amount > 100;
    end_time := clock_timestamp();

    RETURN QUERY
    SELECT 
        p_parallel_workers,
        p_setup_cost,
        p_tuple_cost,
        end_time - start_time,
        -- The original checked "Parallel Aware" on the TOP plan node,
        -- which is a Finalize Aggregate (never parallel aware), so it
        -- always reported false.  A parallel plan is recognized by the
        -- presence of a Gather / Gather Merge node anywhere in the tree.
        plan_text LIKE '%"Node Type": "Gather%';

    -- Restore the session defaults.
    RESET max_parallel_workers_per_gather;
    RESET parallel_setup_cost;
    RESET parallel_tuple_cost;
END;
$$ LANGUAGE plpgsql;

-- Try several worker / cost combinations.
SELECT * FROM test_parallel_params(2, 1000, 0.1);
SELECT * FROM test_parallel_params(4, 100, 0.01);
SELECT * FROM test_parallel_params(8, 10, 0.001);

2. 监控并行执行

sql
-- Monitoring view for parallel query activity.
-- The original filtered on query LIKE '%Gather%' / '%Parallel%', but
-- pg_stat_activity.query holds the SQL text, not the plan, so those
-- patterns only matched queries that literally contain those words.
-- Parallel workers are identified by backend_type; their leader
-- backends via leader_pid (column available since PostgreSQL 13).
CREATE VIEW parallel_query_monitor AS
WITH parallel_stats AS (
    SELECT 
        pid,
        usename,
        application_name,
        state,
        query,
        backend_type,
        backend_start,
        state_change,
        wait_event_type,
        wait_event
    FROM pg_stat_activity
    WHERE backend_type = 'parallel worker'
       OR pid IN (
            SELECT leader_pid
            FROM pg_stat_activity
            WHERE leader_pid IS NOT NULL
       )
)
SELECT 
    pid,
    usename,
    backend_type,
    state,
    SUBSTRING(query, 1, 50) || '...' as query_preview,
    AGE(clock_timestamp(), backend_start) as connection_age,
    AGE(clock_timestamp(), state_change) as state_duration,
    wait_event_type,
    wait_event
FROM parallel_stats
ORDER BY backend_start DESC;

-- Inspect current parallel activity (empty when nothing is running).
SELECT * FROM parallel_query_monitor;

3. 并行计划故障排除

sql
-- Diagnose why a query might not be getting a parallel plan.
CREATE OR REPLACE FUNCTION diagnose_parallel_plan(p_query TEXT)
RETURNS TABLE(
    check_item TEXT,
    status TEXT,
    details TEXT
) AS $$
BEGIN
    -- Check 1: the table must exceed min_parallel_table_scan_size.
    -- pg_size_bytes() converts the human-readable setting value (e.g.
    -- '8MB') to bytes; the original current_setting(...)::BIGINT cast
    -- raises an error at runtime because the value is not numeric.
    RETURN QUERY
    SELECT 
        'Table Size Check'::TEXT,
        CASE 
            WHEN pg_relation_size('large_orders') > pg_size_bytes(current_setting('min_parallel_table_scan_size'))
            THEN 'PASS'
            ELSE 'FAIL'
        END,
        format('Table size: %s, Min required: %s', 
               pg_size_pretty(pg_relation_size('large_orders')),
               current_setting('min_parallel_table_scan_size'));
    
    -- Check 2: at least one parallel worker must be allowed per Gather.
    RETURN QUERY
    SELECT 
        'Worker Configuration'::TEXT,
        CASE 
            WHEN current_setting('max_parallel_workers_per_gather')::INT > 0
            THEN 'PASS'
            ELSE 'FAIL'
        END,
        format('max_parallel_workers_per_gather: %s', 
               current_setting('max_parallel_workers_per_gather'));
    
    -- Check 3: report the cost knobs that gate parallel plan selection.
    RETURN QUERY
    SELECT 
        'Cost Parameters'::TEXT,
        'INFO'::TEXT,
        format('parallel_setup_cost: %s, parallel_tuple_cost: %s',
               current_setting('parallel_setup_cost'),
               current_setting('parallel_tuple_cost'));
    
    -- Check 4: user functions not marked PARALLEL SAFE prevent parallel
    -- plans for any query that calls them.
    RETURN QUERY
    WITH function_check AS (
        SELECT 
            proname,
            CASE proparallel
                WHEN 's' THEN 'SAFE'
                WHEN 'r' THEN 'RESTRICTED'
                WHEN 'u' THEN 'UNSAFE'
            END as safety
        FROM pg_proc
        WHERE pronamespace = 'public'::regnamespace
          AND proparallel != 's'
    )
    SELECT 
        'Function Parallel Safety'::TEXT,
        CASE 
            WHEN COUNT(*) = 0 THEN 'PASS'
            ELSE 'WARNING'
        END,
        CASE 
            WHEN COUNT(*) = 0 THEN 'All functions are parallel safe'
            ELSE format('%s functions are not parallel safe', COUNT(*))
        END
    FROM function_check;
END;
$$ LANGUAGE plpgsql;

-- Run the diagnostics.
-- NOTE(review): p_query is accepted but not used by any check — the
-- table-size check is hard-coded to large_orders.  Kept for interface
-- compatibility; TODO wire the argument into per-query checks.
SELECT * FROM diagnose_parallel_plan('SELECT COUNT(*) FROM large_orders');

最佳实践建议

1. 工作负载均衡

TIP

确保工作在所有并行工作进程之间均匀分配

sql
-- Use EXPLAIN (ANALYZE, VERBOSE) to see per-worker statistics and
-- confirm the work is evenly distributed across parallel workers.
EXPLAIN (ANALYZE, VERBOSE, BUFFERS)
SELECT 
    DATE_TRUNC('day', order_date) as day,
    COUNT(*) as orders,
    SUM(amount) as revenue
FROM large_orders
WHERE order_date >= '2023-01-01'
GROUP BY DATE_TRUNC('day', order_date);

-- Check that the row counts reported for each worker are balanced.

2. 内存配置优化

sql
-- Memory configuration for parallel hash joins.
-- work_mem is a per-process (and per sort/hash node) limit, so each
-- parallel worker may use up to this amount on its own.
SET work_mem = '256MB';  -- memory budget per worker process

-- Rough total memory estimate:
-- total ≈ work_mem × (max_parallel_workers_per_gather + 1 leader)

3. 分区表的并行策略

sql
-- Quarter-sized partitioning: fewer, larger partitions generally
-- parallelize better than many tiny ones.
CREATE TABLE orders_optimized (
    order_id BIGSERIAL,
    order_date DATE,
    customer_id INT,
    amount DECIMAL(10,2)
) PARTITION BY RANGE (order_date);

-- Appropriately sized partitions (avoid a large number of small ones).
CREATE TABLE orders_2023_q1 PARTITION OF orders_optimized
    FOR VALUES FROM ('2023-01-01') TO ('2023-04-01');
CREATE TABLE orders_2023_q2 PARTITION OF orders_optimized
    FOR VALUES FROM ('2023-04-01') TO ('2023-07-01');

-- Per-partition indexes on the join/filter column.
-- NOTE(review): on PostgreSQL 11+ a single CREATE INDEX on
-- orders_optimized would cascade to all partitions automatically.
CREATE INDEX idx_orders_2023_q1_customer ON orders_2023_q1(customer_id);
CREATE INDEX idx_orders_2023_q2_customer ON orders_2023_q2(customer_id);

总结

PostgreSQL 的并行执行计划通过以下机制提升查询性能:

  1. 多种并行扫描方式:适应不同的数据访问模式
  2. 灵活的并行连接策略:根据数据特征选择最优方案
  3. 两阶段并行聚合:有效减少数据传输开销
  4. 并行追加优化:提升分区表查询性能

关键要点:

  • 合理配置并行参数
  • 监控并行执行效果
  • 理解并行计划的限制
  • 根据工作负载特征优化
扩展阅读