PostgreSQL 并行执行计划深度解析
概述
PostgreSQL 的并行执行计划是提升查询性能的关键技术。与传统的串行执行不同,并行计划通过将工作分配给多个进程来加速查询执行。本文将深入探讨 PostgreSQL 如何生成和执行并行计划,帮助您理解并优化复杂查询的性能。
并行计划的核心概念
部分计划(Partial Plan)
并行计划的核心是"部分计划"概念:
关键原则
每个工作进程执行部分计划,生成结果集的一个子集,确保:
- 没有重复的输出行
- 每个必需的输出行恰好由一个进程生成
- 所有部分结果合并后得到完整结果
并行扫描类型
1. 并行顺序扫描(Parallel Sequential Scan)
工作原理:表的数据块被动态分配给各个工作进程。
sql
-- Demo table for the parallel-scan examples below.
CREATE TABLE large_orders (
    order_id BIGSERIAL PRIMARY KEY,
    customer_id INT,
    order_date DATE,
    amount DECIMAL(10,2),
    status VARCHAR(20)
);

-- Bulk-load 5M synthetic rows.
-- floor() makes each bucket uniformly likely: a bare (random() * 3)::INT
-- ROUNDS, so the edge values (0 and 3) were only half as likely as the
-- middle ones and the status distribution was skewed.
-- DATE + integer yields a DATE directly; no INTERVAL/timestamp detour needed.
INSERT INTO large_orders (customer_id, order_date, amount, status)
SELECT
    floor(random() * 10000)::INT,
    DATE '2020-01-01' + floor(random() * 1460)::INT,
    (random() * 10000)::DECIMAL(10,2),
    CASE floor(random() * 3)::INT
        WHEN 0 THEN 'pending'
        WHEN 1 THEN 'completed'
        ELSE 'cancelled'
    END
FROM generate_series(1, 5000000);

-- Refresh planner statistics so costing reflects the new data.
ANALYZE large_orders;

-- Encourage parallel plans for the demo.
SET max_parallel_workers_per_gather = 4;
SET parallel_tuple_cost = 0.01;
SET parallel_setup_cost = 100;

-- Parallel sequential scan feeding a two-phase (partial + finalize) aggregate.
EXPLAIN (ANALYZE, BUFFERS, VERBOSE)
SELECT COUNT(*), AVG(amount), status
FROM large_orders
WHERE order_date >= DATE '2023-01-01'
GROUP BY status;
预期执行计划:
Finalize GroupAggregate (cost=... rows=3 width=72)
Output: count(*), avg(amount), status
Group Key: status
-> Gather Merge (cost=... rows=12 width=72)
Output: status, (PARTIAL count(*)), (PARTIAL avg(amount))
Workers Planned: 4
Workers Launched: 4
-> Partial GroupAggregate (cost=... rows=3 width=72)
Output: status, PARTIAL count(*), PARTIAL avg(amount)
Group Key: status
-> Sort (cost=... rows=... width=28)
Output: status, amount
Sort Key: status
-> Parallel Seq Scan on large_orders (cost=... rows=... width=28)
Output: status, amount
Filter: (order_date >= '2023-01-01'::date)
Rows Removed by Filter: ...
2. 并行位图堆扫描(Parallel Bitmap Heap Scan)
工作原理:领导进程构建位图,工作进程并行扫描堆页面。
sql
-- Indexes that make a bitmap scan attractive for the range predicates below.
CREATE INDEX idx_orders_date ON large_orders(order_date);
CREATE INDEX idx_orders_customer ON large_orders(customer_id);

-- Nudge the planner away from a plain sequential scan.
SET enable_seqscan = off;
SET work_mem = '256MB';

-- Parallel bitmap heap scan: the leader builds the bitmap,
-- the workers then share the heap-page fetches.
EXPLAIN (ANALYZE, BUFFERS)
SELECT
    customer_id,
    COUNT(*),
    SUM(amount)
FROM large_orders
WHERE order_date BETWEEN '2023-01-01' AND '2023-03-31'
  AND customer_id BETWEEN 1000 AND 5000
