# PostgreSQL 数据库填充优化完全指南
概述
数据库的初始填充和大批量数据导入是 PostgreSQL 管理中的常见任务。无论是迁移数据、恢复备份还是批量导入业务数据,掌握高效的填充技术可以将原本需要数小时的操作缩短到几分钟。本文将详细介绍各种优化技术,帮助您实现最快速的数据加载。
数据加载性能影响因素
性能瓶颈分析
核心优化技术
1. 禁用自动提交
原理:每次自动提交都会触发 WAL 刷盘,造成大量 I/O 开销。
sql
-- 创建测试环境
-- Scratch table used by the loading benchmarks below
-- (compare_autocommit_performance, compare_insert_vs_copy).
CREATE TABLE load_test (
id SERIAL PRIMARY KEY,
data TEXT,
value NUMERIC,
created_at TIMESTAMP DEFAULT NOW()
);
-- 性能对比函数
-- Compare row-by-row inserts vs. a single set-based insert vs. batched
-- looped inserts, returning throughput relative to scenario 1.
--
-- BUG FIX: the original used BEGIN;/COMMIT; inside the function body,
-- which is a syntax error in PL/pgSQL (and COMMIT is not allowed in a
-- function at all).  A function always runs inside one enclosing
-- transaction, so true autocommit (one transaction per row) cannot be
-- reproduced here; drive the real commit-overhead comparison from psql
-- with \set AUTOCOMMIT on/off.  The numbers below compare loop vs.
-- set-based statement shapes only.
--
-- p_rows: number of rows inserted per scenario (load_test is truncated
-- between scenarios).
CREATE OR REPLACE FUNCTION compare_autocommit_performance(
    p_rows INT
)
RETURNS TABLE(
    method TEXT,
    duration INTERVAL,
    rows_per_second NUMERIC,
    relative_performance NUMERIC
) AS $$
DECLARE
    v_start TIMESTAMP;
    v_end TIMESTAMP;
    v_baseline NUMERIC;
BEGIN
    -- Scenario 1: one INSERT statement per row (baseline).
    TRUNCATE load_test;
    v_start := clock_timestamp();
    FOR i IN 1..p_rows LOOP
        INSERT INTO load_test (data, value)
        VALUES ('Row ' || i, random() * 1000);
    END LOOP;
    v_end := clock_timestamp();
    v_baseline := p_rows / EXTRACT(EPOCH FROM (v_end - v_start));
    RETURN QUERY
    SELECT
        'Autocommit ON'::TEXT,
        v_end - v_start,
        v_baseline,
        1.0::NUMERIC;
    -- Scenario 2: a single set-based INSERT ... SELECT.
    -- (Label kept for interface compatibility with the original.)
    TRUNCATE load_test;
    v_start := clock_timestamp();
    INSERT INTO load_test (data, value)
    SELECT 'Row ' || i, random() * 1000
    FROM generate_series(1, p_rows) i;
    v_end := clock_timestamp();
    RETURN QUERY
    SELECT
        'Single Transaction'::TEXT,
        v_end - v_start,
        p_rows / EXTRACT(EPOCH FROM (v_end - v_start)),
        (p_rows / EXTRACT(EPOCH FROM (v_end - v_start))) / v_baseline;
    -- Scenario 3: looped inserts grouped in batches of 1000.  Committing
    -- per batch would require a procedure, not a function, so only the
    -- loop structure differs from scenario 1.
    TRUNCATE load_test;
    v_start := clock_timestamp();
    FOR batch IN 0..(p_rows/1000) LOOP
        FOR i IN 1..LEAST(1000, p_rows - batch * 1000) LOOP
            INSERT INTO load_test (data, value)
            VALUES ('Row ' || (batch * 1000 + i), random() * 1000);
        END LOOP;
    END LOOP;
    v_end := clock_timestamp();
    RETURN QUERY
    SELECT
        'Batch Transaction (1000)'::TEXT,
        v_end - v_start,
        p_rows / EXTRACT(EPOCH FROM (v_end - v_start)),
        (p_rows / EXTRACT(EPOCH FROM (v_end - v_start))) / v_baseline;
