
PostgreSQL Database Population Optimization: A Complete Guide

Overview

Initial database population and large bulk imports are routine tasks in PostgreSQL administration. Whether you are migrating data, restoring a backup, or importing business data in bulk, efficient loading techniques can cut operations that would otherwise take hours down to minutes. This article walks through the main optimization techniques to help you load data as quickly as possible.

Factors That Affect Data Load Performance

Performance Bottleneck Analysis

The main bottlenecks during bulk loading are per-transaction commit overhead (each commit forces a WAL flush), per-statement parsing and client round trips, index maintenance, foreign key and trigger checks, and checkpoint/WAL pressure. The techniques below address each of these in turn.

Core Optimization Techniques

1. Disable Autocommit

Principle: every autocommitted statement forces a WAL flush at commit time, so committing once per row creates a large amount of I/O overhead.

sql
-- Create a test environment
CREATE TABLE load_test (
    id SERIAL PRIMARY KEY,
    data TEXT,
    value NUMERIC,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Performance comparison function
CREATE OR REPLACE FUNCTION compare_autocommit_performance(
    p_rows INT
)
RETURNS TABLE(
    method TEXT,
    duration INTERVAL,
    rows_per_second NUMERIC,
    relative_performance NUMERIC
) AS $$
DECLARE
    v_start TIMESTAMP;
    v_end TIMESTAMP;
    v_baseline NUMERIC;
BEGIN
    -- Test 1: row-by-row INSERT loop (baseline)
    -- NOTE: a PL/pgSQL function body runs as a single transaction, so this does
    -- not measure true per-row autocommit; see the client-side example after
    -- this benchmark for that comparison.
    TRUNCATE load_test;
    v_start := clock_timestamp();
    
    FOR i IN 1..p_rows LOOP
        INSERT INTO load_test (data, value) 
        VALUES ('Row ' || i, random() * 1000);
    END LOOP;
    
    v_end := clock_timestamp();
    v_baseline := p_rows / EXTRACT(EPOCH FROM (v_end - v_start));
    
    RETURN QUERY
    SELECT 
        'Autocommit ON',
        v_end - v_start,
        v_baseline,
        1.0;
    
    -- Test 2: single transaction
    -- (BEGIN/COMMIT cannot be issued inside a function; the whole function
    --  body already runs as one transaction)
    TRUNCATE load_test;
    v_start := clock_timestamp();
    
    FOR i IN 1..p_rows LOOP
        INSERT INTO load_test (data, value) 
        VALUES ('Row ' || i, random() * 1000);
    END LOOP;
    
    v_end := clock_timestamp();
    
    RETURN QUERY
    SELECT 
        'Single Transaction',
        v_end - v_start,
        p_rows / EXTRACT(EPOCH FROM (v_end - v_start)),
        (p_rows / EXTRACT(EPOCH FROM (v_end - v_start))) / v_baseline;
    
    -- Test 3: batched transactions (1000 rows per commit)
    -- (commits are likewise not possible here; in a real load, commit after
    --  each batch from the client, or use a procedure with COMMIT)
    TRUNCATE load_test;
    v_start := clock_timestamp();
    
    FOR batch IN 0..(p_rows / 1000) LOOP
        FOR i IN 1..LEAST(1000, p_rows - batch * 1000) LOOP
            INSERT INTO load_test (data, value) 
            VALUES ('Row ' || (batch * 1000 + i), random() * 1000);
        END LOOP;
    END LOOP;
    
    v_end := clock_timestamp();
    
    RETURN QUERY
    SELECT 
        'Batch Transaction (1000)',
        v_end - v_start,
        p_rows / EXTRACT(EPOCH FROM (v_end - v_start)),
        (p_rows / EXTRACT(EPOCH FROM (v_end - v_start))) / v_baseline;
END;
$$ LANGUAGE plpgsql;

-- Run the benchmark
SELECT * FROM compare_autocommit_performance(10000);
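
Because a PL/pgSQL function body always runs as a single transaction, the function above can only approximate the comparison; a true autocommit-versus-explicit-transaction test has to be driven from a client session. A minimal sketch against the load_test table, run in psql (where autocommit is the default):

sql
-- With autocommit on, each INSERT is its own transaction and forces a WAL
-- flush at commit time:
INSERT INTO load_test (data, value) VALUES ('row 1', random() * 1000);
INSERT INTO load_test (data, value) VALUES ('row 2', random() * 1000);

-- Wrapping the same statements in one explicit transaction pays the commit
-- (and WAL flush) cost only once:
BEGIN;
INSERT INTO load_test (data, value) VALUES ('row 1', random() * 1000);
INSERT INTO load_test (data, value) VALUES ('row 2', random() * 1000);
-- ... thousands more rows ...
COMMIT;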

2. Use the COPY Command

COPY is the fastest way to load data into PostgreSQL: it avoids per-statement parsing and planning overhead and streams rows directly into the table.
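
As a minimal sketch before the benchmark below (the file path is a placeholder): server-side COPY reads a file that lives on the database host and requires superuser or the pg_read_server_files role, while psql's \copy streams a client-side file through the same machinery.

sql
-- Server-side COPY: the file lives on the database server
COPY load_test (id, data, value, created_at)
FROM '/tmp/test_data.csv'
WITH (FORMAT CSV, HEADER);

-- Client-side equivalent in psql: the file lives on the client machine
-- \copy load_test (id, data, value, created_at) FROM 'test_data.csv' WITH (FORMAT CSV, HEADER)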

sql
-- COPY performance benchmark
CREATE OR REPLACE FUNCTION compare_insert_vs_copy(
    p_rows INT
)
RETURNS TABLE(
    method TEXT,
    duration INTERVAL,
    rows_per_second NUMERIC,
    mb_per_second NUMERIC
) AS $$
DECLARE
    v_start TIMESTAMP;
    v_end TIMESTAMP;
    v_size BIGINT;
    v_data_file TEXT := '/tmp/test_data.csv';
BEGIN
    -- Generate test data (COPY ... TO a server-side file requires superuser
    -- or membership in the pg_write_server_files role)
    EXECUTE format('
        COPY (
            SELECT 
                i as id,
                ''Data '' || i as data,
                random() * 1000 as value,
                NOW() - (random() * 365)::INT * INTERVAL ''1 day'' as created_at
            FROM generate_series(1, %s) i
        ) TO %L WITH (FORMAT CSV, HEADER)',
        p_rows, v_data_file
    );
    
    -- Test INSERT ... SELECT
    CREATE TEMP TABLE insert_test (LIKE load_test INCLUDING ALL);
    v_start := clock_timestamp();
    
    INSERT INTO insert_test (data, value)
    SELECT 'Data ' || i, random() * 1000
    FROM generate_series(1, p_rows) i;
    
    v_end := clock_timestamp();
    v_size := pg_relation_size('insert_test');
    
    RETURN QUERY
    SELECT 
        'INSERT',
        v_end - v_start,
        p_rows / EXTRACT(EPOCH FROM (v_end - v_start)),
        v_size / 1024.0 / 1024.0 / EXTRACT(EPOCH FROM (v_end - v_start));
    
    -- Test COPY
    CREATE TEMP TABLE copy_test (LIKE load_test INCLUDING ALL);
    v_start := clock_timestamp();
    
    EXECUTE format('COPY copy_test FROM %L WITH (FORMAT CSV, HEADER)', v_data_file);
    
    v_end := clock_timestamp();
    v_size := pg_relation_size('copy_test');
    
    RETURN QUERY
    SELECT 
        'COPY',
        v_end - v_start,
        p_rows / EXTRACT(EPOCH FROM (v_end - v_start)),
        v_size / 1024.0 / 1024.0 / EXTRACT(EPOCH FROM (v_end - v_start));
    
    -- Test COPY FREEZE (rows are frozen on load, skipping later visibility-hint
    -- work; FREEZE requires that the table was created or truncated in the
    -- current transaction, which holds here because the function body runs as
    -- one transaction; if the server still rejects FREEZE due to prior
    -- transaction activity, run this test as a standalone script instead)
    CREATE TEMP TABLE copy_freeze_test (LIKE load_test INCLUDING ALL);
    v_start := clock_timestamp();
    
    EXECUTE format('COPY copy_freeze_test FROM %L WITH (FORMAT CSV, HEADER, FREEZE)', v_data_file);
    
    v_end := clock_timestamp();
    v_size := pg_relation_size('copy_freeze_test');
    
    RETURN QUERY
    SELECT 
        'COPY FREEZE',
        v_end - v_start,
        p_rows / EXTRACT(EPOCH FROM (v_end - v_start)),
        v_size / 1024.0 / 1024.0 / EXTRACT(EPOCH FROM (v_end - v_start));
END;
$$ LANGUAGE plpgsql;

3. Index Management Strategy

sql
-- Helper function: save and drop a table's secondary indexes
CREATE OR REPLACE FUNCTION save_and_drop_indexes(
    p_table_name TEXT
)
RETURNS TABLE(
    index_name TEXT,
    index_definition TEXT
) AS $$
BEGIN
    -- Remember the dropped index definitions in a session-local table
    CREATE TEMP TABLE IF NOT EXISTS saved_indexes (
        table_name TEXT,
        index_name TEXT,
        index_definition TEXT,
        saved_at TIMESTAMP DEFAULT NOW()
    );
    
    -- Save the definitions and return them to the caller
    RETURN QUERY
    INSERT INTO saved_indexes (table_name, index_name, index_definition)
    SELECT 
        p_table_name,
        indexname::TEXT,
        indexdef
    FROM pg_indexes
    WHERE tablename = p_table_name
      AND indexname NOT LIKE '%_pkey'  -- keep the primary key
    RETURNING saved_indexes.index_name, saved_indexes.index_definition;
    
    -- Drop the secondary indexes (indexes backing UNIQUE or other constraints
    -- would need ALTER TABLE ... DROP CONSTRAINT instead)
    FOR index_name IN 
        SELECT indexname 
        FROM pg_indexes 
        WHERE tablename = p_table_name
          AND indexname NOT LIKE '%_pkey'
    LOOP
        EXECUTE format('DROP INDEX %I', index_name);
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- Rebuild the saved indexes
CREATE OR REPLACE FUNCTION restore_indexes(
    p_table_name TEXT
)
RETURNS VOID AS $$
DECLARE
    v_index RECORD;
BEGIN
    -- Allow parallel index builds and give them more memory
    SET max_parallel_maintenance_workers = 4;
    SET maintenance_work_mem = '1GB';
    
    FOR v_index IN 
        SELECT index_definition 
        FROM saved_indexes 
        WHERE table_name = p_table_name
        ORDER BY saved_at DESC
    LOOP
        EXECUTE v_index.index_definition;
    END LOOP;
    
    -- Clean up the saved index definitions
    DELETE FROM saved_indexes WHERE table_name = p_table_name;
    
    RESET max_parallel_maintenance_workers;
    RESET maintenance_work_mem;
END;
$$ LANGUAGE plpgsql;

-- Measure the impact of indexes on load performance
CREATE OR REPLACE FUNCTION test_index_impact(
    p_rows INT
)
RETURNS TABLE(
    scenario TEXT,
    load_time INTERVAL,
    index_time INTERVAL,
    total_time INTERVAL
) AS $$
DECLARE
    v_start TIMESTAMP;
    v_load_end TIMESTAMP;
    v_index_end TIMESTAMP;
BEGIN
    -- Scenario 1: load first, then build indexes
    CREATE TEMP TABLE test_table1 (
        id SERIAL PRIMARY KEY,
        col1 TEXT,
        col2 INT,
        col3 TIMESTAMP,
        col4 NUMERIC
    );
    
    v_start := clock_timestamp();
    
    INSERT INTO test_table1 (col1, col2, col3, col4)
    SELECT 
        'Text ' || i,
        i % 1000,
        NOW() - (i % 365) * INTERVAL '1 day',
        random() * 1000
    FROM generate_series(1, p_rows) i;
    
    v_load_end := clock_timestamp();
    
    CREATE INDEX idx1_1 ON test_table1(col1);
    CREATE INDEX idx1_2 ON test_table1(col2);
    CREATE INDEX idx1_3 ON test_table1(col3);
    CREATE INDEX idx1_4 ON test_table1(col1, col2);
    
    v_index_end := clock_timestamp();
    
    RETURN QUERY
    SELECT 
        'Load then Index',
        v_load_end - v_start,
        v_index_end - v_load_end,
        v_index_end - v_start;
    
    -- Scenario 2: load with indexes already in place
    CREATE TEMP TABLE test_table2 (
        id SERIAL PRIMARY KEY,
        col1 TEXT,
        col2 INT,
        col3 TIMESTAMP,
        col4 NUMERIC
    );
    
    CREATE INDEX idx2_1 ON test_table2(col1);
    CREATE INDEX idx2_2 ON test_table2(col2);
    CREATE INDEX idx2_3 ON test_table2(col3);
    CREATE INDEX idx2_4 ON test_table2(col1, col2);
    
    v_start := clock_timestamp();
    
    INSERT INTO test_table2 (col1, col2, col3, col4)
    SELECT 
        'Text ' || i,
        i % 1000,
        NOW() - (i % 365) * INTERVAL '1 day',
        random() * 1000
    FROM generate_series(1, p_rows) i;
    
    v_load_end := clock_timestamp();
    
    RETURN QUERY
    SELECT 
        'Load with Indexes',
        v_load_end - v_start,
        INTERVAL '0',
        v_load_end - v_start;
END;
$$ LANGUAGE plpgsql;
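
The two helpers are designed to bracket a load; a sketch of the typical sequence, assuming the CSV path is a placeholder and that both calls run in the same session (the saved definitions live in a temporary table):

sql
-- Drop secondary indexes, load, then rebuild
SELECT * FROM save_and_drop_indexes('load_test');

COPY load_test (id, data, value, created_at)
FROM '/tmp/test_data.csv'
WITH (FORMAT CSV, HEADER);

SELECT restore_indexes('load_test');

-- Refresh statistics once the indexes are back
ANALYZE load_test;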

4. Foreign Key Constraint Management

sql
-- Foreign key constraint management helper
CREATE OR REPLACE FUNCTION manage_foreign_keys(
    p_action TEXT,  -- 'disable' or 'enable'
    p_table_name TEXT DEFAULT NULL
)
RETURNS TABLE(
    constraint_name TEXT,
    table_name TEXT,
    status TEXT
) AS $$
DECLARE
    v_constraint RECORD;
BEGIN
    IF p_action = 'disable' THEN
        FOR v_constraint IN
            SELECT 
                con.conname,
                con.conrelid::regclass::TEXT as table_name,
                pg_get_constraintdef(con.oid) as definition
            FROM pg_constraint con
            WHERE con.contype = 'f'
              AND (p_table_name IS NULL OR con.conrelid::regclass::TEXT = p_table_name)
        LOOP
            EXECUTE format('ALTER TABLE %s DROP CONSTRAINT %I',
                          v_constraint.table_name, v_constraint.conname);
            
            -- Save the constraint definition so it can be re-created later
            INSERT INTO saved_constraints (constraint_name, table_name, definition)
            VALUES (v_constraint.conname, v_constraint.table_name, v_constraint.definition);
            
            RETURN QUERY
            SELECT v_constraint.conname::TEXT, v_constraint.table_name, 'DROPPED'::TEXT;
        END LOOP;
        
    ELSIF p_action = 'enable' THEN
        FOR v_constraint IN
            SELECT * FROM saved_constraints sc
            WHERE p_table_name IS NULL OR sc.table_name = p_table_name
        LOOP
            EXECUTE format('ALTER TABLE %s ADD CONSTRAINT %I %s',
                          v_constraint.table_name, 
                          v_constraint.constraint_name,
                          v_constraint.definition);
            
            RETURN QUERY
            SELECT v_constraint.constraint_name, v_constraint.table_name, 'CREATED';
        END LOOP;
        
        -- Clean up the saved constraints
        DELETE FROM saved_constraints sc
        WHERE p_table_name IS NULL OR sc.table_name = p_table_name;
    END IF;
END;
$$ LANGUAGE plpgsql;

-- Table that stores the dropped constraint definitions (create it before calling manage_foreign_keys)
CREATE TABLE IF NOT EXISTS saved_constraints (
    constraint_name TEXT,
    table_name TEXT,
    definition TEXT,
    saved_at TIMESTAMP DEFAULT NOW()
);
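
A sketch of how the helper is meant to be used around a load; the table name and file path are placeholders, and because the constraints are dropped rather than merely deferred, the incoming data must be trusted or re-validated when they are re-created:

sql
-- Drop the foreign keys on the table being loaded (placeholder table name)
SELECT * FROM manage_foreign_keys('disable', 'order_items');

-- Bulk load (placeholder path)
COPY order_items FROM '/tmp/order_items.csv' WITH (FORMAT CSV, HEADER);

-- Re-create the constraints; this re-validates all existing rows
SELECT * FROM manage_foreign_keys('enable', 'order_items');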

5. Memory Parameter Tuning

sql
-- Recommend memory settings for a load of a given size
CREATE OR REPLACE FUNCTION optimize_load_settings(
    p_data_size_gb NUMERIC
)
RETURNS TABLE(
    parameter TEXT,
    original_value TEXT,
    optimized_value TEXT,
    recommendation TEXT
) AS $$
DECLARE
    v_total_ram_gb NUMERIC;
BEGIN
    -- Assume total system RAM in GB (in practice, obtain this from the OS)
    v_total_ram_gb := 64;
    
    -- Snapshot the original settings
    CREATE TEMP TABLE IF NOT EXISTS original_settings AS
    SELECT name, setting, unit
    FROM pg_settings
    WHERE name IN (
        'maintenance_work_mem',
        'work_mem',
        'shared_buffers',
        'effective_cache_size',
        'max_wal_size',
        'checkpoint_timeout'
    );
    
    -- maintenance_work_mem
    RETURN QUERY
    SELECT 
        'maintenance_work_mem',
        current_setting('maintenance_work_mem'),
        CASE 
            WHEN v_data_size_gb < 1 THEN '256MB'
            WHEN v_data_size_gb < 10 THEN '1GB'
            WHEN v_data_size_gb < 100 THEN '2GB'
            ELSE '4GB'
        END,
        'Increase for faster index creation';
    
    -- work_mem
    RETURN QUERY
    SELECT 
        'work_mem',
        current_setting('work_mem'),
        CASE 
            WHEN v_data_size_gb < 10 THEN '128MB'
            WHEN v_data_size_gb < 100 THEN '256MB'
            ELSE '512MB'
        END,
        'Increase for complex queries during load';
    
    -- max_wal_size
    RETURN QUERY
    SELECT 
        'max_wal_size',
        current_setting('max_wal_size'),
        CASE 
            WHEN v_data_size_gb < 10 THEN '4GB'
            WHEN v_data_size_gb < 100 THEN '16GB'
            ELSE '32GB'
        END,
        'Reduce checkpoint frequency during load';
    
    -- checkpoint_timeout
    RETURN QUERY
    SELECT 
        'checkpoint_timeout',
        current_setting('checkpoint_timeout'),
        '30min',
        'Increase checkpoint interval';
END;
$$ LANGUAGE plpgsql;

-- Apply the session-level optimization settings
CREATE OR REPLACE PROCEDURE apply_load_optimization()
LANGUAGE plpgsql
AS $$
BEGIN
    -- Session-settable parameters for bulk loading
    SET maintenance_work_mem = '2GB';
    SET work_mem = '256MB';
    SET synchronous_commit = 'off';
    
    -- max_wal_size, checkpoint_timeout, checkpoint_completion_target and
    -- wal_buffers cannot be changed per session; set them with ALTER SYSTEM
    -- (plus pg_reload_conf(), or a server restart for wal_buffers) before the
    -- load, and reset them afterwards.
    
    RAISE NOTICE 'Bulk load optimization settings applied';
END;
$$;
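
A usage sketch tying the two together: the recommendations are sized to the load, the session-level part is applied with the procedure, and the cluster-wide parameters are set separately with ALTER SYSTEM (run as standalone SQL by a superuser) followed by a reload. The values shown mirror the recommendations above:

sql
-- Recommendations for an (approximately) 50 GB load
SELECT * FROM optimize_load_settings(50);

-- Session-level settings
CALL apply_load_optimization();

-- Cluster-wide settings (run as superuser, outside any transaction block)
ALTER SYSTEM SET max_wal_size = '16GB';
ALTER SYSTEM SET checkpoint_timeout = '30min';
ALTER SYSTEM SET checkpoint_completion_target = 0.9;
SELECT pg_reload_conf();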

6. WAL and Replication Management

sql
-- Recommended WAL-related settings per mode
CREATE OR REPLACE FUNCTION manage_wal_settings(
    p_mode TEXT  -- 'minimal', 'replica', or 'logical'
)
RETURNS TABLE(
    setting TEXT,
    value TEXT,
    requires_restart BOOLEAN
) AS $$
BEGIN
    IF p_mode = 'minimal' THEN
        -- Minimal WAL mode (fastest, but no replication or WAL archiving)
        RETURN QUERY VALUES
            ('wal_level', 'minimal', true),
            ('max_wal_senders', '0', true),
            ('archive_mode', 'off', true),
            ('synchronous_commit', 'off', false),
            ('full_page_writes', 'off', false);
            
    ELSIF p_mode = 'replica' THEN
        -- Physical replication mode
        RETURN QUERY VALUES
            ('wal_level', 'replica', true),
            ('max_wal_senders', '3', true),
            ('archive_mode', 'on', true),
            ('synchronous_commit', 'on', false),
            ('full_page_writes', 'on', false);
            
    ELSIF p_mode = 'logical' THEN
        -- Logical replication mode
        RETURN QUERY VALUES
            ('wal_level', 'logical', true),
            ('max_wal_senders', '10', true),
            ('archive_mode', 'on', true),
            ('synchronous_commit', 'on', false),
            ('full_page_writes', 'on', false);
    END IF;
    
    RAISE NOTICE 'Settings for % mode displayed. Restart required for settings marked true.', p_mode;
END;
$$ LANGUAGE plpgsql;
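
The function above only reports recommended values; actually switching modes is a deliberate, restart-requiring step. A sketch for a minimal-WAL load window:

sql
-- Review the recommendation
SELECT * FROM manage_wal_settings('minimal');

-- Apply it (superuser; takes effect only after a server restart)
ALTER SYSTEM SET wal_level = 'minimal';
ALTER SYSTEM SET max_wal_senders = 0;
ALTER SYSTEM SET archive_mode = 'off';
-- Restart the server, run the load, then restore the 'replica' values and restart again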

Practical Use Cases

Case 1: Large-Scale Data Migration

sql
-- Data migration framework
CREATE OR REPLACE PROCEDURE migrate_large_table(
    p_source_table TEXT,
    p_target_table TEXT,
    p_batch_size INT DEFAULT 100000
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_start_time TIMESTAMP;
    v_end_time TIMESTAMP;
    v_total_rows BIGINT;
    v_migrated_rows BIGINT := 0;
    v_batch_start BIGINT := 0;
BEGIN
    v_start_time := clock_timestamp();
    
    -- Count rows in the source table
    EXECUTE format('SELECT COUNT(*) FROM %I', p_source_table) INTO v_total_rows;
    RAISE NOTICE 'Starting migration of % rows from % to %', v_total_rows, p_source_table, p_target_table;
    
    -- Create the target table if it does not exist
    EXECUTE format('CREATE TABLE IF NOT EXISTS %I (LIKE %I INCLUDING ALL)', p_target_table, p_source_table);
    
    -- Save and drop the target table's secondary indexes
    PERFORM save_and_drop_indexes(p_target_table);
    
    -- Disable triggers on the target table (disabling FK triggers requires superuser)
    EXECUTE format('ALTER TABLE %I DISABLE TRIGGER ALL', p_target_table);
    
    -- Per-transaction optimization settings
    SET LOCAL maintenance_work_mem = '2GB';
    SET LOCAL work_mem = '512MB';
    SET LOCAL synchronous_commit = 'off';
    
    -- Migrate the data in batches
    LOOP
        -- Batched INSERT ... SELECT; note that LIMIT/OFFSET over ctid gets
        -- slower as the offset grows, so keyset pagination is preferable for
        -- very large tables
        EXECUTE format('
            INSERT INTO %I 
            SELECT * FROM %I 
            ORDER BY ctid
            LIMIT %s OFFSET %s',
            p_target_table, p_source_table, p_batch_size, v_batch_start
        );
        
        GET DIAGNOSTICS v_migrated_rows = ROW_COUNT;
        v_batch_start := v_batch_start + v_migrated_rows;
        
        -- Progress report
        IF v_batch_start % (p_batch_size * 10) = 0 THEN
            RAISE NOTICE 'Progress: % / % rows (% %%)', 
                v_batch_start, v_total_rows, 
                round(v_batch_start::NUMERIC / v_total_rows * 100, 2);
        END IF;
        
        EXIT WHEN v_migrated_rows < p_batch_size;
    END LOOP;
    
    -- Rebuild indexes
    RAISE NOTICE 'Rebuilding indexes...';
    PERFORM restore_indexes(p_target_table);
    
    -- Re-enable triggers
    EXECUTE format('ALTER TABLE %I ENABLE TRIGGER ALL', p_target_table);
    
    -- Refresh planner statistics
    EXECUTE format('ANALYZE %I', p_target_table);
    
    v_end_time := clock_timestamp();
    RAISE NOTICE 'Migration completed in %, % rows migrated', 
        v_end_time - v_start_time, v_batch_start;
END;
$$;
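
Invoking the migration procedure for a hypothetical table pair (the table names and batch size are placeholders):

sql
-- Copy orders into orders_new in batches of 100,000 rows (placeholder tables)
CALL migrate_large_table('orders', 'orders_new', 100000);

-- Verify the row counts afterwards
SELECT
    (SELECT COUNT(*) FROM orders)     AS source_rows,
    (SELECT COUNT(*) FROM orders_new) AS target_rows;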

Case 2: Near-Real-Time Incremental Data Loading

sql
-- Incremental data load job system
CREATE TABLE data_load_jobs (
    job_id SERIAL PRIMARY KEY,
    job_name TEXT NOT NULL,
    source_file TEXT,
    target_table TEXT,
    status TEXT DEFAULT 'pending',
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    rows_loaded BIGINT,
    error_message TEXT
);

CREATE OR REPLACE FUNCTION process_data_load_job(
    p_job_id INT
)
RETURNS VOID AS $$
DECLARE
    v_job RECORD;
    v_temp_table TEXT;
    v_row_count BIGINT;
BEGIN
    -- Fetch the job record
    SELECT * INTO v_job FROM data_load_jobs WHERE job_id = p_job_id;
    
    IF NOT FOUND THEN
        RAISE EXCEPTION 'Job % not found', p_job_id;
    END IF;
    
    -- Mark the job as running
    UPDATE data_load_jobs 
    SET status = 'running', started_at = NOW() 
    WHERE job_id = p_job_id;
    
    BEGIN
        -- Create a staging temp table
        v_temp_table := 'temp_load_' || p_job_id;
        EXECUTE format('CREATE TEMP TABLE %I (LIKE %I)', v_temp_table, v_job.target_table);
        
        -- Load the file into the staging table (server-side COPY requires
        -- superuser or the pg_read_server_files role)
        EXECUTE format('COPY %I FROM %L WITH (FORMAT CSV, HEADER)', 
                      v_temp_table, v_job.source_file);
        
        -- Count the loaded rows
        EXECUTE format('SELECT COUNT(*) FROM %I', v_temp_table) INTO v_row_count;
        
        -- Validate the staged data
        PERFORM validate_loaded_data(v_temp_table);
        
        -- Merge into the target table
        EXECUTE format('
            INSERT INTO %I 
            SELECT * FROM %I
            ON CONFLICT DO NOTHING',
            v_job.target_table, v_temp_table
        );
        
        -- Mark the job as completed
        UPDATE data_load_jobs 
        SET status = 'completed', 
            completed_at = NOW(),
            rows_loaded = v_row_count
        WHERE job_id = p_job_id;
        
    EXCEPTION WHEN OTHERS THEN
        -- Error handling: record the failure and re-raise
        UPDATE data_load_jobs 
        SET status = 'failed',
            error_message = SQLERRM
        WHERE job_id = p_job_id;
        
        RAISE;
    END;
END;
$$ LANGUAGE plpgsql;

-- Data validation function (assumes the staging table has id and created_at columns)
CREATE OR REPLACE FUNCTION validate_loaded_data(
    p_table_name TEXT
)
RETURNS VOID AS $$
DECLARE
    v_null_count BIGINT;
    v_duplicate_count BIGINT;
BEGIN
    -- Check required fields
    EXECUTE format('
        SELECT COUNT(*) 
        FROM %I 
        WHERE id IS NULL OR created_at IS NULL',
        p_table_name
    ) INTO v_null_count;
    
    IF v_null_count > 0 THEN
        RAISE EXCEPTION 'Found % rows with NULL in required fields', v_null_count;
    END IF;
    
    -- Check for duplicate ids
    EXECUTE format('
        SELECT COUNT(*) 
        FROM (
            SELECT id, COUNT(*) 
            FROM %I 
            GROUP BY id 
            HAVING COUNT(*) > 1
        ) dup',
        p_table_name
    ) INTO v_duplicate_count;
    
    IF v_duplicate_count > 0 THEN
        RAISE WARNING 'Found % duplicate IDs', v_duplicate_count;
    END IF;
END;
$$ LANGUAGE plpgsql;
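
Registering and running a job with this framework; the job name, file path and the use of load_test as the target are placeholders:

sql
-- Register a job (placeholder values)
INSERT INTO data_load_jobs (job_name, source_file, target_table)
VALUES ('daily_orders', '/tmp/orders_2024_01_01.csv', 'load_test')
RETURNING job_id;

-- Process it, using the job_id returned above
SELECT process_data_load_job(1);

-- Inspect the outcome
SELECT job_id, status, rows_loaded, error_message
FROM data_load_jobs
ORDER BY job_id DESC
LIMIT 5;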

Case 3: Automated pg_dump Restore Optimization

sql
-- pg_restore optimization script
CREATE OR REPLACE PROCEDURE optimize_pg_restore(
    p_dump_file TEXT,
    p_jobs INT DEFAULT 4
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_start_time TIMESTAMP;
    v_end_time TIMESTAMP;
BEGIN
    v_start_time := clock_timestamp();
    
    -- 1. Set bulk-restore parameters
    -- NOTE: ALTER SYSTEM is generally not allowed from inside a function or
    -- procedure; in practice, run these statements as standalone SQL before
    -- starting the restore.
    ALTER SYSTEM SET maintenance_work_mem = '2GB';
    ALTER SYSTEM SET max_wal_size = '32GB';
    ALTER SYSTEM SET checkpoint_timeout = '1h';
    ALTER SYSTEM SET checkpoint_completion_target = 0.9;
    ALTER SYSTEM SET wal_level = 'minimal';
    ALTER SYSTEM SET archive_mode = 'off';
    ALTER SYSTEM SET max_wal_senders = 0;
    
    -- Reload the configuration (wal_level, archive_mode and max_wal_senders
    -- only take effect after a server restart)
    PERFORM pg_reload_conf();
    
    -- 2. Run the restore
    RAISE NOTICE 'Starting restore with % parallel jobs...', p_jobs;
    
    -- NOTE: the actual pg_restore command must be executed from a shell;
    -- the NOTICE below only prints the command to run.
    RAISE NOTICE 'Execute: pg_restore -j % -d dbname %', p_jobs, p_dump_file;
    
    -- 3. Post-restore processing
    -- Refresh planner statistics
    RAISE NOTICE 'Running ANALYZE on all tables...';
    PERFORM analyze_all_tables();
    
    -- 4. Restore production settings
    ALTER SYSTEM RESET maintenance_work_mem;
    ALTER SYSTEM RESET max_wal_size;
    ALTER SYSTEM RESET checkpoint_timeout;
    ALTER SYSTEM RESET checkpoint_completion_target;
    ALTER SYSTEM RESET wal_level;
    ALTER SYSTEM RESET archive_mode;
    ALTER SYSTEM RESET max_wal_senders;
    
    PERFORM pg_reload_conf();
    
    v_end_time := clock_timestamp();
    RAISE NOTICE 'Restore optimization completed in %', v_end_time - v_start_time;
END;
$$;

-- ANALYZE every user table, largest first
CREATE OR REPLACE FUNCTION analyze_all_tables()
RETURNS VOID AS $$
DECLARE
    v_table RECORD;
    v_count INT := 0;
BEGIN
    FOR v_table IN
        SELECT schemaname, tablename
        FROM pg_tables
        WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
        ORDER BY pg_relation_size(format('%I.%I', schemaname, tablename)::regclass) DESC
    LOOP
        EXECUTE format('ANALYZE %I.%I', v_table.schemaname, v_table.tablename);
        v_count := v_count + 1;
        
        IF v_count % 10 = 0 THEN
            RAISE NOTICE 'Analyzed % tables...', v_count;
        END IF;
    END LOOP;
    
    RAISE NOTICE 'Total tables analyzed: %', v_count;
END;
$$ LANGUAGE plpgsql;
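
For completeness, the restore itself is a shell command, so only the statistics refresh is plain SQL; a minimal usage sketch (the database name and dump path are placeholders):

sql
-- From a shell (not SQL), run e.g.: pg_restore -j 4 -d mydb /path/to/dump.custom
-- Afterwards, refresh statistics from SQL:
SELECT analyze_all_tables();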

Performance Monitoring and Diagnostics

Real-Time Load Monitoring

sql
-- Load-performance monitoring view
CREATE OR REPLACE VIEW data_load_monitor AS
WITH current_activity AS (
    SELECT 
        pid,
        usename,
        application_name,
        state,
        query,
        query_start,
        NOW() - query_start as duration
    FROM pg_stat_activity
    WHERE state = 'active'
      AND (query ILIKE '%COPY%' OR query ILIKE '%INSERT%')
),
io_stats AS (
    SELECT 
        SUM(blks_read) as blocks_read,
        SUM(blks_hit) as blocks_hit,
        CASE 
            WHEN SUM(blks_read + blks_hit) > 0 
            THEN SUM(blks_hit)::NUMERIC / SUM(blks_read + blks_hit) * 100
            ELSE 0 
        END as cache_hit_ratio
    FROM pg_stat_database
    WHERE datname = current_database()
)
SELECT 
    ca.*,
    io.blocks_read,
    io.blocks_hit,
    round(io.cache_hit_ratio, 2) as cache_hit_pct,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), '0/0')) as wal_written
FROM current_activity ca
CROSS JOIN io_stats io;

-- Load progress tracking function
CREATE OR REPLACE FUNCTION track_load_progress(
    p_table_name TEXT
)
RETURNS TABLE(
    metric TEXT,
    value TEXT
) AS $$
DECLARE
    v_row_count BIGINT;
    v_table_size TEXT;
    v_index_count INT;
    v_total_index_size TEXT;
BEGIN
    -- Row count
    EXECUTE format('SELECT COUNT(*) FROM %I', p_table_name) INTO v_row_count;
    RETURN QUERY SELECT 'Row Count', v_row_count::TEXT;
    
    -- Table size
    SELECT pg_size_pretty(pg_relation_size(p_table_name)) INTO v_table_size;
    RETURN QUERY SELECT 'Table Size', v_table_size;
    
    -- Index information
    SELECT COUNT(*), pg_size_pretty(SUM(pg_relation_size(indexrelid)))
    INTO v_index_count, v_total_index_size
    FROM pg_index
    WHERE indrelid = p_table_name::regclass;
    
    RETURN QUERY SELECT 'Index Count', v_index_count::TEXT;
    RETURN QUERY SELECT 'Total Index Size', COALESCE(v_total_index_size, '0 bytes');
    
    -- Table maintenance statistics
    RETURN QUERY
    SELECT 'Last Vacuum', last_vacuum::TEXT
    FROM pg_stat_user_tables
    WHERE relname = p_table_name;
    
    RETURN QUERY
    SELECT 'Last Analyze', last_analyze::TEXT
    FROM pg_stat_user_tables
    WHERE relname = p_table_name;
END;
$$ LANGUAGE plpgsql;
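
Querying the monitoring objects while a load is running (load_test is used as the example table):

sql
-- Active COPY / INSERT sessions, cache hit ratio and WAL position
SELECT pid, usename, state, duration, cache_hit_pct, wal_written
FROM data_load_monitor;

-- Point-in-time progress snapshot for one table
SELECT * FROM track_load_progress('load_test');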

Best-Practices Checklist

Automated Check Script

sql
-- Pre-load checklist
CREATE OR REPLACE FUNCTION pre_load_checklist(
    p_target_table TEXT,
    p_estimated_rows BIGINT
)
RETURNS TABLE(
    check_item TEXT,
    status TEXT,
    recommendation TEXT
) AS $$
BEGIN
    -- Check that the target table exists
    RETURN QUERY
    SELECT 
        'Target Table Exists',
        CASE 
            WHEN EXISTS (SELECT 1 FROM pg_tables WHERE tablename = p_target_table)
            THEN 'YES'
            ELSE 'NO'
        END,
        'Create table before loading';
    
    -- Check available disk space (rough estimate: ~100 bytes per row with 3x headroom)
    RETURN QUERY
    SELECT 
        'Disk Space',
        'CHECK MANUALLY',
        format('Ensure at least %s GB free space', 
               round(p_estimated_rows * 100.0 / 1024 / 1024 / 1024 * 3, 2));
    
    -- Check WAL settings
    RETURN QUERY
    SELECT 
        'WAL Level',
        current_setting('wal_level'),
        CASE 
            WHEN current_setting('wal_level') = 'minimal'
            THEN 'Optimal for bulk loading'
            ELSE 'Consider setting to minimal for faster loads'
        END;
    
    -- Check memory settings (pg_size_bytes handles values such as '64MB' or '1GB')
    RETURN QUERY
    SELECT 
        'maintenance_work_mem',
        current_setting('maintenance_work_mem'),
        CASE 
            WHEN pg_size_bytes(current_setting('maintenance_work_mem')) >= 256 * 1024 * 1024
            THEN 'Adequate'
            ELSE 'Consider increasing to at least 256MB'
        END;
    
    -- Check parallelism settings
    RETURN QUERY
    SELECT 
        'max_worker_processes',
        current_setting('max_worker_processes'),
        'Ensure sufficient for parallel operations';
END;
$$ LANGUAGE plpgsql;
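
Running the checklist before a load; the row estimate is a placeholder:

sql
-- Check readiness for loading roughly 10 million rows into load_test
SELECT * FROM pre_load_checklist('load_test', 10000000);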

A Complete Bulk Load Workflow

End-to-End Load Script

sql
-- Main load coordinator procedure
CREATE OR REPLACE PROCEDURE bulk_load_coordinator(
    p_source_file TEXT,
    p_target_table TEXT,
    p_options JSONB DEFAULT '{}'
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_start_time TIMESTAMP;
    v_phase_start TIMESTAMP;
    v_total_rows BIGINT;
    v_use_copy BOOLEAN;
    v_parallel_workers INT;
BEGIN
    v_start_time := clock_timestamp();
    v_use_copy := COALESCE((p_options->>'use_copy')::BOOLEAN, true);
    v_parallel_workers := COALESCE((p_options->>'parallel_workers')::INT, 4);
    
    RAISE NOTICE '[%] Starting bulk load process', clock_timestamp();
    
    -- Phase 1: Preparation
    v_phase_start := clock_timestamp();
    RAISE NOTICE '[%] Phase 1: Preparation', v_phase_start;
    
    -- Apply session-level optimization settings
    CALL apply_load_optimization();
    
    -- Save and drop secondary indexes
    PERFORM save_and_drop_indexes(p_target_table);
    
    -- Disable triggers
    EXECUTE format('ALTER TABLE %I DISABLE TRIGGER ALL', p_target_table);
    
    RAISE NOTICE '[%] Preparation completed in %', 
        clock_timestamp(), 
        clock_timestamp() - v_phase_start;
    
    -- Phase 2: Data loading
    v_phase_start := clock_timestamp();
    RAISE NOTICE '[%] Phase 2: Data Loading', v_phase_start;
    
    IF v_use_copy THEN
        -- Server-side COPY (requires superuser or the pg_read_server_files role)
        EXECUTE format('COPY %I FROM %L WITH (FORMAT CSV, HEADER)', 
                      p_target_table, p_source_file);
    ELSE
        -- Alternative loading path (not implemented here)
        RAISE NOTICE 'Alternative loading method not implemented';
    END IF;
    
    -- Count the loaded rows
    EXECUTE format('SELECT COUNT(*) FROM %I', p_target_table) INTO v_total_rows;
    
    RAISE NOTICE '[%] Data loading completed in %, % rows loaded', 
        clock_timestamp(), 
        clock_timestamp() - v_phase_start,
        v_total_rows;
    
    -- Phase 3: Index rebuilding
    v_phase_start := clock_timestamp();
    RAISE NOTICE '[%] Phase 3: Index Rebuilding', v_phase_start;
    
    -- Allow parallel maintenance workers for index builds
    EXECUTE format('SET max_parallel_maintenance_workers = %s', v_parallel_workers);
    
    PERFORM restore_indexes(p_target_table);
    
    RAISE NOTICE '[%] Index rebuilding completed in %', 
        clock_timestamp(), 
        clock_timestamp() - v_phase_start;
    
    -- Phase 4: Finalization
    v_phase_start := clock_timestamp();
    RAISE NOTICE '[%] Phase 4: Finalization', v_phase_start;
    
    -- Re-enable triggers
    EXECUTE format('ALTER TABLE %I ENABLE TRIGGER ALL', p_target_table);
    
    -- Refresh planner statistics
    EXECUTE format('ANALYZE %I', p_target_table);
    
    RAISE NOTICE '[%] Finalization completed in %', 
        clock_timestamp(), 
        clock_timestamp() - v_phase_start;
    
    -- Summary
    RAISE NOTICE '[%] Bulk load completed successfully', clock_timestamp();
    RAISE NOTICE 'Total time: %', clock_timestamp() - v_start_time;
    RAISE NOTICE 'Total rows: %', v_total_rows;
    RAISE NOTICE 'Performance: % rows/second', 
        round(v_total_rows / EXTRACT(EPOCH FROM clock_timestamp() - v_start_time));
END;
$$;
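
Invoking the coordinator end to end (the file path is a placeholder; the options object may be omitted to accept the defaults):

sql
CALL bulk_load_coordinator(
    '/tmp/test_data.csv',
    'load_test',
    '{"use_copy": true, "parallel_workers": 4}'::JSONB
);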

Summary

Efficient database population combines several techniques:

  1. Transaction management: batch commits to reduce per-row overhead
  2. COPY: the fastest way to load data
  3. Index strategy: load first, build indexes afterwards
  4. Constraint management: temporarily drop foreign key checks
  5. Memory tuning: raise the relevant work-memory parameters
  6. WAL tuning: reduce WAL volume and checkpoint pressure
  7. Parallelism: use multiple cores for restores and index builds
  8. Monitoring: track load progress in real time

Key principles:

  • Understand when each optimization applies
  • Weigh the performance gain against data-safety guarantees
  • Standardize the loading workflow
  • Monitor and tune continuously

Remember

The fastest loading recipe: COPY + no indexes + a single transaction + minimal WAL.

Further Reading