Appearance
PostgreSQL 并行查询安全性完全指南
概述
PostgreSQL 的并行查询功能可以显著提升查询性能,但并非所有操作都适合并行执行。理解并行查询的安全性分类对于编写高效且正确的 SQL 查询至关重要。本文将深入探讨 PostgreSQL 中的并行安全性概念,帮助您掌握如何正确使用并行查询功能。
并行安全性分类
PostgreSQL 将查询中的操作分为三个安全级别:
1. 并行安全(Parallel Safe)
定义:可以在并行工作进程中安全执行的操作,与并行查询完全兼容。
特点:
- ✅ 可以在领导进程执行
- ✅ 可以在工作进程执行
- ✅ 不会产生副作用(不写数据库、不访问序列、不改变事务状态)
- ✅ 不依赖系统无法在进程之间同步的后端本地状态(并不要求结果是确定性的,例如 clock_timestamp() 就是并行安全的)
2. 并行受限(Parallel Restricted)
定义:只能在领导进程中执行,但不妨碍查询使用并行计划的操作。
特点:
- ✅ 可以在领导进程执行
- ❌ 不能在工作进程执行
- ⚠️ 在计划中只会出现在 Gather 节点之上,不会出现在其下方(Gather 下方的部分可能由工作进程执行)
- ✅ 可以出现在包含 Gather 节点的计划中
3. 并行不安全(Parallel Unsafe)
定义:完全不能与并行查询一起使用的操作。
特点:
- ❌ 查询中只要存在任何并行不安全操作,整个查询就完全禁用并行
- ⚠️ 操作本身仍可正常执行,只是规划器不会为该查询生成任何并行计划
- ❌ 整个查询计划因此只能串行执行
内置的并行受限操作
PostgreSQL 中以下操作始终被视为并行受限(外部表扫描例外:如果 FDW 通过 IsForeignScanParallelSafe 接口声明其扫描是并行安全的,则不受此限制):
INFO
这些操作由系统自动识别,无需手动标记
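1. CTE(公共表表达式)扫描
注意:自 PostgreSQL 12 起,无副作用、非递归且只被引用一次的 CTE 默认会被内联到外层查询中,不会产生 CTE Scan 节点;要观察到下文的 CTE Scan,需要写成 WITH ... AS MATERIALIZED (...)。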
sql
-- Example: a CTE scan is parallel restricted.
-- Since PostgreSQL 12 a non-recursive, side-effect-free CTE that is referenced
-- only once is inlined into the outer query by default, so no "CTE Scan" node
-- would appear at all. AS MATERIALIZED forces the CTE to be computed on its own,
-- which is what produces the CTE Scan shown in the sample output.
WITH sales_summary AS MATERIALIZED (
    SELECT
        product_id,
        SUM(quantity) AS total_quantity,
        AVG(price) AS avg_price
    FROM sales
    GROUP BY product_id
)
SELECT
    product_id,
    total_quantity,
    avg_price
FROM sales_summary
WHERE total_quantity > 1000;

-- Inspect the plan (ANALYZE actually runs the query).
EXPLAIN (ANALYZE, BUFFERS)
WITH sales_summary AS MATERIALIZED (
    SELECT
        product_id,
        SUM(quantity) AS total_quantity
    FROM sales
    GROUP BY product_id
)
SELECT
    product_id,
    total_quantity
FROM sales_summary
WHERE total_quantity > 1000;
输出示例:
CTE Scan on sales_summary (cost=... rows=... width=...)
Filter: (total_quantity > 1000)
CTE sales_summary
-> HashAggregate (cost=... rows=... width=...)
Group Key: sales.product_id
-> Seq Scan on sales (cost=... rows=... width=...)
2. 临时表扫描
sql
-- Build a temporary table of high-value customers.
CREATE TEMP TABLE temp_high_value_customers AS
SELECT
    customer_id,
    SUM(amount) AS total_spent
FROM orders
GROUP BY customer_id
HAVING SUM(amount) > 10000;

-- Autovacuum cannot access temporary tables, so it never analyzes them;
-- collect statistics manually so the planner has real row estimates.
ANALYZE temp_high_value_customers;

-- Scanning a temporary table is parallel restricted: its pages live in the
-- leader's backend-local buffers, which parallel workers cannot read.
EXPLAIN (ANALYZE, VERBOSE)
SELECT
    t.customer_id,
    t.total_spent,
    c.customer_name
FROM temp_high_value_customers AS t
INNER JOIN customers AS c
    ON t.customer_id = c.id;
3. 外部表扫描
sql
-- Create a foreign table (postgres_fdw as the example).
CREATE EXTENSION IF NOT EXISTS postgres_fdw;

CREATE SERVER foreign_server
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'remote_host', dbname 'remote_db');

-- postgres_fdw needs a user mapping to open the remote connection; without
-- one every query on the foreign table fails. Credentials are placeholders.
CREATE USER MAPPING FOR CURRENT_USER
    SERVER foreign_server
    OPTIONS (user 'remote_user', password 'remote_password');

-- Without table_name, postgres_fdw looks for a remote table named
-- "foreign_orders". NOTE(review): assumes the remote table is public.orders;
-- adjust schema_name/table_name to the real remote relation.
CREATE FOREIGN TABLE foreign_orders (
    order_id    INT,
    customer_id INT,
    order_date  DATE,
    amount      DECIMAL(10,2)
) SERVER foreign_server
  OPTIONS (schema_name 'public', table_name 'orders');

-- Foreign scans are parallel restricted unless the FDW reports otherwise via
-- IsForeignScanParallelSafe. ANALYZE really executes the remote query.
EXPLAIN (ANALYZE)
SELECT
    order_id,
    customer_id,
    order_date,
    amount
FROM foreign_orders
WHERE order_date >= DATE '2024-01-01';
4. 相关子查询(Correlated SubPlan)
sql
-- Correlated subquery: the scalar subquery references c.customer_id from the
-- outer query, so it becomes a correlated SubPlan re-run for each outer row.
EXPLAIN (ANALYZE, VERBOSE)
SELECT
    c.customer_id,
    c.customer_name,
    (
        SELECT COUNT(*)
        FROM orders AS o
        WHERE o.customer_id = c.customer_id   -- correlation with the outer row
          AND o.order_date >= '2024-01-01'
    ) AS recent_orders
FROM customers AS c
WHERE c.status = 'ACTIVE';
函数和聚合的并行标记
创建并行安全函数
sql
-- Parallel-safe function: pure arithmetic over its two arguments.
--   IMMUTABLE     - the result depends only on the inputs.
--   PARALLEL SAFE - may be evaluated in the leader or in any worker.
CREATE OR REPLACE FUNCTION calculate_discount(price DECIMAL, discount_rate DECIMAL)
    RETURNS DECIMAL
    IMMUTABLE
    PARALLEL SAFE
    LANGUAGE SQL
AS $$
    SELECT price * (1 - discount_rate);
$$;

-- Usage: nothing here prevents a parallel plan, so the call can run in workers.
EXPLAIN (ANALYZE, VERBOSE)
SELECT
    product_id,
    price,
    calculate_discount(price, 0.1) AS discounted_price
FROM products
WHERE category = 'Electronics';
创建并行受限函数
sql
-- Parallel-restricted function: random() is itself parallel restricted,
-- so any function built on it must stay in the leader process.
CREATE OR REPLACE FUNCTION random_sample(probability FLOAT)
RETURNS BOOLEAN
LANGUAGE SQL
VOLATILE                 -- the default, stated explicitly: random() differs per call
PARALLEL RESTRICTED
AS $$
SELECT random() < probability;
$$;

-- setseed() changes the session's random seed. A change made inside a worker
-- would not be reflected in the leader, so this also stays leader-only.
-- NOTE(review): setseed() only accepts seeds between -1 and 1; confirm callers.
CREATE OR REPLACE FUNCTION deterministic_random(seed FLOAT)
RETURNS FLOAT
LANGUAGE plpgsql
VOLATILE
PARALLEL RESTRICTED
AS $$
BEGIN
    PERFORM setseed(seed);   -- reseed, then return the first value of the new sequence
    RETURN random();
END;
$$;
创建并行不安全函数
sql
-- Parallel-unsafe function: it writes to the database (INSERT into access_log).
CREATE OR REPLACE FUNCTION log_access(table_name TEXT)
RETURNS VOID
LANGUAGE plpgsql
PARALLEL UNSAFE
AS $$
BEGIN
    -- The parameter has the same name as an access_log column; qualify it with
    -- the function name so it is explicit which one the VALUES list means.
    INSERT INTO access_log (table_name, access_time, user_name)
    VALUES (log_access.table_name, NOW(), current_user);
END;
$$;

-- Parallel-unsafe function: nextval() accesses a sequence.
CREATE OR REPLACE FUNCTION get_next_id(sequence_name TEXT)
RETURNS BIGINT
LANGUAGE plpgsql
PARALLEL UNSAFE
AS $$
BEGIN
    -- Resolve the name to a regclass and call nextval() directly; dynamic SQL
    -- is unnecessary. Raises an error if no such sequence exists.
    RETURN nextval(sequence_name::regclass);
END;
$$;
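并行安全性决策流程图
(文字版)按以下顺序判断函数应使用的标记:
1. 是否写数据库(包括锁定行)、访问序列、改变事务状态(即使只是临时改变),或持久修改配置?是 → PARALLEL UNSAFE
2. 否则,是否访问临时表、客户端连接状态、游标、预处理语句,或其他系统无法在进程间同步的后端本地状态(例如 setseed 设置的随机种子)?是 → PARALLEL RESTRICTED
3. 以上都不是 → PARALLEL SAFE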
实际应用示例
示例 1:电商订单分析(并行安全)
sql
-- Parallel-safe scalar scoring function. It is not an aggregate: it is called
-- once per row, and AVG() below aggregates its results.
CREATE OR REPLACE FUNCTION calculate_order_score(
    amount DECIMAL,
    item_count INT,
    is_premium BOOLEAN
)
RETURNS DECIMAL
LANGUAGE SQL
IMMUTABLE
PARALLEL SAFE
AS $$
SELECT
    CASE
        WHEN is_premium THEN amount * 1.2 + item_count * 10
        ELSE amount + item_count * 5   -- also taken when is_premium IS NULL
    END;
$$;

-- Encourage a parallel plan for this demo. Plain SET lasts for the rest of
-- the session, so the values are reset after the EXPLAIN below.
SET max_parallel_workers_per_gather = 4;
SET parallel_setup_cost = 100;
SET parallel_tuple_cost = 0.01;

EXPLAIN (ANALYZE, VERBOSE, BUFFERS)
SELECT
    DATE_TRUNC('month', order_date) AS month,
    COUNT(*) AS order_count,
    SUM(amount) AS total_revenue,
    AVG(calculate_order_score(amount, item_count, is_premium)) AS avg_score
FROM orders
WHERE order_date >= '2024-01-01'
GROUP BY DATE_TRUNC('month', order_date)
ORDER BY month;

-- Restore the session defaults so later queries are not affected.
RESET max_parallel_workers_per_gather;
RESET parallel_setup_cost;
RESET parallel_tuple_cost;
预期输出:
Finalize GroupAggregate (cost=... rows=12 width=48)
Group Key: (date_trunc('month', order_date))
-> Gather Merge (cost=... rows=48 width=48)
Workers Planned: 4
Workers Launched: 4
-> Sort (cost=... rows=12 width=48)
Sort Key: (date_trunc('month', order_date))
-> Partial HashAggregate (cost=... rows=12 width=48)
Group Key: date_trunc('month', order_date)
-> Parallel Seq Scan on orders
Filter: (order_date >= '2024-01-01')
示例 2:审计日志系统(并行不安全)
sql
-- Audit table.
CREATE TABLE audit_log (
    id         SERIAL PRIMARY KEY,
    table_name TEXT,
    operation  TEXT,
    user_name  TEXT,
    -- NOW() returns timestamptz; storing it in TIMESTAMPTZ keeps an
    -- unambiguous instant instead of silently converting to local time.
    timestamp  TIMESTAMPTZ DEFAULT NOW(),
    row_data   JSONB
);

-- Audit function: parallel unsafe because it writes to the database.
CREATE OR REPLACE FUNCTION audit_data_access(
    p_table_name TEXT,
    p_operation TEXT,
    p_row_data JSONB
)
RETURNS VOID
LANGUAGE plpgsql
PARALLEL UNSAFE  -- performs an INSERT
AS $$
BEGIN
    INSERT INTO audit_log (table_name, operation, user_name, row_data)
    VALUES (p_table_name, p_operation, current_user, p_row_data);
END;
$$;

-- Calling a parallel-unsafe function disables parallel query for the whole
-- statement. EXPLAIN ANALYZE really executes the statement, so it would insert
-- one audit row per matching customer; run it in a transaction and roll back
-- to inspect the plan without leaving those rows behind.
BEGIN;
EXPLAIN (ANALYZE)
SELECT
    c.customer_id,
    c.customer_name,
    audit_data_access('customers', 'SELECT', to_jsonb(c))
FROM customers AS c
WHERE c.status = 'ACTIVE';
ROLLBACK;
示例 3:缓存策略实现(并行受限)
sql
-- Cache table. created_at is TIMESTAMPTZ so it compares directly with NOW().
CREATE TABLE query_cache (
    cache_key   TEXT PRIMARY KEY,
    cache_value JSONB,
    created_at  TIMESTAMPTZ DEFAULT NOW()
);

-- Cache lookup function (parallel restricted).
-- STABLE: it only reads a table, a setting and NOW(), none of which change
-- within a single statement; the default (VOLATILE) would needlessly block
-- planner optimizations.
-- PARALLEL RESTRICTED is a conservative choice here. current_setting() itself
-- is marked parallel safe in pg_proc, because settings are synchronized to
-- workers. The marking keeps the function in the leader rather than being
-- strictly required.
CREATE OR REPLACE FUNCTION get_cached_result(p_cache_key TEXT)
RETURNS JSONB
LANGUAGE plpgsql
STABLE
PARALLEL RESTRICTED
AS $$
DECLARE
    v_result JSONB;
BEGIN
    -- missing_ok = true: an unset app.use_cache yields NULL (cache skipped)
    -- instead of raising an error.
    IF current_setting('app.use_cache', true) = 'true' THEN
        SELECT cache_value INTO v_result
        FROM query_cache
        WHERE cache_key = p_cache_key
          AND created_at > NOW() - INTERVAL '1 hour';   -- entries older than 1 hour are ignored
    END IF;
    -- NULL when caching is disabled or no fresh entry exists.
    RETURN v_result;
END;
$$;
最佳实践建议
1. 函数标记原则
TIP
遵循"宁可保守"的原则:如有疑问,选择更严格的安全级别
sql
-- Good practice: state volatility and parallel safety explicitly.
CREATE OR REPLACE FUNCTION safe_calculation(a INT, b INT)
RETURNS INT
LANGUAGE SQL
IMMUTABLE
PARALLEL SAFE
AS $$ SELECT a + b; $$;

-- Avoid: with no PARALLEL clause the function defaults to PARALLEL UNSAFE
-- (and with no volatility clause to VOLATILE). Queries calling it are then
-- planned serially even though the body is pure arithmetic.
CREATE OR REPLACE FUNCTION unclear_function(a INT, b INT)
RETURNS INT
LANGUAGE SQL
AS $$ SELECT a + b; $$;
2. 性能优化策略
sql
-- Test table.
CREATE TABLE large_table (
    id SERIAL PRIMARY KEY,
    category TEXT,
    value DECIMAL,
    created_at TIMESTAMP
);

-- Load 10 million rows of test data.
-- floor(random() * N) gives 0..N-1 uniformly. Casting random() * N straight to
-- INT rounds instead, producing N + 1 buckets whose two endpoints are only
-- half as likely as the rest.
INSERT INTO large_table (category, value, created_at)
SELECT
    'Category_' || floor(random() * 10)::INT,
    random() * 1000,
    NOW() - floor(random() * 365)::INT * INTERVAL '1 day'
FROM generate_series(1, 10000000);

-- Indexes.
CREATE INDEX idx_large_table_category ON large_table(category);
CREATE INDEX idx_large_table_created_at ON large_table(created_at);

-- Refresh statistics after the bulk load so both plans below start from
-- accurate row estimates instead of waiting for autovacuum.
ANALYZE large_table;

-- Size thresholds that decide whether a relation is big enough to scan in parallel.
SET min_parallel_table_scan_size = '8MB';
SET min_parallel_index_scan_size = '512kB';

-- Compare serial and parallel execution.
-- Serial baseline: no workers per Gather.
SET max_parallel_workers_per_gather = 0;
EXPLAIN (ANALYZE, BUFFERS)
SELECT category, COUNT(*), AVG(value)
FROM large_table
WHERE created_at >= '2024-01-01'
GROUP BY category;

-- Parallel run: up to 4 workers per Gather.
SET max_parallel_workers_per_gather = 4;
EXPLAIN (ANALYZE, BUFFERS)
SELECT category, COUNT(*), AVG(value)
FROM large_table
WHERE created_at >= '2024-01-01'
GROUP BY category;
3. 锁处理注意事项
WARNING
在并行工作进程中获取、而领导进程并未持有的锁(例如函数查询了外层查询未引用的表时获取的表级锁),会在工作进程退出时释放,而不是在事务结束时释放;如果这一差异对你很重要,应将此类函数标记为 PARALLEL RESTRICTED,确保它只在领导进程中执行。
sql
-- Row-locking function. SELECT ... FOR UPDATE marks the locked rows in the
-- table itself (and may need a transaction ID), i.e. it writes to the database.
-- PostgreSQL never runs row-locking queries in parallel, and a function that
-- locks rows must be PARALLEL UNSAFE. RESTRICTED is not enough, because the
-- leader is also in parallel mode while a parallel plan is executing.
-- (RESTRICTED is the right marking for functions that merely read tables the
-- outer query does not reference, acquiring table-level locks that a worker
-- would release at exit instead of at transaction end.)
CREATE OR REPLACE FUNCTION check_and_lock_inventory(p_product_id INT)
RETURNS BOOLEAN
LANGUAGE plpgsql
PARALLEL UNSAFE  -- locks rows with FOR UPDATE
AS $$
DECLARE
    v_available BOOLEAN;
BEGIN
    -- Take row-level locks on every inventory row for this product.
    PERFORM 1 FROM inventory
    WHERE product_id = p_product_id
    FOR UPDATE;

    -- Availability check; NULL when the product has no inventory row.
    SELECT quantity > 0 INTO v_available
    FROM inventory
    WHERE product_id = p_product_id;

    RETURN v_available;
END;
$$;
故障排除指南
常见错误及解决方案
错误 1:并行查询意外禁用
sql
-- Diagnostics: every configuration parameter whose name mentions "parallel".
SELECT
    s.name,
    s.setting
FROM pg_settings AS s
WHERE s.name LIKE '%parallel%'
ORDER BY s.name;

-- Parallel-safety marking of each function in the public schema
-- (pg_proc.proparallel: s = safe, r = restricted, u = unsafe).
SELECT
    p.proname AS function_name,
    p.proparallel AS parallel_safety,
    CASE
        WHEN p.proparallel = 's' THEN 'SAFE'
        WHEN p.proparallel = 'r' THEN 'RESTRICTED'
        WHEN p.proparallel = 'u' THEN 'UNSAFE'
    END AS safety_label
FROM pg_proc AS p
WHERE p.pronamespace = 'public'::regnamespace
ORDER BY p.proname;
错误 2:函数标记错误导致的问题
sql
-- Anti-example: labelled PARALLEL SAFE, yet the body creates a temporary
-- table, which cannot be done during a parallel operation. If the planner
-- places a call inside a worker, the query fails at run time.
CREATE OR REPLACE FUNCTION bad_function()
RETURNS VOID
LANGUAGE plpgsql
PARALLEL SAFE   -- WRONG: the body is not parallel safe
AS $$
BEGIN
    CREATE TEMP TABLE temp_result AS SELECT 1;
END;
$$;

-- Fixed version: identical body, honestly labelled PARALLEL UNSAFE so that
-- queries calling it are always planned serially.
CREATE OR REPLACE FUNCTION corrected_function()
RETURNS VOID
LANGUAGE plpgsql
PARALLEL UNSAFE
AS $$
BEGIN
    CREATE TEMP TABLE temp_result AS SELECT 1;
END;
$$;
性能监控和调优
监控并行查询执行
sql
-- Monitoring view over pg_stat_statements (column names as of PostgreSQL 13+).
-- pg_stat_statements.query holds the SQL text, not the plan, so matching plan
-- node names such as 'Parallel' or 'Gather' against it cannot identify
-- parallel queries; that filter has been dropped. Confirm which statements
-- actually run in parallel with EXPLAIN or auto_explain.
-- NOTE(review): PostgreSQL 18 adds parallel_workers_to_launch /
-- parallel_workers_launched to pg_stat_statements; on 18+ filter on
-- parallel_workers_launched > 0. Confirm for your version.
CREATE OR REPLACE VIEW parallel_query_stats AS
SELECT
    queryid,
    query,
    calls,
    total_exec_time,
    mean_exec_time,
    max_exec_time,
    rows,
    100.0 * shared_blks_hit /
        NULLIF(shared_blks_hit + shared_blks_read, 0) AS hit_ratio   -- NULL when no blocks were touched
FROM pg_stat_statements
ORDER BY total_exec_time DESC;

-- Parallel query workers currently running. Matching the exact backend type
-- excludes other background workers whose type merely contains "parallel".
-- leader_pid (PostgreSQL 13+) ties each worker to the backend that leads it.
SELECT
    pid,
    leader_pid,
    usename,
    application_name,
    state,
    query,
    backend_type
FROM pg_stat_activity
WHERE backend_type = 'parallel worker'
ORDER BY leader_pid, backend_start;
总结
理解和正确使用 PostgreSQL 的并行查询安全性分类是优化数据库性能的关键。记住以下要点:
- 默认保守:不确定时选择更严格的安全级别
- 充分测试:在部署到生产环境之前,全面测试函数在并行计划下的行为
- 监控性能:定期检查并行查询的执行效果
- 文档化:为自定义函数添加清晰的并行安全性说明