GROUP BY customer_id;
3. 并行索引扫描(Parallel Index Scan)
工作原理:工作进程协作扫描 B-tree 索引。
sql
-- Covering index: status + order_date + amount lets the query read the
-- index alone for the filter, grouping and aggregation columns.
CREATE INDEX idx_orders_status_date_amount
ON large_orders(status, order_date, amount);

-- Parallel index scan: workers cooperate on the B-tree leaf pages.
EXPLAIN (ANALYZE, BUFFERS, VERBOSE)
SELECT
    status,
    order_date,
    SUM(amount) AS daily_total
FROM large_orders
WHERE status = 'completed'
  AND order_date >= '2023-01-01'
GROUP BY status, order_date
ORDER BY order_date;
索引扫描的并行化示意图:
并行连接策略
1. 并行嵌套循环连接(Parallel Nested Loop Join)
sql
-- Dimension table joined against large_orders in the join demos.
CREATE TABLE customers (
    customer_id INT PRIMARY KEY,
    customer_name VARCHAR(100),
    country VARCHAR(50),
    credit_limit DECIMAL(10,2)
);

-- Seed 10k customers; country cycles deterministically through a fixed
-- list (i % 5 picks the same country as the original CASE expression).
INSERT INTO customers (customer_id, customer_name, country, credit_limit)
SELECT
    i,
    'Customer_' || i,
    (ARRAY['USA', 'UK', 'Germany', 'France', 'Japan'])[(i % 5) + 1],
    (random() * 100000)::DECIMAL(10,2)
FROM generate_series(1, 10000) i;

-- Index for the country filter in the join query below.
CREATE INDEX idx_customers_country ON customers(country);

-- Parallel nested loop join demo.
EXPLAIN (ANALYZE, BUFFERS)
SELECT
    o.order_id,
    o.amount,
    c.customer_name,
    c.country
FROM large_orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE o.order_date >= '2023-01-01'
  AND c.country = 'USA'
LIMIT 10000;
2. 并行哈希连接(Parallel Hash Join)
sql
-- Steer the planner toward a (parallel) hash join: give it memory for the
-- hash table and rule out the competing join strategies.
SET work_mem = '512MB';
SET enable_nestloop = off;
SET enable_mergejoin = off;

-- Parallel hash join: workers cooperate on building one shared hash table.
EXPLAIN (ANALYZE, BUFFERS, VERBOSE)
SELECT
    c.country,
    COUNT(DISTINCT o.customer_id) AS unique_customers,
    COUNT(*)                      AS total_orders,
    SUM(o.amount)                 AS total_revenue,
    AVG(o.amount)                 AS avg_order_value
FROM large_orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE o.order_date >= '2023-01-01'
GROUP BY c.country;
并行哈希连接执行流程:
3. 连接策略对比
sql
-- Compare nested-loop vs hash-join wall-clock time for the same query.
--
-- SET LOCAL scopes the planner switches to the surrounding transaction,
-- so the caller's session settings are restored automatically -- even if
-- the function raises. The original SET ... / RESET pattern both leaked
-- state on error and clobbered any custom session values with defaults.
-- (The unused plan_buffers variable was removed; the buffers_* columns
-- are placeholders and always report 0.)
CREATE OR REPLACE FUNCTION compare_join_strategies()
RETURNS TABLE(
    strategy TEXT,
    execution_time INTERVAL,
    buffers_hit BIGINT,   -- placeholder, not yet measured
    buffers_read BIGINT   -- placeholder, not yet measured
) AS $$
DECLARE
    start_time TIMESTAMP;
    end_time TIMESTAMP;