END;
$$ LANGUAGE plpgsql;
-- Run the benchmark with 10k rows.
SELECT * FROM compare_autocommit_performance(10000);
2. 使用 COPY 命令
COPY 是 PostgreSQL 中最快的数据加载方法。
sql
-- 创建 COPY 性能测试
-- Benchmark INSERT ... SELECT vs. COPY vs. COPY (FREEZE) for p_rows rows,
-- reporting duration, rows/s and MB/s for each method.
--
-- NOTE(review): COPY ... TO/FROM a path like '/tmp/...' is *server-side*
-- and requires superuser or pg_read/write_server_files membership —
-- confirm before running in production.
--
-- Fixes vs. the original:
--  * BEGIN;/COMMIT; removed — illegal inside a PL/pgSQL function; the
--    whole function already runs in one transaction, which is exactly
--    what makes COPY ... FREEZE legal on the just-created temp table.
--  * v_size is recomputed for copy_freeze_test (it previously reused
--    copy_test's size).
--  * Temp tables are ON COMMIT DROP so the function can be re-run in
--    the same session.
CREATE OR REPLACE FUNCTION compare_insert_vs_copy(
    p_rows INT
)
RETURNS TABLE(
    method TEXT,
    duration INTERVAL,
    rows_per_second NUMERIC,
    mb_per_second NUMERIC
) AS $$
DECLARE
    v_start TIMESTAMP;
    v_end TIMESTAMP;
    v_size BIGINT;
    v_data_file TEXT := '/tmp/test_data.csv';
BEGIN
    -- Generate a CSV fixture on the server's filesystem.
    EXECUTE format('
        COPY (
            SELECT
                i as id,
                ''Data '' || i as data,
                random() * 1000 as value,
                NOW() - (random() * 365)::INT * INTERVAL ''1 day'' as created_at
            FROM generate_series(1, %s) i
        ) TO %L WITH (FORMAT CSV, HEADER)',
        p_rows, v_data_file
    );
    -- Method 1: plain set-based INSERT.
    CREATE TEMP TABLE insert_test (LIKE load_test INCLUDING ALL) ON COMMIT DROP;
    v_start := clock_timestamp();
    INSERT INTO insert_test (data, value)
    SELECT 'Data ' || i, random() * 1000
    FROM generate_series(1, p_rows) i;
    v_end := clock_timestamp();
    v_size := pg_relation_size('insert_test');
    RETURN QUERY
    SELECT
        'INSERT'::TEXT,
        v_end - v_start,
        p_rows / EXTRACT(EPOCH FROM (v_end - v_start)),
        v_size / 1024.0 / 1024.0 / EXTRACT(EPOCH FROM (v_end - v_start));
    -- Method 2: COPY FROM the CSV file.
    CREATE TEMP TABLE copy_test (LIKE load_test INCLUDING ALL) ON COMMIT DROP;
    v_start := clock_timestamp();
    EXECUTE format('COPY copy_test FROM %L WITH (FORMAT CSV, HEADER)', v_data_file);
    v_end := clock_timestamp();
    v_size := pg_relation_size('copy_test');
    RETURN QUERY
    SELECT
        'COPY'::TEXT,
        v_end - v_start,
        p_rows / EXTRACT(EPOCH FROM (v_end - v_start)),
        v_size / 1024.0 / 1024.0 / EXTRACT(EPOCH FROM (v_end - v_start));
    -- Method 3: COPY ... (FREEZE) — rows are written already frozen,
    -- skipping later hint-bit/VACUUM work.  FREEZE is only accepted
    -- because the target table is created in this same transaction.
    CREATE TEMP TABLE copy_freeze_test (LIKE load_test INCLUDING ALL) ON COMMIT DROP;
    v_start := clock_timestamp();
    EXECUTE format('COPY copy_freeze_test FROM %L WITH (FORMAT CSV, HEADER, FREEZE)', v_data_file);
    v_end := clock_timestamp();
    -- BUG FIX: measure the table that was actually loaded.
    v_size := pg_relation_size('copy_freeze_test');
    RETURN QUERY
    SELECT
        'COPY FREEZE'::TEXT,
        v_end - v_start,
        p_rows / EXTRACT(EPOCH FROM (v_end - v_start)),
        v_size / 1024.0 / 1024.0 / EXTRACT(EPOCH FROM (v_end - v_start));
END;
$$ LANGUAGE plpgsql;
3. 索引管理策略
sql
-- 创建索引管理辅助函数
-- Save the definitions of all non-primary-key indexes on p_table_name
-- into the session-local saved_indexes table, then drop them (bulk loads
-- run much faster without index maintenance; restore_indexes() rebuilds
-- them afterwards).  Returns the saved (index_name, index_definition).
--
-- NOTE(review): matching is on pg_indexes.tablename only, so identically
-- named tables in different schemas would all be affected — confirm
-- table names are schema-unique for callers.
CREATE OR REPLACE FUNCTION save_and_drop_indexes(
    p_table_name TEXT
)
RETURNS TABLE(
    index_name TEXT,
    index_definition TEXT
) AS $$
DECLARE
    v_idx RECORD;
BEGIN
    -- Session-local store for the saved definitions.
    CREATE TEMP TABLE IF NOT EXISTS saved_indexes (
        table_name TEXT,
        index_name TEXT,
        index_definition TEXT,
        saved_at TIMESTAMP DEFAULT NOW()
    );
    -- Save the definitions and stream them back to the caller.
    -- BUG FIX: the INSERT ... RETURNING previously had no destination,
    -- which is an error in PL/pgSQL; RETURN QUERY captures its rows.
    -- (Columns are table-qualified in RETURNING to avoid ambiguity with
    -- the identically named OUT parameters.)
    RETURN QUERY
    INSERT INTO saved_indexes (table_name, index_name, index_definition)
    SELECT
        p_table_name,
        indexname,
        indexdef
    FROM pg_indexes
    WHERE tablename = p_table_name
    AND indexname NOT LIKE '%_pkey' -- keep the primary key
    RETURNING saved_indexes.index_name, saved_indexes.index_definition;
    -- Drop the indexes, schema-qualified so non-search-path schemas work.
    FOR v_idx IN
        SELECT schemaname, indexname
        FROM pg_indexes
        WHERE tablename = p_table_name
        AND indexname NOT LIKE '%_pkey'
    LOOP
        EXECUTE format('DROP INDEX %I.%I', v_idx.schemaname, v_idx.indexname);
    END LOOP;
END;
$$ LANGUAGE plpgsql;
-- 重建索引函数
-- Rebuild the indexes previously saved by save_and_drop_indexes() for
-- p_table_name, then forget their saved definitions.
CREATE OR REPLACE FUNCTION restore_indexes(
    p_table_name TEXT
)
RETURNS VOID AS $$
DECLARE
    v_index RECORD;
BEGIN
    -- Speed up index builds.  SET LOCAL reverts automatically at the end
    -- of the enclosing transaction (including on error), unlike the
    -- original SET + RESET pair, which clobbered any session-level value
    -- the caller had configured and never ran if a rebuild failed.
    SET LOCAL max_parallel_maintenance_workers = 4;
    SET LOCAL maintenance_work_mem = '1GB';
    FOR v_index IN
        SELECT index_definition
        FROM saved_indexes
        WHERE table_name = p_table_name
        ORDER BY saved_at DESC
    LOOP
        -- indexdef from pg_indexes is a complete CREATE INDEX statement.
        EXECUTE v_index.index_definition;
    END LOOP;
    -- Forget the definitions that have been restored.
    DELETE FROM saved_indexes WHERE table_name = p_table_name;
END;
$$ LANGUAGE plpgsql;
-- 测试索引对加载性能的影响
-- Measure "load then build indexes" vs. "load with indexes in place"
-- for p_rows rows, reporting the load/index/total time split.
CREATE OR REPLACE FUNCTION test_index_impact(
    p_rows INT
)
RETURNS TABLE(
    scenario TEXT,
    load_time INTERVAL,
    index_time INTERVAL,
    total_time INTERVAL
) AS $$
DECLARE
    v_start TIMESTAMP;
    v_load_end TIMESTAMP;
    v_index_end TIMESTAMP;
BEGIN
    -- Scenario 1: load first, build indexes afterwards.
    -- BUG FIX: drop leftovers from an earlier call so the function can
    -- be re-run inside the same session.
    DROP TABLE IF EXISTS test_table1;
    CREATE TEMP TABLE test_table1 (
        id SERIAL PRIMARY KEY,
        col1 TEXT,
        col2 INT,
        col3 TIMESTAMP,
        col4 NUMERIC
    );
    v_start := clock_timestamp();
    INSERT INTO test_table1 (col1, col2, col3, col4)
    SELECT
        'Text ' || i,
        i % 1000,
        NOW() - (i % 365) * INTERVAL '1 day',
        random() * 1000
    FROM generate_series(1, p_rows) i;
    v_load_end := clock_timestamp();
    CREATE INDEX idx1_1 ON test_table1(col1);
    CREATE INDEX idx1_2 ON test_table1(col2);
    CREATE INDEX idx1_3 ON test_table1(col3);
    CREATE INDEX idx1_4 ON test_table1(col1, col2);
    v_index_end := clock_timestamp();
    RETURN QUERY
    SELECT
        'Load then Index'::TEXT,
        v_load_end - v_start,
        v_index_end - v_load_end,
        v_index_end - v_start;
    -- Scenario 2: indexes exist before the load (every insert maintains
    -- all four indexes).
    DROP TABLE IF EXISTS test_table2;
    CREATE TEMP TABLE test_table2 (
        id SERIAL PRIMARY KEY,
        col1 TEXT,
        col2 INT,
        col3 TIMESTAMP,
        col4 NUMERIC
    );
    CREATE INDEX idx2_1 ON test_table2(col1);
    CREATE INDEX idx2_2 ON test_table2(col2);
    CREATE INDEX idx2_3 ON test_table2(col3);
    CREATE INDEX idx2_4 ON test_table2(col1, col2);
    v_start := clock_timestamp();
    INSERT INTO test_table2 (col1, col2, col3, col4)
    SELECT
        'Text ' || i,
        i % 1000,
        NOW() - (i % 365) * INTERVAL '1 day',
        random() * 1000
    FROM generate_series(1, p_rows) i;
    v_load_end := clock_timestamp();
    RETURN QUERY
    SELECT
        'Load with Indexes'::TEXT,
        v_load_end - v_start,
        INTERVAL '0',
        v_load_end - v_start;
END;
$$ LANGUAGE plpgsql;
4. 外键约束管理
sql
-- 外键约束管理工具
-- Drop ('disable') or re-create ('enable') FOREIGN KEY constraints —
-- for one table, or for all tables when p_table_name is NULL.
-- Definitions are parked in saved_constraints between the two calls.
-- Returns one row per constraint with its new status.
--
-- BUG FIX: bare references to table_name in the WHERE clauses were
-- ambiguous between the OUT parameter and the saved_constraints column
-- (a runtime error in PL/pgSQL); all table references are now aliased.
-- An unknown p_action now raises instead of silently doing nothing.
CREATE OR REPLACE FUNCTION manage_foreign_keys(
    p_action TEXT, -- 'disable' or 'enable'
    p_table_name TEXT DEFAULT NULL
)
RETURNS TABLE(
    constraint_name TEXT,
    table_name TEXT,
    status TEXT
) AS $$
DECLARE
    v_constraint RECORD;
BEGIN
    IF p_action = 'disable' THEN
        FOR v_constraint IN
            SELECT
                con.conname,
                con.conrelid::regclass::TEXT as tbl,
                pg_get_constraintdef(con.oid) as definition
            FROM pg_constraint con
            WHERE con.contype = 'f'
            AND (p_table_name IS NULL OR con.conrelid::regclass::TEXT = p_table_name)
        LOOP
            -- regclass output is already quoted where needed, hence %s.
            EXECUTE format('ALTER TABLE %s DROP CONSTRAINT %I',
                v_constraint.tbl, v_constraint.conname);
            -- Park the definition so 'enable' can re-create it.
            INSERT INTO saved_constraints (constraint_name, table_name, definition)
            VALUES (v_constraint.conname, v_constraint.tbl, v_constraint.definition);
            RETURN QUERY
            SELECT v_constraint.conname, v_constraint.tbl, 'DROPPED'::TEXT;
        END LOOP;
    ELSIF p_action = 'enable' THEN
        FOR v_constraint IN
            SELECT sc.constraint_name AS cname, sc.table_name AS tbl, sc.definition
            FROM saved_constraints sc
            WHERE p_table_name IS NULL OR sc.table_name = p_table_name
        LOOP
            EXECUTE format('ALTER TABLE %s ADD CONSTRAINT %I %s',
                v_constraint.tbl,
                v_constraint.cname,
                v_constraint.definition);
            RETURN QUERY
            SELECT v_constraint.cname, v_constraint.tbl, 'CREATED'::TEXT;
        END LOOP;
        -- Forget the constraints that were just re-created.
        DELETE FROM saved_constraints sc
        WHERE p_table_name IS NULL OR sc.table_name = p_table_name;
    ELSE
        RAISE EXCEPTION 'manage_foreign_keys: unknown action %, expected ''disable'' or ''enable''', p_action;
    END IF;
END;
$$ LANGUAGE plpgsql;
-- 创建保存约束的表
-- Parking area for FK definitions dropped by manage_foreign_keys('disable');
-- read back (and cleared) by manage_foreign_keys('enable').
CREATE TABLE IF NOT EXISTS saved_constraints (
constraint_name TEXT,
table_name TEXT,
definition TEXT,
saved_at TIMESTAMP DEFAULT NOW()
);
5. 内存参数优化
sql
-- 动态内存配置优化
-- Recommend bulk-load memory/WAL parameter values scaled by the
-- expected data volume (p_data_size_gb, in GB).  Advisory only: nothing
-- is changed; current values are additionally snapshotted into the temp
-- table original_settings for later comparison.
--
-- BUG FIX: the CASE expressions referenced an undeclared v_data_size_gb
-- (the parameter is p_data_size_gb), which failed at runtime.  The
-- unused local v_total_ram_gb was removed.
CREATE OR REPLACE FUNCTION optimize_load_settings(
    p_data_size_gb NUMERIC
)
RETURNS TABLE(
    parameter TEXT,
    original_value TEXT,
    optimized_value TEXT,
    recommendation TEXT
) AS $$
BEGIN
    -- Snapshot the current values (first call in this session only).
    CREATE TEMP TABLE IF NOT EXISTS original_settings AS
    SELECT name, setting, unit
    FROM pg_settings
    WHERE name IN (
        'maintenance_work_mem',
        'work_mem',
        'shared_buffers',
        'effective_cache_size',
        'max_wal_size',
        'checkpoint_timeout'
    );
    -- maintenance_work_mem dominates index-build and VACUUM speed.
    RETURN QUERY
    SELECT
        'maintenance_work_mem'::TEXT,
        current_setting('maintenance_work_mem'),
        CASE
            WHEN p_data_size_gb < 1 THEN '256MB'
            WHEN p_data_size_gb < 10 THEN '1GB'
            WHEN p_data_size_gb < 100 THEN '2GB'
            ELSE '4GB'
        END,
        'Increase for faster index creation'::TEXT;
    -- work_mem affects sorts/hashes in transformation queries.
    RETURN QUERY
    SELECT
        'work_mem'::TEXT,
        current_setting('work_mem'),
        CASE
            WHEN p_data_size_gb < 10 THEN '128MB'
            WHEN p_data_size_gb < 100 THEN '256MB'
            ELSE '512MB'
        END,
        'Increase for complex queries during load'::TEXT;
    -- A larger max_wal_size means fewer forced checkpoints mid-load.
    RETURN QUERY
    SELECT
        'max_wal_size'::TEXT,
        current_setting('max_wal_size'),
        CASE
            WHEN p_data_size_gb < 10 THEN '4GB'
            WHEN p_data_size_gb < 100 THEN '16GB'
            ELSE '32GB'
        END,
        'Reduce checkpoint frequency during load'::TEXT;
    -- Longer checkpoint_timeout also spaces checkpoints out.
    RETURN QUERY
    SELECT
        'checkpoint_timeout'::TEXT,
        current_setting('checkpoint_timeout'),
        '30min'::TEXT,
        'Increase checkpoint interval'::TEXT;
END;
$$ LANGUAGE plpgsql;
-- 应用优化设置
-- Apply session-level bulk-load tuning.  Plain SETs persist for the
-- rest of the session; reconnect (or RESET) to return to normal.
--
-- BUG FIX: only user-settable parameters can be changed with SET.
-- max_wal_size, checkpoint_timeout and checkpoint_completion_target
-- have sighup context and wal_buffers has postmaster context, so the
-- original SETs on them raised errors at runtime.  Those must be set
-- via ALTER SYSTEM + pg_reload_conf() (wal_buffers needs a restart).
CREATE OR REPLACE PROCEDURE apply_load_optimization()
LANGUAGE plpgsql
AS $$
BEGIN
    SET maintenance_work_mem = '2GB';
    SET work_mem = '256MB';
    -- Trades durability of the most recent commits for speed; acceptable
    -- when the data being loaded can simply be re-loaded after a crash.
    SET synchronous_commit = 'off';
    -- Cluster-wide knobs to configure via ALTER SYSTEM instead:
    --   max_wal_size = '16GB'
    --   checkpoint_timeout = '30min'
    --   checkpoint_completion_target = 0.9
    --   wal_buffers = '64MB'   (restart required)
    RAISE NOTICE 'Bulk load optimization settings applied';
END;
$$;
6. WAL 和复制管理
sql
-- WAL 设置管理
-- Show the WAL-related settings recommended for a given mode
-- ('minimal' | 'replica' | 'logical') together with whether each needs
-- a server restart.  Display only — nothing is applied.
CREATE OR REPLACE FUNCTION manage_wal_settings(
    p_mode TEXT -- 'minimal', 'replica', 'logical'
)
RETURNS TABLE(
    setting TEXT,
    value TEXT,
    requires_restart BOOLEAN
) AS $$
BEGIN
    IF p_mode = 'minimal' THEN
        -- Fastest loads: no WAL kept for replication, no standby possible.
        RETURN QUERY VALUES
            ('wal_level'::TEXT, 'minimal'::TEXT, true),
            ('max_wal_senders', '0', true),
            ('archive_mode', 'off', true),
            ('synchronous_commit', 'off', false),
            ('full_page_writes', 'off', false);
    ELSIF p_mode = 'replica' THEN
        -- Physical replication / PITR.
        RETURN QUERY VALUES
            ('wal_level'::TEXT, 'replica'::TEXT, true),
            ('max_wal_senders', '3', true),
            ('archive_mode', 'on', true),
            ('synchronous_commit', 'on', false),
            ('full_page_writes', 'on', false);
    ELSIF p_mode = 'logical' THEN
        -- Logical decoding / logical replication.
        RETURN QUERY VALUES
            ('wal_level'::TEXT, 'logical'::TEXT, true),
            ('max_wal_senders', '10', true),
            ('archive_mode', 'on', true),
            ('synchronous_commit', 'on', false),
            ('full_page_writes', 'on', false);
    ELSE
        -- BUG FIX: an unknown mode previously returned nothing, silently.
        RAISE EXCEPTION 'manage_wal_settings: unknown mode %', p_mode;
    END IF;
    RAISE NOTICE 'Settings for % mode displayed. Restart required for settings marked true.', p_mode;
END;
$$ LANGUAGE plpgsql;
实际应用案例
案例1:大规模数据迁移
sql
-- 创建数据迁移框架
-- Batched copy of p_source_table into p_target_table (created if absent),
-- with target indexes dropped/rebuilt and triggers disabled around the load.
--
-- NOTE(review): each batch re-sorts by ctid and applies LIMIT/OFFSET, so
-- the whole migration is O(n^2) across batches — consider keyset
-- pagination on a unique column for very large tables.
-- NOTE(review): %I treats its argument as one identifier, so
-- schema-qualified names like 'public.t' will not work — confirm callers
-- pass bare table names on the search_path.
CREATE OR REPLACE PROCEDURE migrate_large_table(
p_source_table TEXT,
p_target_table TEXT,
p_batch_size INT DEFAULT 100000
)
LANGUAGE plpgsql
AS $$
DECLARE
v_start_time TIMESTAMP;
v_end_time TIMESTAMP;
v_total_rows BIGINT;
v_migrated_rows BIGINT := 0;
v_batch_start BIGINT := 0;
BEGIN
v_start_time := clock_timestamp();
-- Count source rows (full scan) for progress reporting.
EXECUTE format('SELECT COUNT(*) FROM %I', p_source_table) INTO v_total_rows;
RAISE NOTICE 'Starting migration of % rows from % to %', v_total_rows, p_source_table, p_target_table;
-- Create the target with the source's full definition, if absent.
EXECUTE format('CREATE TABLE IF NOT EXISTS %I (LIKE %I INCLUDING ALL)', p_target_table, p_source_table);
-- Park and drop target indexes (rebuilt after the load).
PERFORM save_and_drop_indexes(p_target_table);
-- Disable target triggers (including FK triggers) during the load.
EXECUTE format('ALTER TABLE %I DISABLE TRIGGER ALL', p_target_table);
-- Per-transaction tuning; SET LOCAL reverts at transaction end.
SET LOCAL maintenance_work_mem = '2GB';
SET LOCAL work_mem = '512MB';
SET LOCAL synchronous_commit = 'off';
-- Copy rows batch by batch with INSERT ... SELECT, paged by ctid order.
-- (Note: this is not COPY, despite the original comment.)
LOOP
EXECUTE format('
INSERT INTO %I
SELECT * FROM %I
ORDER BY ctid
LIMIT %s OFFSET %s',
p_target_table, p_source_table, p_batch_size, v_batch_start
);
GET DIAGNOSTICS v_migrated_rows = ROW_COUNT;
v_batch_start := v_batch_start + v_migrated_rows;
-- Progress report: only fires while the running total is an exact
-- multiple of 10 full batches (i.e. normally only on full batches).
IF v_batch_start % (p_batch_size * 10) = 0 THEN
RAISE NOTICE 'Progress: % / % rows (%%)',
v_batch_start, v_total_rows,
round(v_batch_start::NUMERIC / v_total_rows * 100, 2);
END IF;
-- A short batch means the source is exhausted.
EXIT WHEN v_migrated_rows < p_batch_size;
END LOOP;
-- Rebuild the saved indexes.
RAISE NOTICE 'Rebuilding indexes...';
PERFORM restore_indexes(p_target_table);
-- Re-enable triggers.
EXECUTE format('ALTER TABLE %I ENABLE TRIGGER ALL', p_target_table);
-- Refresh planner statistics for the freshly filled table.
EXECUTE format('ANALYZE %I', p_target_table);
v_end_time := clock_timestamp();
RAISE NOTICE 'Migration completed in %, % rows migrated',
v_end_time - v_start_time, v_batch_start;
END;
$$;
案例2:实时数据同步加载
sql
-- 创建增量数据加载系统
-- Job queue for file-based incremental loads; one row per job, driven
-- by process_data_load_job().  status: 'pending' -> 'running' ->
-- 'completed' | 'failed'.
CREATE TABLE data_load_jobs (
job_id SERIAL PRIMARY KEY,
job_name TEXT NOT NULL,
source_file TEXT, -- server-side CSV path read by COPY
target_table TEXT,
status TEXT DEFAULT 'pending',
started_at TIMESTAMP,
completed_at TIMESTAMP,
rows_loaded BIGINT, -- rows staged from the file (see process_data_load_job)
error_message TEXT -- SQLERRM captured on failure
);
-- Execute one load job from data_load_jobs: CSV file -> temp staging
-- table -> validation -> merge into the target table.  Status/timing
-- are tracked on the job row; on failure the error is recorded and
-- re-raised.
--
-- NOTE(review): rows_loaded counts rows staged from the file; rows
-- skipped by ON CONFLICT DO NOTHING are still included — confirm that
-- is the intended meaning.
CREATE OR REPLACE FUNCTION process_data_load_job(
    p_job_id INT
)
RETURNS VOID AS $$
DECLARE
    v_job RECORD;
    v_temp_table TEXT;
    v_row_count BIGINT;
BEGIN
    -- Fetch the job definition.
    SELECT * INTO v_job FROM data_load_jobs WHERE job_id = p_job_id;
    IF NOT FOUND THEN
        RAISE EXCEPTION 'Job % not found', p_job_id;
    END IF;
    -- Mark the job as started.
    UPDATE data_load_jobs
    SET status = 'running', started_at = NOW()
    WHERE job_id = p_job_id;
    BEGIN
        v_temp_table := 'temp_load_' || p_job_id;
        -- BUG FIX: drop any leftover staging table from a previous
        -- (failed) run of this job in the same session, otherwise
        -- CREATE TEMP TABLE errors out on the name collision.
        EXECUTE format('DROP TABLE IF EXISTS %I', v_temp_table);
        EXECUTE format('CREATE TEMP TABLE %I (LIKE %I)', v_temp_table, v_job.target_table);
        -- Server-side COPY: source_file is resolved on the server host.
        EXECUTE format('COPY %I FROM %L WITH (FORMAT CSV, HEADER)',
            v_temp_table, v_job.source_file);
        EXECUTE format('SELECT COUNT(*) FROM %I', v_temp_table) INTO v_row_count;
        -- Raises on NULLs in required fields; warns on duplicate ids.
        PERFORM validate_loaded_data(v_temp_table);
        -- Merge: rows colliding with an existing unique key are skipped.
        EXECUTE format('
            INSERT INTO %I
            SELECT * FROM %I
            ON CONFLICT DO NOTHING',
            v_job.target_table, v_temp_table
        );
        UPDATE data_load_jobs
        SET status = 'completed',
            completed_at = NOW(),
            rows_loaded = v_row_count
        WHERE job_id = p_job_id;
        -- Staging table is no longer needed.
        EXECUTE format('DROP TABLE IF EXISTS %I', v_temp_table);
    EXCEPTION WHEN OTHERS THEN
        -- Record the failure, then propagate it to the caller.
        UPDATE data_load_jobs
        SET status = 'failed',
            error_message = SQLERRM
        WHERE job_id = p_job_id;
        RAISE;
    END;
END;
$$ LANGUAGE plpgsql;
-- 数据验证函数
-- Validate a staged table before merging: rows with a NULL id or
-- created_at abort the load; duplicate id values merely warn.
CREATE OR REPLACE FUNCTION validate_loaded_data(
p_table_name TEXT
)
RETURNS VOID AS $$
DECLARE
v_offenders BIGINT;
BEGIN
-- Hard requirement: id and created_at present on every row.
EXECUTE format('
SELECT COUNT(*)
FROM %I
WHERE id IS NULL OR created_at IS NULL',
p_table_name
) INTO v_offenders;
IF v_offenders > 0 THEN
RAISE EXCEPTION 'Found % rows with NULL in required fields', v_offenders;
END IF;
-- Soft requirement: ids unique; duplicates are tolerated with a warning.
EXECUTE format('
SELECT COUNT(*)
FROM (
SELECT id, COUNT(*)
FROM %I
GROUP BY id
HAVING COUNT(*) > 1
) dup',
p_table_name
) INTO v_offenders;
IF v_offenders > 0 THEN
RAISE WARNING 'Found % duplicate IDs', v_offenders;
END IF;
END;
$$ LANGUAGE plpgsql;
案例3:自动化的 pg_dump 恢复优化
sql
-- pg_dump 恢复优化脚本
-- Surround a pg_restore run with aggressive server tuning.
--
-- NOTE(review): ALTER SYSTEM is not allowed inside a transaction block;
-- this procedure must be invoked with a plain CALL (no surrounding
-- BEGIN/COMMIT) and may still need intermediate COMMITs depending on
-- server version — verify on the target.  wal_level / archive_mode /
-- max_wal_senders changes additionally require a server *restart*, not
-- just pg_reload_conf().  The pg_restore itself runs from the shell.
CREATE OR REPLACE PROCEDURE optimize_pg_restore(
    p_dump_file TEXT,
    p_jobs INT DEFAULT 4
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_start_time TIMESTAMP;
    v_end_time TIMESTAMP;
BEGIN
    v_start_time := clock_timestamp();
    -- 1. Bulk-restore settings.
    ALTER SYSTEM SET maintenance_work_mem = '2GB';
    ALTER SYSTEM SET max_wal_size = '32GB';
    ALTER SYSTEM SET checkpoint_timeout = '1h';
    ALTER SYSTEM SET checkpoint_completion_target = 0.9;
    ALTER SYSTEM SET wal_level = 'minimal';
    ALTER SYSTEM SET archive_mode = 'off';
    ALTER SYSTEM SET max_wal_senders = 0;
    -- BUG FIX: a bare SELECT has no destination in PL/pgSQL ("query has
    -- no destination for result data"); PERFORM discards the result.
    PERFORM pg_reload_conf();
    -- 2. The restore itself runs outside the database.
    RAISE NOTICE 'Starting restore with % parallel jobs...', p_jobs;
    RAISE NOTICE 'Execute: pg_restore -j % -d dbname %', p_jobs, p_dump_file;
    -- 3. Refresh planner statistics afterwards.
    RAISE NOTICE 'Running ANALYZE on all tables...';
    PERFORM analyze_all_tables();
    -- 4. Restore production settings.
    ALTER SYSTEM RESET maintenance_work_mem;
    ALTER SYSTEM RESET max_wal_size;
    ALTER SYSTEM RESET checkpoint_timeout;
    ALTER SYSTEM RESET checkpoint_completion_target;
    ALTER SYSTEM RESET wal_level;
    ALTER SYSTEM RESET archive_mode;
    ALTER SYSTEM RESET max_wal_senders;
    PERFORM pg_reload_conf();
    v_end_time := clock_timestamp();
    RAISE NOTICE 'Restore optimization completed in %', v_end_time - v_start_time;
END;
$$;
-- 分析所有表
-- ANALYZE every user table, largest first, emitting a progress notice
-- every 10 tables.
CREATE OR REPLACE FUNCTION analyze_all_tables()
RETURNS VOID AS $$
DECLARE
    v_table RECORD;
    v_count INT := 0;
BEGIN
    FOR v_table IN
        SELECT schemaname, tablename
        FROM pg_tables
        WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
        -- BUG FIX: quote the identifiers before casting to regclass;
        -- the plain schemaname||'.'||tablename concatenation broke on
        -- mixed-case or punctuated names.
        ORDER BY pg_relation_size(format('%I.%I', schemaname, tablename)::regclass) DESC
    LOOP
        EXECUTE format('ANALYZE %I.%I', v_table.schemaname, v_table.tablename);
        v_count := v_count + 1;
        IF v_count % 10 = 0 THEN
            RAISE NOTICE 'Analyzed % tables...', v_count;
        END IF;
    END LOOP;
    RAISE NOTICE 'Total tables analyzed: %', v_count;
END;
$$ LANGUAGE plpgsql;
性能监控和诊断
实时加载监控
sql
-- 创建加载性能监控视图
-- Live view of active COPY/INSERT backends joined with database-wide
-- cache-hit and WAL counters for at-a-glance load monitoring.
CREATE OR REPLACE VIEW data_load_monitor AS
-- Sessions currently executing a COPY or INSERT (this crude ILIKE also
-- matches any query merely mentioning those words).
WITH current_activity AS (
SELECT
pid,
usename,
application_name,
state,
query,
query_start,
NOW() - query_start as duration
FROM pg_stat_activity
WHERE state = 'active'
AND (query ILIKE '%COPY%' OR query ILIKE '%INSERT%')
),
-- Whole-database buffer I/O counters (cumulative since stats reset).
io_stats AS (
SELECT
SUM(blks_read) as blocks_read,
SUM(blks_hit) as blocks_hit,
CASE
WHEN SUM(blks_read + blks_hit) > 0
THEN SUM(blks_hit)::NUMERIC / SUM(blks_read + blks_hit) * 100
ELSE 0
END as cache_hit_ratio
FROM pg_stat_database
WHERE datname = current_database()
)
SELECT
ca.*,
io.blocks_read,
io.blocks_hit,
round(io.cache_hit_ratio, 2) as cache_hit_pct,
-- Total WAL ever generated by the cluster, not just by the current load.
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), '0/0')) as wal_written
FROM current_activity ca
CROSS JOIN io_stats io;
-- 加载进度跟踪函数
-- Report loading-progress metrics for p_table_name as (metric, value)
-- text pairs: row count, heap size, index count/size, last vacuum and
-- last analyze.
-- NOTE(review): pg_stat_user_tables is filtered by relname only, so a
-- table name that exists in several schemas yields multiple rows —
-- confirm names are schema-unique for callers.
CREATE OR REPLACE FUNCTION track_load_progress(
p_table_name TEXT
)
RETURNS TABLE(
metric TEXT,
value TEXT
) AS $$
DECLARE
v_row_count BIGINT;
v_table_size TEXT;
v_index_count INT;
v_total_index_size TEXT;
BEGIN
-- Exact row count (full scan on large tables).
EXECUTE format('SELECT COUNT(*) FROM %I', p_table_name) INTO v_row_count;
RETURN QUERY SELECT 'Row Count', v_row_count::TEXT;
-- Heap size only (excludes indexes and TOAST).
SELECT pg_size_pretty(pg_relation_size(p_table_name)) INTO v_table_size;
RETURN QUERY SELECT 'Table Size', v_table_size;
-- Index count and total size; includes the primary-key index.
SELECT COUNT(*), pg_size_pretty(SUM(pg_relation_size(indexrelid)))
INTO v_index_count, v_total_index_size
FROM pg_index
WHERE indrelid = p_table_name::regclass;
RETURN QUERY SELECT 'Index Count', v_index_count::TEXT;
RETURN QUERY SELECT 'Total Index Size', COALESCE(v_total_index_size, '0 bytes');
-- Maintenance history from the statistics collector.
RETURN QUERY
SELECT 'Last Vacuum', last_vacuum::TEXT
FROM pg_stat_user_tables
WHERE relname = p_table_name;
RETURN QUERY
SELECT 'Last Analyze', last_analyze::TEXT
FROM pg_stat_user_tables
WHERE relname = p_table_name;
END;
$$ LANGUAGE plpgsql;
最佳实践检查清单
自动化检查脚本
sql
-- 创建加载前检查清单
-- Pre-flight checklist before a bulk load into p_target_table.
-- p_estimated_rows drives a rough disk-space estimate (~100 bytes/row,
-- times 3 for index/WAL headroom).
--
-- NOTE(review): the existence check matches pg_tables.tablename only,
-- ignoring schemas — confirm table names are schema-unique.
CREATE OR REPLACE FUNCTION pre_load_checklist(
    p_target_table TEXT,
    p_estimated_rows BIGINT
)
RETURNS TABLE(
    check_item TEXT,
    status TEXT,
    recommendation TEXT
) AS $$
BEGIN
    -- Target table existence.
    RETURN QUERY
    SELECT
        'Target Table Exists'::TEXT,
        CASE
            WHEN EXISTS (SELECT 1 FROM pg_tables WHERE tablename = p_target_table)
            THEN 'YES'
            ELSE 'NO'
        END,
        'Create table before loading'::TEXT;
    -- Free disk space cannot be read from SQL; flag for a manual check.
    RETURN QUERY
    SELECT
        'Disk Space'::TEXT,
        'CHECK MANUALLY'::TEXT,
        format('Ensure at least %s GB free space',
            round(p_estimated_rows * 100.0 / 1024 / 1024 / 1024 * 3, 2));
    -- WAL level.
    RETURN QUERY
    SELECT
        'WAL Level'::TEXT,
        current_setting('wal_level'),
        CASE
            WHEN current_setting('wal_level') = 'minimal'
            THEN 'Optimal for bulk loading'
            ELSE 'Consider setting to minimal for faster loads'
        END;
    -- maintenance_work_mem adequacy.
    -- BUG FIX: the old check stripped both 'MB' and 'GB' suffixes and
    -- compared the bare number to 256, so '1GB' was judged inadequate.
    -- pg_size_bytes() understands every unit suffix.
    RETURN QUERY
    SELECT
        'maintenance_work_mem'::TEXT,
        current_setting('maintenance_work_mem'),
        CASE
            WHEN pg_size_bytes(current_setting('maintenance_work_mem')) >= 256 * 1024 * 1024
            THEN 'Adequate'
            ELSE 'Consider increasing to at least 256MB'
        END;
    -- Parallelism headroom.
    RETURN QUERY
    SELECT
        'max_worker_processes'::TEXT,
        current_setting('max_worker_processes'),
        'Ensure sufficient for parallel operations'::TEXT;
