PostgreSQL 并行查询使用条件完全指南
概述
PostgreSQL 的并行查询功能虽然强大,但并非在所有情况下都能使用。了解并行查询的使用条件和限制,对于优化数据库性能和排查问题至关重要。本文将详细介绍何时可以使用并行查询,以及各种限制条件和解决方案。
基本配置要求
必需的系统配置
关键参数配置
sql
-- Inspect the current parallel-query settings.
-- context shows when a change can take effect (e.g. 'postmaster' = restart required);
-- pending_restart is true when a changed value is waiting for a restart.
SELECT name, setting, unit, context, pending_restart, short_desc
FROM pg_settings
WHERE name IN (
    'max_parallel_workers_per_gather',
    'max_parallel_workers',
    'max_worker_processes',
    'min_parallel_table_scan_size',
    'min_parallel_index_scan_size',
    'parallel_setup_cost',
    'parallel_tuple_cost'
)
ORDER BY name;

-- Set the parallel-query parameters
-- Maximum number of parallel workers a single Gather / Gather Merge node may request
ALTER SYSTEM SET max_parallel_workers_per_gather = 4;
-- System-wide limit on concurrently running parallel workers
-- (values above max_worker_processes have no effect: parallel workers come from that pool)
ALTER SYSTEM SET max_parallel_workers = 8;
-- Total background-worker slots (parallel query and other background workers).
-- This parameter has context 'postmaster': it only takes effect after a server RESTART.
ALTER SYSTEM SET max_worker_processes = 16;
-- Reload the configuration: applies the first two settings immediately;
-- max_worker_processes will show pending_restart = true until the server is restarted.
SELECT pg_reload_conf();
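参数层级关系
并行工作进程是从 max_worker_processes 定义的后台工作进程槽位中分配的,因此设置时应满足 max_parallel_workers_per_gather ≤ max_parallel_workers ≤ max_worker_processes:单个 Gather 节点最多请求 max_parallel_workers_per_gather 个工作进程;全系统同时运行的并行工作进程不超过 max_parallel_workers;而它们又与其他后台工作进程共享 max_worker_processes 个槽位。注意 max_worker_processes 修改后必须重启数据库才能生效。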
并行查询的限制条件
1. 数据修改操作限制
WARNING
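如果查询在顶层或 CTE 中写入数据(INSERT、UPDATE、DELETE、MERGE),或对行加锁(SELECT ... FOR UPDATE / FOR SHARE),规划器不会为该查询生成并行计划。另外请注意:EXPLAIN ANALYZE 会真正执行语句,演示写操作时应在事务中执行并回滚。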
sql
-- Create the test table
CREATE TABLE test_parallel (
    id SERIAL PRIMARY KEY,
    category VARCHAR(50),
    value DECIMAL(10,2),
    created_at TIMESTAMP DEFAULT NOW()
);

-- Insert 1,000,000 rows of test data.
-- floor() yields 10 equally likely categories Category_0 .. Category_9
-- (a rounding ::INT cast would give 11 buckets with the two end buckets half as likely).
INSERT INTO test_parallel (category, value)
SELECT
    'Category_' || floor(random() * 10)::INT,
    (random() * 1000)::DECIMAL(10,2)
FROM generate_series(1, 1000000);

-- Refresh planner statistics so cost estimates (and the parallel decision) are realistic
ANALYZE test_parallel;

-- EXPLAIN ANALYZE really executes the statement. Run the write/lock examples in a
-- transaction and roll it back so the test data is left untouched.
BEGIN;

-- ❌ No parallel plan: UPDATE writes data
EXPLAIN (ANALYZE, BUFFERS)
UPDATE test_parallel
SET value = value * 1.1
WHERE category = 'Category_5';

-- ❌ No parallel plan: DELETE writes data
EXPLAIN (ANALYZE)
DELETE FROM test_parallel
WHERE created_at < NOW() - INTERVAL '30 days';

-- ❌ No parallel plan: SELECT ... FOR UPDATE locks rows
EXPLAIN (ANALYZE)
SELECT id, category, value, created_at
FROM test_parallel
WHERE value > 500
FOR UPDATE;

ROLLBACK;

-- ✅ Can use a parallel plan: read-only query
EXPLAIN (ANALYZE)
SELECT category, COUNT(*), AVG(value)
FROM test_parallel
GROUP BY category;
2. 特殊情况下的并行查询
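作为例外,以下几种"创建并填充新表/物化视图"的命令可以对其底层的 SELECT 部分使用并行计划:CREATE TABLE ... AS、SELECT INTO、CREATE MATERIALIZED VIEW 和 REFRESH MATERIALIZED VIEW。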
sql
-- ✅ CREATE TABLE AS: the underlying SELECT may use a parallel plan.
-- EXPLAIN ANALYZE really creates the table, so drop it first to keep the script re-runnable.
DROP TABLE IF EXISTS parallel_summary;
EXPLAIN (ANALYZE, VERBOSE)
CREATE TABLE parallel_summary AS
SELECT
    category,
    COUNT(*) AS count,
    SUM(value) AS total_value,
    AVG(value) AS avg_value
FROM test_parallel
GROUP BY category;

-- ✅ SELECT INTO (a variant of CREATE TABLE AS) may also use a parallel plan
DROP TABLE IF EXISTS monthly_summary;
EXPLAIN (ANALYZE)
SELECT
    DATE_TRUNC('month', created_at) AS month,
    COUNT(*) AS record_count
INTO monthly_summary
FROM test_parallel
GROUP BY DATE_TRUNC('month', created_at);

-- ✅ CREATE MATERIALIZED VIEW may use a parallel plan for its query.
-- PERCENTILE_CONT is an ordered-set aggregate and cannot be computed as partial
-- aggregates, so here at most the scan below the aggregation can run in parallel.
DROP MATERIALIZED VIEW IF EXISTS mv_category_stats;
EXPLAIN (ANALYZE)
CREATE MATERIALIZED VIEW mv_category_stats AS
SELECT
    category,
    COUNT(*) AS total_records,
    MIN(value) AS min_value,
    MAX(value) AS max_value,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY value) AS median_value
FROM test_parallel
GROUP BY category;

-- ✅ REFRESH MATERIALIZED VIEW may also run its query in parallel.
-- REFRESH is a utility statement, so EXPLAIN cannot show its plan; run it directly
-- (use e.g. auto_explain with auto_explain.log_nested_statements = on to see the plan).
-- CONCURRENTLY requires a unique index without a WHERE clause on the materialized view.
CREATE UNIQUE INDEX mv_category_stats_category_idx ON mv_category_stats (category);
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_category_stats;
3. 游标和循环限制
sql
-- ❌ Cursors never get a parallel plan: execution may be suspended between FETCHes.
BEGIN;

-- FETCH is not an explainable statement. EXPLAIN the DECLARE instead: it plans the
-- query with the cursor's options, so the printed plan contains no Gather node.
EXPLAIN
DECLARE test_cursor CURSOR FOR
SELECT category, COUNT(*)
FROM test_parallel
GROUP BY category;

-- For comparison: the same query outside a cursor is eligible for a parallel plan
EXPLAIN
SELECT category, COUNT(*)
FROM test_parallel
GROUP BY category;

-- Actually open, consume and close the cursor
DECLARE test_cursor CURSOR FOR
SELECT category, COUNT(*)
FROM test_parallel
GROUP BY category;
FETCH ALL FROM test_cursor;
CLOSE test_cursor;

COMMIT;
-- ❌ A PL/pgSQL loop of the form FOR ... IN query LOOP never gets a parallel plan:
--    the planner cannot verify that the loop body is safe to run while a parallel
--    query is active.
CREATE OR REPLACE FUNCTION process_categories()
RETURNS TABLE(category VARCHAR, total BIGINT) AS $$
DECLARE
    r RECORD;
BEGIN
    -- The query driving this loop is planned without parallelism
    FOR r IN
        SELECT t.category AS cat, COUNT(*) AS n
        FROM test_parallel AS t
        GROUP BY t.category
    LOOP
        -- Per-row processing would go here; then emit the row
        category := r.cat;
        total := r.n;
        RETURN NEXT;
    END LOOP;
END;
$$ LANGUAGE plpgsql;
-- Alternative: return the whole result set with RETURN QUERY instead of looping row by row.
-- Since PostgreSQL 14 the query behind RETURN QUERY may use a parallel plan;
-- on PostgreSQL 13 and earlier it is never parallelized.
CREATE OR REPLACE FUNCTION get_category_stats()
RETURNS TABLE(category VARCHAR, total BIGINT) AS $$
BEGIN
    -- Eligible for a parallel plan (PostgreSQL 14+): the result is produced in one pass,
    -- with no PL/pgSQL code running between rows
    RETURN QUERY
    SELECT t.category, COUNT(*)::BIGINT
    FROM test_parallel t
    GROUP BY t.category;
END;
$$ LANGUAGE plpgsql;
4. 函数并行安全性限制
sql
-- List every function in schema public together with its source and parallel-safety label
-- (pg_proc.proparallel: 's' = safe, 'r' = restricted, 'u' = unsafe)
SELECT
    p.proname AS function_name,
    p.prosrc AS source_code,
    CASE
        WHEN p.proparallel = 's' THEN 'PARALLEL SAFE'
        WHEN p.proparallel = 'r' THEN 'PARALLEL RESTRICTED'
        WHEN p.proparallel = 'u' THEN 'PARALLEL UNSAFE'
    END AS parallel_safety
FROM pg_proc AS p
WHERE p.pronamespace = 'public'::regnamespace
ORDER BY p.proname;
-- ❌ If a query calls any PARALLEL UNSAFE function, no parallel plan is generated for it,
--    even when the call is only evaluated after aggregation.
CREATE OR REPLACE FUNCTION unsafe_function(val DECIMAL)
RETURNS DECIMAL AS $$
BEGIN
    -- RAISE NOTICE itself is permitted in parallel workers. What blocks parallelism is
    -- the PARALLEL UNSAFE label below, which is also the default for new functions.
    RAISE NOTICE 'Processing value: %', val;
    RETURN val * 2;
END;
$$ LANGUAGE plpgsql PARALLEL UNSAFE;
-- This query will not use a parallel plan
EXPLAIN (ANALYZE)
SELECT category, unsafe_function(AVG(value))
FROM test_parallel
GROUP BY category;
-- ✅ The same computation as an IMMUTABLE, PARALLEL SAFE SQL function
CREATE OR REPLACE FUNCTION safe_function(val DECIMAL)
RETURNS DECIMAL
LANGUAGE SQL
IMMUTABLE
PARALLEL SAFE
AS $$
    SELECT 2 * val;
$$;
-- With only parallel-safe functions involved, this query is eligible for a parallel plan
EXPLAIN (ANALYZE)
SELECT
    category,
    safe_function(AVG(value))
FROM test_parallel
GROUP BY category;
5. 嵌套并行查询限制
sql
-- A function that runs its own query, called from an outer query that may itself be parallel.
-- It only reads data, so PARALLEL SAFE is a valid label, and STABLE describes its volatility.
CREATE OR REPLACE FUNCTION get_category_count(p_category VARCHAR)
RETURNS BIGINT AS $$
DECLARE
    v_count BIGINT;
BEGIN
    -- While the outer query is executing a parallel plan, this nested query is planned
    -- without parallelism: a query cannot start parallel workers from inside a running
    -- parallel query.
    SELECT COUNT(*) INTO v_count
    FROM test_parallel
    WHERE category = p_category;
    RETURN v_count;
END;
$$ LANGUAGE plpgsql STABLE PARALLEL SAFE;

-- The outer query may run in parallel; the function's inner query will not.
-- Deduplicate categories first so the function runs once per category. Calling it in the
-- same SELECT list as DISTINCT would evaluate it for every row (~1M full scans).
EXPLAIN (ANALYZE, VERBOSE)
SELECT
    c.category,
    get_category_count(c.category) AS count
FROM (
    SELECT DISTINCT category
    FROM test_parallel
) AS c
ORDER BY c.category
LIMIT 10;
运行时并行执行限制
资源限制检查
sql
-- Resource monitor: compares running worker processes with their configured limits.
-- Returns two rows: 'Total Worker Processes' (vs max_worker_processes) and
-- 'Parallel Workers' (vs max_parallel_workers). status is 'WARNING' at >= 80% usage.
CREATE OR REPLACE FUNCTION check_parallel_resources()
RETURNS TABLE(
    resource_type TEXT,
    current_value INT,
    max_value INT,
    available INT,
    status TEXT
) AS $$
DECLARE
    v_max_worker_processes INT := current_setting('max_worker_processes')::INT;
    v_max_parallel_workers INT := current_setting('max_parallel_workers')::INT;
    v_bgworkers INT;
    v_parallel_workers INT;
BEGIN
    -- Background workers (parallel workers, the logical replication launcher and workers,
    -- extension workers) occupy max_worker_processes slots and report their own backend_type.
    -- Count everything that is NOT a client backend, autovacuum process or auxiliary process.
    -- Matching '%worker%' instead would wrongly count autovacuum workers (limited separately
    -- by autovacuum_max_workers) and miss the logical replication launcher.
    -- NOTE(review): exclusion list reflects core backend types up to PostgreSQL 18; revisit on upgrade.
    SELECT
        COUNT(*) FILTER (WHERE a.backend_type = 'parallel worker'),
        COUNT(*) FILTER (WHERE a.backend_type NOT IN (
            'client backend', 'standalone backend',
            'autovacuum launcher', 'autovacuum worker',
            'background writer', 'checkpointer', 'walwriter', 'archiver',
            'startup', 'walreceiver', 'walsender', 'walsummarizer',
            'slotsync worker', 'io worker'
        ))
    INTO v_parallel_workers, v_bgworkers
    FROM pg_stat_activity AS a;

    -- Total background worker slots
    resource_type := 'Total Worker Processes';
    current_value := v_bgworkers;
    max_value := v_max_worker_processes;
    available := v_max_worker_processes - v_bgworkers;
    status := CASE
        WHEN v_bgworkers < v_max_worker_processes * 0.8 THEN 'OK'
        ELSE 'WARNING'
    END;
    RETURN NEXT;

    -- Parallel workers: capped by max_parallel_workers, but they also draw from the
    -- max_worker_processes pool, so real headroom is the smaller of the two free counts.
    resource_type := 'Parallel Workers';
    current_value := v_parallel_workers;
    max_value := v_max_parallel_workers;
    available := LEAST(v_max_parallel_workers - v_parallel_workers,
                       v_max_worker_processes - v_bgworkers);
    status := CASE
        WHEN v_parallel_workers < v_max_parallel_workers * 0.8 THEN 'OK'
        ELSE 'WARNING'
    END;
    RETURN NEXT;
END;
$$ LANGUAGE plpgsql;
-- Run the monitor
SELECT * FROM check_parallel_resources();
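客户端协议限制
当客户端通过扩展查询协议以非零的行数上限发送 Execute 消息时(例如 psql 设置了 FETCH_COUNT,或 JDBC 设置了 fetchSize),查询可能在中途挂起,因此不会使用并行计划。这一限制发生在协议层,无法在函数内部直接模拟。下面的函数演示的是 PL/pgSQL 中同样"逐行消费结果"、因而也不会并行的两种写法。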
sql
-- Demonstrates two ways of consuming a query row by row inside PL/pgSQL. NEITHER gets a
-- parallel plan, because execution may be suspended between rows. The client-side
-- counterpart (Execute message with a non-zero row limit, e.g. psql FETCH_COUNT or JDBC
-- setFetchSize) happens at the protocol level and cannot be reproduced inside a function.
CREATE OR REPLACE FUNCTION test_fetch_count_impact()
RETURNS VOID AS $$
DECLARE
    v_rec RECORD;
    cur CURSOR FOR
        SELECT category, COUNT(*)
        FROM test_parallel
        GROUP BY category;
BEGIN
    -- 1. FOR ... IN query loop: planned WITHOUT parallelism (loop body runs between rows)
    FOR v_rec IN
        SELECT category, COUNT(*)
        FROM test_parallel
        GROUP BY category
    LOOP
        NULL;  -- process each row here
    END LOOP;

    -- 2. Explicit cursor fetched one row at a time: also never parallel.
    --    A bound cursor must be OPENed before FETCH, and PL/pgSQL FETCH ... INTO returns
    --    exactly one row per call (a multi-row form such as FETCH 10 is rejected).
    OPEN cur;
    LOOP
        FETCH cur INTO v_rec;
        EXIT WHEN NOT FOUND;
        -- process the row here
    END LOOP;
    CLOSE cur;
END;
$$ LANGUAGE plpgsql;
实际应用案例
案例1:电商数据分析优化
sql
-- E-commerce orders table
CREATE TABLE orders (
    order_id BIGSERIAL PRIMARY KEY,
    customer_id INT,
    product_id INT,
    order_date DATE,
    quantity INT,
    unit_price DECIMAL(10,2),
    status VARCHAR(20)
);

-- Products table
CREATE TABLE products (
    product_id INT PRIMARY KEY,
    product_name VARCHAR(100),
    category VARCHAR(50),
    supplier_id INT
);

-- Test data: 1,000 products spread over 20 categories and 100 suppliers
INSERT INTO products (product_id, product_name, category, supplier_id)
SELECT
    i,
    'Product_' || i,
    'Category_' || (i % 20),
    (i % 100)
FROM generate_series(1, 1000) i;

-- 5,000,000 orders. floor() is used instead of a rounding ::INT cast so every value stays in
-- its intended range. A rounding cast would produce product_id 1001 (no matching product),
-- quantities up to 11, and 'delivered' for half of all orders.
INSERT INTO orders (customer_id, product_id, order_date, quantity, unit_price, status)
SELECT
    floor(random() * 10000)::INT + 1,                 -- customer_id 1 .. 10000
    floor(random() * 1000)::INT + 1,                  -- product_id 1 .. 1000 (always joinable)
    DATE '2020-01-01' + floor(random() * 1461)::INT,  -- 2020-01-01 .. 2023-12-31
    floor(random() * 10)::INT + 1,                    -- quantity 1 .. 10
    (random() * 100 + 10)::DECIMAL(10,2),             -- unit_price 10.00 .. 110.00
    CASE floor(random() * 3)::INT                     -- each status with probability 1/3
        WHEN 0 THEN 'pending'
        WHEN 1 THEN 'shipped'
        ELSE 'delivered'
    END
FROM generate_series(1, 5000000);

-- Refresh planner statistics
ANALYZE orders;
ANALYZE products;
-- Revenue helper: quantity * unit price, reduced by an optional fractional discount.
-- Pure arithmetic with no side effects, hence IMMUTABLE and PARALLEL SAFE.
CREATE OR REPLACE FUNCTION calculate_revenue(
    p_quantity INT,
    p_unit_price DECIMAL,
    p_discount DECIMAL DEFAULT 0
)
RETURNS DECIMAL AS $$
    SELECT (1 - p_discount) * p_unit_price * p_quantity;
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;
-- ✅ Analytical query whose scan and join can run under a parallel Gather node.
-- COUNT(DISTINCT ...) cannot be split into partial aggregates, so the grouping itself is
-- done above the Gather / Gather Merge node rather than inside the workers.
-- calculate_revenue is PARALLEL SAFE, so calling it does not prevent parallelism.
EXPLAIN (ANALYZE, BUFFERS, VERBOSE)
SELECT
    p.category,
    DATE_TRUNC('quarter', o.order_date) AS quarter,
    COUNT(DISTINCT o.customer_id) AS unique_customers,
    COUNT(*) AS total_orders,
    SUM(calculate_revenue(o.quantity, o.unit_price)) AS total_revenue,
    AVG(calculate_revenue(o.quantity, o.unit_price)) AS avg_order_value
FROM orders AS o
INNER JOIN products AS p
    ON o.product_id = p.product_id
WHERE o.order_date >= DATE '2023-01-01'
  AND o.status = 'delivered'
GROUP BY p.category, DATE_TRUNC('quarter', o.order_date)
HAVING COUNT(*) > 100
ORDER BY p.category, quarter;
案例2:实时报表生成优化
sql
-- Daily report procedure: rebuilds the daily_reports rows for p_date and records the
-- elapsed time in report_generation_log.
-- NOTE(review): daily_reports and report_generation_log are not created in this script;
-- their columns are inferred from the statements below.
-- Parallelism: INSERT ... SELECT, like any data-modifying statement, never gets a parallel
-- plan in PostgreSQL. Only CREATE TABLE AS, SELECT INTO and CREATE/REFRESH MATERIALIZED VIEW
-- can run their SELECT part in parallel.
CREATE OR REPLACE PROCEDURE generate_daily_report(p_date DATE)
LANGUAGE plpgsql
AS $$
DECLARE
    v_start_time TIMESTAMP;
    v_end_time TIMESTAMP;
BEGIN
    v_start_time := clock_timestamp();

    -- Remove previously generated rows for this date so the report can be regenerated
    DELETE FROM daily_reports WHERE report_date = p_date;

    -- Rebuild the report in one statement. The best-selling product per category is
    -- computed once (DISTINCT ON) instead of by a correlated subquery per category.
    -- Revenue ties are broken by product_name, so the result is deterministic.
    WITH day_orders AS (
        -- One row per order of the day, with its product category and revenue
        SELECT
            p.category,
            p.product_name,
            o.quantity * o.unit_price AS revenue
        FROM orders AS o
        INNER JOIN products AS p
            ON p.product_id = o.product_id
        WHERE o.order_date = p_date
    ),
    top_products AS (
        -- Highest-revenue product name within each category
        SELECT DISTINCT ON (pr.category)
            pr.category,
            pr.product_name
        FROM (
            SELECT category, product_name, SUM(revenue) AS product_revenue
            FROM day_orders
            GROUP BY category, product_name
        ) AS pr
        ORDER BY pr.category, pr.product_revenue DESC, pr.product_name
    )
    INSERT INTO daily_reports (
        report_date,
        category,
        total_orders,
        total_revenue,
        avg_order_value,
        top_product
    )
    SELECT
        p_date,
        d.category,
        COUNT(*) AS total_orders,
        SUM(d.revenue) AS total_revenue,
        AVG(d.revenue) AS avg_order_value,
        tp.product_name AS top_product
    FROM day_orders AS d
    LEFT JOIN top_products AS tp
        ON tp.category = d.category
    GROUP BY d.category, tp.product_name;

    v_end_time := clock_timestamp();

    -- Record the execution time
    INSERT INTO report_generation_log (report_date, execution_time)
    VALUES (p_date, v_end_time - v_start_time);

    COMMIT;
END;
$$;
故障排除指南
诊断工具集
sql
-- Comprehensive diagnostic function: checks configuration, worker availability, table sizes,
-- parallel-unsafe functions in schema public, and whether p_query itself gets a parallel plan.
-- p_query is run through EXPLAIN (without ANALYZE, so it is planned but not executed).
-- It is concatenated into dynamic SQL: never pass untrusted text.
CREATE OR REPLACE FUNCTION diagnose_parallel_query(p_query TEXT)
RETURNS TABLE(
    check_category TEXT,
    check_item TEXT,
    result TEXT,
    recommendation TEXT
) AS $$
DECLARE
    v_plan JSON;
BEGIN
    -- 1. Configuration
    RETURN QUERY
    SELECT
        'Configuration'::TEXT,
        'max_parallel_workers_per_gather'::TEXT,
        current_setting('max_parallel_workers_per_gather'),
        CASE
            WHEN current_setting('max_parallel_workers_per_gather')::INT = 0
            THEN 'Set to value > 0 to enable parallel queries'
            ELSE 'OK'
        END;

    -- 2. Resource availability
    RETURN QUERY
    WITH worker_count AS (
        SELECT
            COUNT(*) AS active_workers,
            current_setting('max_parallel_workers')::INT AS max_workers
        FROM pg_stat_activity
        WHERE backend_type = 'parallel worker'
    )
    SELECT
        'Resources'::TEXT,
        'Available parallel workers'::TEXT,
        format('%s of %s in use', active_workers, max_workers),
        CASE
            -- Guard against division by zero when parallel workers are disabled
            WHEN max_workers = 0
            THEN 'max_parallel_workers is 0: no parallel workers can be launched'
            WHEN active_workers::NUMERIC / NULLIF(max_workers, 0) > 0.8
            THEN 'Consider increasing max_parallel_workers'
            ELSE 'Sufficient workers available'
        END
    FROM worker_count;

    -- 3. Table sizes. current_setting() returns the value with units (e.g. '8MB'),
    --    so convert it with pg_size_bytes() instead of casting to BIGINT.
    RETURN QUERY
    SELECT
        'Table Size'::TEXT,
        c.relname::TEXT,
        pg_size_pretty(pg_relation_size(c.oid)),
        CASE
            WHEN pg_relation_size(c.oid) < pg_size_bytes(current_setting('min_parallel_table_scan_size'))
            THEN format('Table too small for parallel scan (min: %s)',
                        current_setting('min_parallel_table_scan_size'))
            ELSE 'Table size sufficient for parallel scan'
        END
    FROM pg_class AS c
    WHERE c.relnamespace = 'public'::regnamespace  -- only tables in public, not same-named ones elsewhere
      AND c.relkind = 'r'
    ORDER BY c.relname;

    -- 4. Parallel-unsafe functions (procedures are skipped: their label has no effect on plans)
    RETURN QUERY
    WITH unsafe_functions AS (
        SELECT p.proname
        FROM pg_proc AS p
        WHERE p.pronamespace = 'public'::regnamespace
          AND p.prokind <> 'p'
          AND p.proparallel = 'u'
    )
    SELECT
        'Functions'::TEXT,
        'Unsafe functions'::TEXT,
        COALESCE(string_agg(proname::TEXT, ', ' ORDER BY proname), 'None'),
        CASE
            WHEN COUNT(*) > 0
            THEN 'Mark functions as PARALLEL SAFE/RESTRICTED where appropriate'
            ELSE 'All functions are parallel-compatible'
        END
    FROM unsafe_functions;

    -- 5. Does the supplied query actually get a Gather / Gather Merge node?
    check_category := 'Plan';
    check_item := 'Parallel plan for supplied query';
    BEGIN
        EXECUTE 'EXPLAIN (FORMAT JSON) ' || p_query INTO v_plan;
        IF v_plan::TEXT LIKE '%"Node Type": "Gather%' THEN
            result := 'Gather node present in plan';
            recommendation := 'OK';
        ELSE
            result := 'No Gather node in plan';
            recommendation := 'Check the items above, parallel-unsafe functions in the query, '
                              'and whether the query writes data or locks rows';
        END IF;
    EXCEPTION WHEN OTHERS THEN
        -- Report planning errors (invalid SQL, missing tables, NULL input) as a diagnostic row
        result := 'EXPLAIN failed: ' || SQLERRM;
        recommendation := 'Verify that p_query is a single valid, explainable statement';
    END;
    RETURN NEXT;
END;
$$ LANGUAGE plpgsql;
-- Run the diagnostics
SELECT * FROM diagnose_parallel_query('SELECT * FROM orders');
性能对比测试
sql
-- Performance comparison harness: runs p_query serially (max_parallel_workers_per_gather = 0)
-- and with p_parallel_workers workers per Gather, reporting EXPLAIN ANALYZE figures per mode:
--   execution_time             : "Execution Time" reported by EXPLAIN ANALYZE (excludes planning)
--   buffers_hit / buffers_read : shared-buffer hits / reads reported on the top plan node
-- p_query is executed three times (one unmeasured warm-up, then once per mode), so it should
-- be read-only. It is concatenated into dynamic SQL: never pass untrusted text.
-- The caller's max_parallel_workers_per_gather is restored afterwards. If an error occurs,
-- transaction rollback reverts the setting change.
CREATE OR REPLACE FUNCTION compare_parallel_performance(
    p_query TEXT,
    p_parallel_workers INT DEFAULT 4
)
RETURNS TABLE(
    execution_mode TEXT,
    execution_time INTERVAL,
    buffers_hit BIGINT,
    buffers_read BIGINT
) AS $$
DECLARE
    -- Saved so it can be restored exactly (RESET would jump to the server default instead)
    v_saved_workers TEXT := current_setting('max_parallel_workers_per_gather');
    v_round INT;
    v_plan JSON;
BEGIN
    -- Unmeasured warm-up so the first measured mode does not alone pay for a cold cache
    EXECUTE p_query;

    FOR v_round IN 1..2 LOOP
        -- Round 1: serial; round 2: parallel with the requested worker count
        PERFORM set_config(
            'max_parallel_workers_per_gather',
            (CASE WHEN v_round = 1 THEN 0 ELSE p_parallel_workers END)::TEXT,
            false
        );

        EXECUTE 'EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) ' || p_query INTO v_plan;

        execution_mode := CASE
            WHEN v_round = 1 THEN 'Serial'
            ELSE format('Parallel (%s workers)', p_parallel_workers)
        END;
        execution_time := make_interval(
            secs => (v_plan -> 0 ->> 'Execution Time')::DOUBLE PRECISION / 1000
        );
        buffers_hit := (v_plan -> 0 -> 'Plan' ->> 'Shared Hit Blocks')::BIGINT;
        buffers_read := (v_plan -> 0 -> 'Plan' ->> 'Shared Read Blocks')::BIGINT;
        RETURN NEXT;
    END LOOP;

    -- Restore the caller's setting
    PERFORM set_config('max_parallel_workers_per_gather', v_saved_workers, false);
END;
$$ LANGUAGE plpgsql;
最佳实践建议
1. 查询设计原则
2. 配置优化建议
sql
-- Recommend parallel-query settings from the number of CPU cores.
-- p_cpu_count defaults to 8; pass the real core count, since SQL cannot read it from the OS.
-- Drop a pre-existing zero-argument version so a call without arguments is not ambiguous.
DROP FUNCTION IF EXISTS recommend_parallel_settings();
CREATE OR REPLACE FUNCTION recommend_parallel_settings(p_cpu_count INT DEFAULT 8)
RETURNS TABLE(
    parameter TEXT,
    current_value TEXT,
    recommended_value TEXT,
    reason TEXT
) AS $$
BEGIN
    IF p_cpu_count IS NULL OR p_cpu_count < 1 THEN
        RAISE EXCEPTION 'p_cpu_count must be a positive integer, got %', p_cpu_count;
    END IF;

    RETURN QUERY
    SELECT
        'max_worker_processes'::TEXT,
        current_setting('max_worker_processes'),
        (p_cpu_count * 2)::TEXT,
        'Set to 2x CPU cores for optimal performance'::TEXT;

    RETURN QUERY
    SELECT
        'max_parallel_workers'::TEXT,
        current_setting('max_parallel_workers'),
        p_cpu_count::TEXT,
        'Set to number of CPU cores'::TEXT;

    -- Never recommend more workers per Gather than max_parallel_workers (= p_cpu_count);
    -- without the cap, 1 core would yield per-gather 2 > max_parallel_workers 1.
    RETURN QUERY
    SELECT
        'max_parallel_workers_per_gather'::TEXT,
        current_setting('max_parallel_workers_per_gather'),
        LEAST(GREATEST(2, p_cpu_count / 4), p_cpu_count)::TEXT,
        'Set to CPU cores / 4, minimum 2, capped at max_parallel_workers'::TEXT;
END;
$$ LANGUAGE plpgsql STABLE;
3. 监控和调优
sql
-- Parallel-query performance monitoring view: the 20 statements with the highest total
-- execution time among those for which the planner requested parallel workers.
-- Requires pg_stat_statements on PostgreSQL 18+ (parallel_workers_to_launch /
-- parallel_workers_launched columns). pg_stat_statements.query holds SQL text, not the plan,
-- so matching it against words such as 'Gather' cannot identify parallel queries.
CREATE VIEW parallel_query_performance AS
WITH query_stats AS (
    SELECT
        queryid,
        query,
        calls,
        total_exec_time,
        mean_exec_time,
        stddev_exec_time,
        rows,
        parallel_workers_to_launch,
        parallel_workers_launched
    FROM pg_stat_statements
    WHERE parallel_workers_to_launch > 0
)
SELECT
    queryid,
    LEFT(query, 50) || '...' AS query_preview,
    calls,
    round(total_exec_time::numeric, 2) AS total_time_ms,
    round(mean_exec_time::numeric, 2) AS avg_time_ms,
    round(stddev_exec_time::numeric, 2) AS stddev_time_ms,
    rows,
    round(rows::numeric / NULLIF(calls, 0), 2) AS avg_rows,
    -- launched < to_launch means workers were unavailable at run time
    parallel_workers_to_launch,
    parallel_workers_launched
FROM query_stats
ORDER BY total_exec_time DESC
LIMIT 20;
总结
掌握 PostgreSQL 并行查询的使用条件是优化数据库性能的关键:
- 配置正确:确保系统参数允许并行查询
- 了解限制:避免在不支持的场景中期望并行执行
- 合理设计:编写并行友好的查询和函数
- 持续监控:跟踪并行查询的实际效果
- 灵活调整:根据工作负载特征优化配置
记住
并行查询不是万能的,合理使用才能发挥最大效果