BEGIN
    -- Force a nested loop join.
    SET LOCAL enable_hashjoin = off;
    SET LOCAL enable_mergejoin = off;
    SET LOCAL enable_nestloop = on;

    start_time := clock_timestamp();
    PERFORM COUNT(*)
    FROM large_orders o
    JOIN customers c ON o.customer_id = c.customer_id
    WHERE o.amount > 1000;
    end_time := clock_timestamp();

    RETURN QUERY
    SELECT 'Nested Loop'::TEXT,
           end_time - start_time,
           0::BIGINT, 0::BIGINT;

    -- Force a hash join on the identical query.
    SET LOCAL enable_hashjoin = on;
    SET LOCAL enable_mergejoin = off;
    SET LOCAL enable_nestloop = off;

    start_time := clock_timestamp();
    PERFORM COUNT(*)
    FROM large_orders o
    JOIN customers c ON o.customer_id = c.customer_id
    WHERE o.amount > 1000;
    end_time := clock_timestamp();

    RETURN QUERY
    SELECT 'Hash Join'::TEXT,
           end_time - start_time,
           0::BIGINT, 0::BIGINT;
    -- No RESET needed: SET LOCAL reverts at transaction end.
END;
$$ LANGUAGE plpgsql;
并行聚合实现
两阶段聚合架构
实际应用示例
sql
-- Fact table for the parallel-aggregation examples.
CREATE TABLE sales_data (
    sale_id BIGSERIAL PRIMARY KEY,
    product_id INT,
    store_id INT,
    sale_date DATE,
    quantity INT,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(10,2)
);

-- Load 10M rows. total_amount is derived inside the INSERT, which avoids
-- the original second full-table UPDATE over all 10M rows (an extra pass
-- that also bloats the table with dead tuples).
INSERT INTO sales_data (product_id, store_id, sale_date, quantity, unit_price, total_amount)
SELECT
    product_id,
    store_id,
    sale_date,
    quantity,
    unit_price,
    quantity * unit_price
FROM (
    SELECT
        (random() * 1000)::INT AS product_id,
        (random() * 100)::INT AS store_id,
        DATE '2020-01-01' + (random() * 1460)::INT AS sale_date,
        (random() * 10 + 1)::INT AS quantity,
        (random() * 100 + 10)::DECIMAL(10,2) AS unit_price
    FROM generate_series(1, 10000000)
) AS raw;

-- Indexes for the date filter and per-product lookups.
CREATE INDEX idx_sales_date ON sales_data(sale_date);
CREATE INDEX idx_sales_product ON sales_data(product_id);

-- Parallel two-phase aggregation demo.
-- NOTE: the original version also computed PERCENTILE_CONT(...) WITHIN
-- GROUP here. That is an ordered-set aggregate, which -- as the
-- limitations section below states -- cannot be partially aggregated, so
-- its presence forced the whole aggregation back to serial execution.
-- It is deliberately excluded so the plan shows Partial/Finalize stages.
EXPLAIN (ANALYZE, BUFFERS, VERBOSE)
SELECT
    DATE_TRUNC('month', sale_date) AS month,
    product_id,
    COUNT(*) AS transaction_count,
    SUM(quantity) AS total_quantity,
    SUM(total_amount) AS total_revenue,
    AVG(unit_price) AS avg_price
FROM sales_data
WHERE sale_date >= '2023-01-01'
GROUP BY DATE_TRUNC('month', sale_date), product_id
HAVING SUM(total_amount) > 10000
ORDER BY month, total_revenue DESC;
并行聚合的限制
注意事项
以下情况不支持并行聚合:
- 使用 DISTINCT 或 ORDER BY 的聚合函数
- 有序集聚合(Ordered-Set Aggregates)
- 使用 GROUPING SETS
- 聚合函数不是并行安全的
sql
-- Examples of aggregations the planner cannot (fully) parallelize.

-- 1. DISTINCT inside an aggregate: no partial aggregation is possible.
EXPLAIN (ANALYZE)
SELECT store_id, COUNT(DISTINCT product_id) AS unique_products
FROM sales_data
GROUP BY store_id;

-- 2. GROUPING SETS are not parallel-aware.
EXPLAIN (ANALYZE)
SELECT
    store_id,
    product_id,
    SUM(total_amount) AS revenue
FROM sales_data
WHERE sale_date >= '2023-01-01'
GROUP BY GROUPING SETS ((store_id), (product_id), (store_id, product_id));