END;
$$ LANGUAGE plpgsql;
完整的批量加载流程
端到端的加载脚本
sql
-- 主加载协调函数
-- End-to-end bulk load: tune the session, drop indexes and disable
-- triggers, COPY the file in, rebuild indexes, re-enable triggers,
-- ANALYZE, and log per-phase timings.
-- p_options (JSONB): use_copy (bool, default true),
--                    parallel_workers (int, default 4).
-- NOTE(review): the final COUNT(*) is reported as "rows loaded", so it
-- assumes the target table was empty beforehand — confirm.
CREATE OR REPLACE PROCEDURE bulk_load_coordinator(
p_source_file TEXT,
p_target_table TEXT,
p_options JSONB DEFAULT '{}'
)
LANGUAGE plpgsql
AS $$
DECLARE
v_start_time TIMESTAMP;
v_phase_start TIMESTAMP;
v_total_rows BIGINT;
v_use_copy BOOLEAN;
v_parallel_workers INT;
BEGIN
v_start_time := clock_timestamp();
v_use_copy := COALESCE((p_options->>'use_copy')::BOOLEAN, true);
v_parallel_workers := COALESCE((p_options->>'parallel_workers')::INT, 4);
RAISE NOTICE '[%] Starting bulk load process', clock_timestamp();
-- Phase 1: preparation.
v_phase_start := clock_timestamp();
RAISE NOTICE '[%] Phase 1: Preparation', v_phase_start;
-- Session-level tuning (see apply_load_optimization).
CALL apply_load_optimization();
-- Park index definitions and drop the indexes for the load.
PERFORM save_and_drop_indexes(p_target_table);
-- Disable triggers (including FK triggers) during the load.
EXECUTE format('ALTER TABLE %I DISABLE TRIGGER ALL', p_target_table);
RAISE NOTICE '[%] Preparation completed in %',
clock_timestamp(),
clock_timestamp() - v_phase_start;
-- Phase 2: data loading.
v_phase_start := clock_timestamp();
RAISE NOTICE '[%] Phase 2: Data Loading', v_phase_start;
IF v_use_copy THEN
-- Server-side COPY: p_source_file is resolved on the server host.
EXECUTE format('COPY %I FROM %L WITH (FORMAT CSV, HEADER)',
p_target_table, p_source_file);
ELSE
-- Placeholder: no alternative loader is implemented.
RAISE NOTICE 'Alternative loading method not implemented';
END IF;
-- Count rows for the report (full scan).
EXECUTE format('SELECT COUNT(*) FROM %I', p_target_table) INTO v_total_rows;
RAISE NOTICE '[%] Data loading completed in %, % rows loaded',
clock_timestamp(),
clock_timestamp() - v_phase_start,
v_total_rows;
-- Phase 3: index rebuild.
v_phase_start := clock_timestamp();
RAISE NOTICE '[%] Phase 3: Index Rebuilding', v_phase_start;
-- Allow parallel index builds.
EXECUTE format('SET max_parallel_maintenance_workers = %s', v_parallel_workers);
PERFORM restore_indexes(p_target_table);
RAISE NOTICE '[%] Index rebuilding completed in %',
clock_timestamp(),
clock_timestamp() - v_phase_start;
-- Phase 4: finalization.
v_phase_start := clock_timestamp();
RAISE NOTICE '[%] Phase 4: Finalization', v_phase_start;
-- Re-enable triggers.
EXECUTE format('ALTER TABLE %I ENABLE TRIGGER ALL', p_target_table);
-- Refresh planner statistics for the freshly filled table.
EXECUTE format('ANALYZE %I', p_target_table);
RAISE NOTICE '[%] Finalization completed in %',
clock_timestamp(),
clock_timestamp() - v_phase_start;
-- Summary.
RAISE NOTICE '[%] Bulk load completed successfully', clock_timestamp();
RAISE NOTICE 'Total time: %', clock_timestamp() - v_start_time;
RAISE NOTICE 'Total rows: %', v_total_rows;
RAISE NOTICE 'Performance: % rows/second',
round(v_total_rows / EXTRACT(EPOCH FROM clock_timestamp() - v_start_time));
END;
$$;
总结
高效的数据库填充需要综合运用多种技术:
- 事务管理:批量提交减少开销
- COPY 命令:最快的数据加载方式
- 索引策略:先加载后建索引
- 约束管理:临时禁用外键检查
- 内存优化:调整工作内存参数
- WAL 优化:减少日志写入
- 并行处理:利用多核心加速
- 监控诊断:实时跟踪加载进度
关键原则:
- 理解每种优化的适用场景
- 权衡性能提升与数据安全
- 建立标准化的加载流程
- 持续监控和优化
记住
最快的加载方式是:COPY + 无索引 + 单事务 + minimal WAL