-- 3. Custom aggregates only take part in parallel plans when they are
--    marked PARALLEL SAFE and supply a COMBINEFUNC for merging the
--    per-worker partial states.
CREATE OR REPLACE FUNCTION custom_sum_state(state NUMERIC, value NUMERIC)
RETURNS NUMERIC AS $$
BEGIN
    -- Per-worker transition: fold the next value into the running state.
    RETURN COALESCE(state, 0) + COALESCE(value, 0);
END;
$$ LANGUAGE plpgsql PARALLEL SAFE;

CREATE OR REPLACE FUNCTION custom_sum_combine(state1 NUMERIC, state2 NUMERIC)
RETURNS NUMERIC AS $$
BEGIN
    -- Merge two partial states produced by different workers.
    RETURN COALESCE(state1, 0) + COALESCE(state2, 0);
END;
$$ LANGUAGE plpgsql PARALLEL SAFE;

CREATE AGGREGATE custom_sum(NUMERIC) (
    STYPE = NUMERIC,
    SFUNC = custom_sum_state,
    COMBINEFUNC = custom_sum_combine,
    PARALLEL = SAFE
);
并行追加(Parallel Append)
应用场景:分区表查询
sql
-- Range-partitioned table used for the Parallel Append demo.
CREATE TABLE sales_partitioned (
    sale_id BIGSERIAL,
    sale_date DATE,
    product_id INT,
    amount DECIMAL(10,2)
) PARTITION BY RANGE (sale_date);

-- One partition per month, half-open ranges [first, next-first).
CREATE TABLE sales_2023_01 PARTITION OF sales_partitioned
    FOR VALUES FROM ('2023-01-01') TO ('2023-02-01');
CREATE TABLE sales_2023_02 PARTITION OF sales_partitioned
    FOR VALUES FROM ('2023-02-01') TO ('2023-03-01');
CREATE TABLE sales_2023_03 PARTITION OF sales_partitioned
    FOR VALUES FROM ('2023-03-01') TO ('2023-04-01');
CREATE TABLE sales_2023_04 PARTITION OF sales_partitioned
    FOR VALUES FROM ('2023-04-01') TO ('2023-05-01');

-- Per-partition index on product_id.
CREATE INDEX idx_sales_2023_01_product ON sales_2023_01(product_id);
CREATE INDEX idx_sales_2023_02_product ON sales_2023_02(product_id);
CREATE INDEX idx_sales_2023_03_product ON sales_2023_03(product_id);
CREATE INDEX idx_sales_2023_04_product ON sales_2023_04(product_id);

-- 1M rows spread across the four covered months.
INSERT INTO sales_partitioned (sale_date, product_id, amount)
SELECT
    DATE '2023-01-01' + (random() * 119)::INT * INTERVAL '1 day',
    (random() * 100)::INT,
    (random() * 1000)::DECIMAL(10,2)
FROM generate_series(1, 1000000);

-- Allow the partitions to be scanned by parallel workers concurrently.
SET enable_parallel_append = on;
SET max_parallel_workers_per_gather = 4;

-- Query spanning several partitions -> Parallel Append.
EXPLAIN (ANALYZE, BUFFERS)
SELECT
    product_id,
    SUM(amount) AS total_amount
FROM sales_partitioned
WHERE sale_date BETWEEN '2023-01-15' AND '2023-03-15'
GROUP BY product_id
ORDER BY total_amount DESC
LIMIT 20;
Parallel Append vs Regular Append
性能优化实战
1. 并行计划参数调优
sql
-- Benchmark a fixed query under one combination of parallel-plan GUCs and
-- report whether the planner actually chose a parallel plan.
--
-- Fix: the original detected parallelism by reading "Parallel Aware" on
-- the ROOT plan node. In a parallel plan the root is typically a
-- Finalize Aggregate (or Gather) whose "Parallel Aware" is false, so
-- parallel_used was effectively always false. A parallel plan always
-- contains a Gather / Gather Merge node, so we search the plan text for it.
CREATE OR REPLACE FUNCTION test_parallel_params(
    p_parallel_workers INT,
    p_setup_cost FLOAT,
    p_tuple_cost FLOAT
)
RETURNS TABLE(
    workers INT,
    setup_cost FLOAT,
    tuple_cost FLOAT,
    execution_time INTERVAL,
    parallel_used BOOLEAN
) AS $$
DECLARE
    start_time TIMESTAMP;
    end_time TIMESTAMP;
    plan_text TEXT;
BEGIN
    -- SET LOCAL keeps the changes transaction-scoped: the caller's
    -- settings come back automatically, even if the function errors.
    EXECUTE format('SET LOCAL max_parallel_workers_per_gather = %s', p_parallel_workers);
    EXECUTE format('SET LOCAL parallel_setup_cost = %s', p_setup_cost);
    EXECUTE format('SET LOCAL parallel_tuple_cost = %s', p_tuple_cost);

    -- Capture the plan chosen under these settings.
    EXECUTE 'EXPLAIN (FORMAT JSON) SELECT COUNT(*) FROM large_orders WHERE amount > 100'
    INTO plan_text;

    -- Time the actual execution.
    start_time := clock_timestamp();
    PERFORM COUNT(*) FROM large_orders WHERE amount > 100;
    end_time := clock_timestamp();

    RETURN QUERY
    SELECT
        p_parallel_workers,
        p_setup_cost,
        p_tuple_cost,
        end_time - start_time,
        plan_text LIKE '%Gather%';
    -- No RESET needed: SET LOCAL reverts at transaction end.
END;
$$ LANGUAGE plpgsql;

-- Try several parameter combinations.
SELECT * FROM test_parallel_params(2, 1000, 0.1);
SELECT * FROM test_parallel_params(4, 100, 0.01);
SELECT * FROM test_parallel_params(8, 10, 0.001);
2. 监控并行执行
sql
-- Live view over pg_stat_activity showing parallel worker backends and
-- sessions whose query text looks parallel (heuristic string match).
-- The intermediate CTE of the original added nothing, so the filter is
-- applied directly; the result set is identical.
CREATE VIEW parallel_query_monitor AS
SELECT
    pid,
    usename,
    backend_type,
    state,
    SUBSTRING(query, 1, 50) || '...' AS query_preview,
    AGE(clock_timestamp(), backend_start) AS connection_age,
    AGE(clock_timestamp(), state_change) AS state_duration,
    wait_event_type,
    wait_event
FROM pg_stat_activity
WHERE backend_type LIKE '%worker%'
   OR query LIKE '%Gather%'
   OR query LIKE '%Parallel%'
ORDER BY backend_start DESC;

-- Inspect current parallel activity.
SELECT * FROM parallel_query_monitor;
3. 并行计划故障排除
sql
-- Diagnose why a query may not be getting a parallel plan.
-- NOTE(review): p_query is currently unused -- every check is hard-wired
-- to large_orders. The parameter is kept for interface compatibility;
-- plug it into the checks (or parse the table name out of it) as a
-- follow-up.
CREATE OR REPLACE FUNCTION diagnose_parallel_plan(p_query TEXT)
RETURNS TABLE(
    check_item TEXT,
    status TEXT,
    details TEXT
) AS $$
BEGIN
    -- 1. The table must exceed min_parallel_table_scan_size.
    --    pg_size_bytes() parses the human-readable setting value (e.g.
    --    '8MB'); the original direct ::BIGINT cast of that string failed
    --    at runtime with "invalid input syntax for type bigint".
    RETURN QUERY
    SELECT
        'Table Size Check'::TEXT,
        CASE
            WHEN pg_relation_size('large_orders') >
                 pg_size_bytes(current_setting('min_parallel_table_scan_size'))
            THEN 'PASS'
            ELSE 'FAIL'
        END,
        format('Table size: %s, Min required: %s',
               pg_size_pretty(pg_relation_size('large_orders')),
               current_setting('min_parallel_table_scan_size'));

    -- 2. Parallel workers must be enabled at all.
    RETURN QUERY
    SELECT
        'Worker Configuration'::TEXT,
        CASE
            WHEN current_setting('max_parallel_workers_per_gather')::INT > 0
            THEN 'PASS'
            ELSE 'FAIL'
        END,
        format('max_parallel_workers_per_gather: %s',
               current_setting('max_parallel_workers_per_gather'));

    -- 3. Report the cost parameters (informational only).
    RETURN QUERY
    SELECT
        'Cost Parameters'::TEXT,
        'INFO'::TEXT,
        format('parallel_setup_cost: %s, parallel_tuple_cost: %s',
               current_setting('parallel_setup_cost'),
               current_setting('parallel_tuple_cost'));

    -- 4. Any PARALLEL RESTRICTED/UNSAFE function in the query forces a
    --    (partially) serial plan; warn if such functions exist in public.
    RETURN QUERY
    WITH function_check AS (
        SELECT
            proname,
            CASE proparallel
                WHEN 's' THEN 'SAFE'
                WHEN 'r' THEN 'RESTRICTED'
                WHEN 'u' THEN 'UNSAFE'
            END AS safety
        FROM pg_proc
        WHERE pronamespace = 'public'::regnamespace
          AND proparallel != 's'
    )
    SELECT
        'Function Parallel Safety'::TEXT,
        CASE
            WHEN COUNT(*) = 0 THEN 'PASS'
            ELSE 'WARNING'
        END,
        CASE
            WHEN COUNT(*) = 0 THEN 'All functions are parallel safe'
            ELSE format('%s functions are not parallel safe', COUNT(*))
        END
    FROM function_check;
END;
$$ LANGUAGE plpgsql;

-- Run the diagnostics.
SELECT * FROM diagnose_parallel_plan('SELECT COUNT(*) FROM large_orders');
最佳实践建议
1. 工作负载均衡
TIP
确保工作在所有并行工作进程之间均匀分配
sql
-- EXPLAIN ANALYZE VERBOSE prints per-worker statistics; compare the row
-- counts of each worker in the output to confirm the scan work is spread
-- evenly across the parallel processes.
EXPLAIN (ANALYZE, VERBOSE, BUFFERS)
SELECT
    DATE_TRUNC('day', order_date) AS day,
    COUNT(*) AS orders,
    SUM(amount) AS revenue
FROM large_orders
WHERE order_date >= '2023-01-01'
GROUP BY DATE_TRUNC('day', order_date);
2. 内存配置优化
sql
-- Memory sizing for parallel hash joins: work_mem is a PER-PROCESS
-- budget, so each parallel worker may use this much on its own.
SET work_mem = '256MB';
-- Rough total footprint:
--   total ≈ work_mem × (max_parallel_workers_per_gather + 1 leader)
3. 分区表的并行策略
sql
-- Partitioning sized for parallel scans: quarterly ranges keep each
-- partition large enough to be worth a worker (avoid many tiny partitions).
CREATE TABLE orders_optimized (
    order_id BIGSERIAL,
    order_date DATE,
    customer_id INT,
    amount DECIMAL(10,2)
) PARTITION BY RANGE (order_date);

CREATE TABLE orders_2023_q1 PARTITION OF orders_optimized
    FOR VALUES FROM ('2023-01-01') TO ('2023-04-01');
CREATE TABLE orders_2023_q2 PARTITION OF orders_optimized
    FOR VALUES FROM ('2023-04-01') TO ('2023-07-01');

-- Each partition gets its own supporting index.
CREATE INDEX idx_orders_2023_q1_customer ON orders_2023_q1(customer_id);
CREATE INDEX idx_orders_2023_q2_customer ON orders_2023_q2(customer_id);
总结
PostgreSQL 的并行执行计划通过以下机制提升查询性能:
- 多种并行扫描方式:适应不同的数据访问模式
- 灵活的并行连接策略:根据数据特征选择最优方案
- 两阶段并行聚合:有效减少数据传输开销
- 并行追加优化:提升分区表查询性能
关键要点:
- 合理配置并行参数
- 监控并行执行效果
- 理解并行计划的限制
- 根据工作负载特征优化