Skip to content

PostgreSQL 事务隔离级别深度解析

概述

事务隔离是数据库并发控制的核心机制,它决定了事务之间如何相互影响。PostgreSQL 实现了 SQL 标准定义的四种事务隔离级别,每个级别在数据一致性和并发性能之间提供了不同的平衡。

INFO

💡 核心概念

事务隔离级别定义了一个事务能够看到其他并发事务所做更改的程度。更高的隔离级别提供更强的一致性保证,但可能降低并发性能。

并发现象详解

在了解隔离级别之前,我们需要理解四种并发现象:

1. 脏读 (Dirty Read)

定义:一个事务读取了另一个未提交事务写入的数据。

业务场景示例

sql
-- 会话 A:转账操作
BEGIN;
UPDATE accounts SET balance = balance - 1000 WHERE id = 1;
-- 此时会话 B 读取到了未提交的数据
-- 如果会话 A 回滚,会话 B 读到的就是"脏数据"

2. 不可重复读 (Non-repeatable Read)

定义:一个事务重新读取之前读取过的数据,发现数据已被其他已提交事务修改。

业务场景示例

sql
-- 会话 A:查询库存
BEGIN;
SELECT quantity FROM inventory WHERE product_id = 100; -- 返回 50
-- 此时会话 B 更新了库存
-- 会话 A 再次查询
SELECT quantity FROM inventory WHERE product_id = 100; -- 可能返回 45
COMMIT;

3. 幻读 (Phantom Read)

定义:一个事务重新执行查询,返回满足条件的行集发生了变化。

业务场景示例

sql
-- 会话 A:统计订单
BEGIN;
SELECT COUNT(*) FROM orders WHERE status = 'pending'; -- 返回 10
-- 此时会话 B 插入了新的待处理订单
-- 会话 A 再次统计
SELECT COUNT(*) FROM orders WHERE status = 'pending'; -- 可能返回 11
COMMIT;

4. 序列化异常 (Serialization Anomaly)

定义:并发事务的执行结果与任何串行执行顺序都不一致。

隔离级别对比表

隔离级别脏读不可重复读幻读序列化异常PostgreSQL 实现
读取未提交允许但不适用可能可能可能行为同"读取已提交"
读取已提交不可能可能可能可能✅ 默认级别
可重复读取不可能不可能不适用可能✅ 快照隔离
可序列化不可能不可能不可能不可能✅ 可序列化快照隔离

WARNING

⚠️ PostgreSQL 特殊实现

PostgreSQL 只实现了三个不同的隔离级别。"读取未提交"模式实际上表现得像"读取已提交",这是适配 MVCC 架构的合理选择。

1. 读取已提交隔离级别

特性概述

读取已提交是 PostgreSQL 的默认隔离级别,提供基本的隔离保证:

  • SELECT 查询只能看到查询开始前已提交的数据
  • 永远看不到未提交的数据或查询执行期间的并发提交
  • 可以看到自身事务中的未提交更改

工作机制图解

实际应用示例

银行转账场景

sql
-- 事务 A:账户间转账
BEGIN;
UPDATE accounts SET balance = balance + 100.00 WHERE acctnum = 12345;
UPDATE accounts SET balance = balance - 100.00 WHERE acctnum = 7534;
COMMIT;
sql
-- 事务 B:同时执行的转账
BEGIN;
UPDATE accounts SET balance = balance + 50.00 WHERE acctnum = 12345;
UPDATE accounts SET balance = balance - 50.00 WHERE acctnum = 9999;
COMMIT;

执行分析

  1. 问题陈述:两个转账事务同时修改同一账户
  2. 解决方案:读取已提交级别确保每个 UPDATE 看到最新的已提交余额
  3. 分析过程:事务 B 在修改账户 12345 时会等待事务 A 提交,然后基于更新后的余额进行计算
  4. 预期结果:账户余额保持一致性,不会出现丢失更新

复杂查询的问题场景

sql
-- 问题示例:网站点击统计
BEGIN;
-- 初始状态:website 表有两行,hits 分别为 9 和 10
UPDATE website SET hits = hits + 1;
-- 另一个会话执行:
-- DELETE FROM website WHERE hits = 10;
COMMIT;

问题分析

  • DELETE 命令启动时,hits=10 的行已被更新为 hits=11
  • 因此 DELETE 不会删除任何行,尽管执行前后都存在 hits=10 的情况
  • 这种现象说明"读取已提交"对复杂查询可能产生意外结果

TIP

💡 使用建议

读取已提交级别适合:

  • 简单的 CRUD 操作
  • 可以容忍一定程度数据不一致的应用
  • 需要高并发性能的场景

不适合:

  • 复杂的分析查询
  • 需要事务内数据一致性的业务逻辑
  • 严格的财务或库存管理系统

UPDATE/DELETE 行为详解

sql
-- 示例:库存管理系统
CREATE TABLE inventory (
    product_id INT PRIMARY KEY,
    quantity INT,
    last_updated TIMESTAMP DEFAULT NOW()
);

-- 会话 A:减少库存
BEGIN;
UPDATE inventory
SET quantity = quantity - 5, last_updated = NOW()
WHERE product_id = 100 AND quantity >= 5;
-- 会话 B 可能同时执行类似操作
-- 会话 A 会等待或看到 B 提交后的结果
COMMIT;

2. 可重复读隔离级别

特性概述

可重复读级别提供更强的一致性保证:

  • 事务看到的是事务开始时的数据库快照
  • 单个事务内的所有查询看到相同的数据视图
  • 不会看到并发事务的提交结果

快照隔离机制

实际应用示例

报表生成场景

sql
-- 报表生成事务
BEGIN ISOLATION LEVEL REPEATABLE READ;

-- 第一次查询:获取总销售额
SELECT SUM(amount) FROM orders WHERE date = CURRENT_DATE;
-- 结果:$10,000

-- 业务逻辑处理...

-- 第二次查询:获取订单详情
SELECT product_id, SUM(amount) FROM orders
WHERE date = CURRENT_DATE
GROUP BY product_id;

-- 即使有新订单在并发提交,这两个查询看到的数据保持一致
COMMIT;

分析过程

  1. 一致性保证:整个事务期间看到相同的数据快照
  2. 业务价值:确保报表数据的内部一致性
  3. 性能考虑:避免因数据变化导致的重复计算

序列化冲突示例

sql
-- 库存检查和更新
BEGIN ISOLATION LEVEL REPEATABLE READ;

-- 检查当前库存
SELECT quantity FROM inventory WHERE product_id = 100;
-- 返回:50

-- 其他事务可能同时更新了这个产品的库存

-- 尝试更新库存
UPDATE inventory
SET quantity = quantity - 10
WHERE product_id = 100;

-- 如果其他事务已经修改了该行,会得到错误:
-- ERROR: could not serialize access due to concurrent update

COMMIT;

错误处理策略

python
import psycopg2
from psycopg2 import sql
import time
import random

def update_inventory_with_retry(product_id, quantity_change, max_retries=3):
    for attempt in range(max_retries):
        try:
            with connection.cursor() as cursor:
                cursor.execute("BEGIN ISOLATION LEVEL REPEATABLE READ")

                # 检查库存
                cursor.execute(
                    "SELECT quantity FROM inventory WHERE product_id = %s",
                    (product_id,)
                )
                current_qty = cursor.fetchone()[0]

                if current_qty >= abs(quantity_change):
                    # 更新库存
                    cursor.execute(
                        "UPDATE inventory SET quantity = quantity + %s WHERE product_id = %s",
                        (quantity_change, product_id)
                    )
                    cursor.execute("COMMIT")
                    return True
                else:
                    cursor.execute("ROLLBACK")
                    return False

        except psycopg2.errors.SerializationFailure:
            cursor.execute("ROLLBACK")
            if attempt < max_retries - 1:
                # 随机延迟后重试
                time.sleep(random.uniform(0.1, 0.5))
                continue
            else:
                raise

    return False
java
public boolean updateInventoryWithRetry(
    Connection conn,
    int productId,
    int quantityChange,
    int maxRetries
) throws SQLException {

    for (int attempt = 0; attempt < maxRetries; attempt++) {
        try {
            conn.setAutoCommit(false);
            conn.setTransactionIsolation(
                Connection.TRANSACTION_REPEATABLE_READ
            );

            // 检查库存
            PreparedStatement checkStmt = conn.prepareStatement(
                "SELECT quantity FROM inventory WHERE product_id = ?"
            );
            checkStmt.setInt(1, productId);
            ResultSet rs = checkStmt.executeQuery();

            if (rs.next()) {
                int currentQty = rs.getInt("quantity");

                if (currentQty >= Math.abs(quantityChange)) {
                    // 更新库存
                    PreparedStatement updateStmt = conn.prepareStatement(
                        "UPDATE inventory SET quantity = quantity + ? WHERE product_id = ?"
                    );
                    updateStmt.setInt(1, quantityChange);
                    updateStmt.setInt(2, productId);
                    updateStmt.executeUpdate();

                    conn.commit();
                    return true;
                } else {
                    conn.rollback();
                    return false;
                }
            }

        } catch (SQLException e) {
            conn.rollback();
            if (e.getSQLState().equals("40001") && attempt < maxRetries - 1) {
                // 序列化失败,重试
                Thread.sleep((long)(Math.random() * 500 + 100));
                continue;
            } else {
                throw e;
            }
        }
    }
    return false;
}

WARNING

⚠️ 重要提醒

可重复读级别的应用必须准备处理序列化失败错误。推荐的做法是实现重试机制,在失败时重新执行整个事务。

与读取已提交的对比

方面读取已提交可重复读
快照创建时机每个语句开始时事务开始时
数据一致性语句级别事务级别
并发冲突处理等待并重新评估序列化失败
性能开销较低中等
适用场景简单操作复杂业务逻辑

3. 可序列化隔离级别

特性概述

可序列化隔离级别提供最严格的事务隔离:

  • 模拟串行事务执行
  • 防止所有并发异常
  • 使用谓词锁检测读写依赖

序列化异常检测

实际应用示例

会计系统的一致性保证

sql
-- 初始数据
CREATE TABLE accounts_summary (
    class INT,
    total_amount DECIMAL(12,2)
);

INSERT INTO accounts_summary VALUES
(1, 1000.00),
(2, 2000.00);

-- 事务 A:基于 class=1 计算,影响 class=2
BEGIN ISOLATION LEVEL SERIALIZABLE;

SELECT SUM(total_amount) FROM accounts_summary WHERE class = 1;
-- 结果:1000.00

-- 业务逻辑:将 class=1 的总额作为 class=2 的调整
UPDATE accounts_summary
SET total_amount = total_amount + 1000.00
WHERE class = 2;

COMMIT;
sql
-- 事务 B:同时执行的相似操作
BEGIN ISOLATION LEVEL SERIALIZABLE;

SELECT SUM(total_amount) FROM accounts_summary WHERE class = 2;
-- 结果:2000.00

-- 将 class=2 的总额作为 class=1 的调整
UPDATE accounts_summary
SET total_amount = total_amount + 2000.00
WHERE class = 1;

-- 这里会产生序列化冲突
-- ERROR: could not serialize access due to read/write dependencies among transactions

COMMIT;

分析过程

  1. 依赖检测:事务 A 读取 class=1,写入 class=2;事务 B 读取 class=2,写入 class=1
  2. 序列化检查:系统检测到循环依赖,无法找到等价的串行执行顺序
  3. 冲突解决:允许先到达提交点的事务成功,回滚后者

谓词锁的工作原理

sql
-- 查看谓词锁
SELECT
    locktype,
    mode,
    granted,
    relation::regclass as table_name
FROM pg_locks
WHERE mode = 'SIReadLock'
  AND pid = pg_backend_pid();

输出示例

locktype | mode      | granted | table_name
---------|-----------|---------|------------
relation | SIReadLock| t       | accounts_summary
tuple    | SIReadLock| t       | accounts_summary

性能优化策略

Details

🔧 性能调优参数

内存相关参数

sql
-- 增加谓词锁内存
SET max_pred_locks_per_transaction = 128;  -- 默认 64
SET max_pred_locks_per_relation = 1024;    -- 默认 -2
SET max_pred_locks_per_page = 8;           -- 默认 2

扫描方式优化

sql
-- 鼓励使用索引扫描,减少关系级别锁
SET random_page_cost = 1.0;     -- 降低随机访问成本
SET cpu_tuple_cost = 0.02;      -- 增加 CPU 处理成本

连接管理

sql
-- 防止长时间空闲事务
SET idle_in_transaction_session_timeout = '10min';

最佳实践指南

1. 事务设计原则

sql
-- ✅ 好的设计:简短、专注的事务
BEGIN ISOLATION LEVEL SERIALIZABLE;
-- 只做必要的操作
UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 100;
INSERT INTO order_items (order_id, product_id, quantity) VALUES (1001, 100, 1);
COMMIT;

-- ❌ 不好的设计:长时间、复杂的事务
BEGIN ISOLATION LEVEL SERIALIZABLE;
-- 大量复杂查询和业务逻辑
-- 用户交互等待
-- 长时间持有锁
COMMIT;

2. 只读事务优化

sql
-- 只读事务声明
BEGIN ISOLATION LEVEL SERIALIZABLE READ ONLY;
-- 复杂的报表查询
SELECT ...;
COMMIT;

-- 可延迟只读事务(获得更好的快照)
BEGIN ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE;
-- 等待获得无冲突快照后才开始
SELECT ...;
COMMIT;

3. 错误处理模式

python
import psycopg2
import logging
from enum import Enum

class TransactionResult(Enum):
    SUCCESS = "success"
    RETRY_NEEDED = "retry"
    BUSINESS_ERROR = "business_error"
    FATAL_ERROR = "fatal"

def execute_serializable_transaction(operation_func, max_retries=5):
    """
    执行可序列化事务的通用重试逻辑
    """
    for attempt in range(max_retries):
        try:
            with get_connection() as conn:
                with conn.cursor() as cursor:
                    cursor.execute("BEGIN ISOLATION LEVEL SERIALIZABLE")

                    result = operation_func(cursor)

                    if result == TransactionResult.SUCCESS:
                        cursor.execute("COMMIT")
                        return result
                    else:
                        cursor.execute("ROLLBACK")
                        return result

        except psycopg2.errors.SerializationFailure as e:
            logging.warning(f"Serialization failure on attempt {attempt + 1}: {e}")
            cursor.execute("ROLLBACK")

            if attempt < max_retries - 1:
                # 指数退避
                sleep_time = (2 ** attempt) * 0.1
                time.sleep(sleep_time)
                continue
            else:
                logging.error(f"Transaction failed after {max_retries} attempts")
                return TransactionResult.FATAL_ERROR

        except Exception as e:
            cursor.execute("ROLLBACK")
            logging.error(f"Unexpected error: {e}")
            return TransactionResult.FATAL_ERROR

    return TransactionResult.FATAL_ERROR

# 使用示例
def transfer_money(cursor, from_account, to_account, amount):
    """转账业务逻辑"""
    # 检查余额
    cursor.execute(
        "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
        (from_account,)
    )
    balance = cursor.fetchone()[0]

    if balance < amount:
        return TransactionResult.BUSINESS_ERROR

    # 执行转账
    cursor.execute(
        "UPDATE accounts SET balance = balance - %s WHERE id = %s",
        (amount, from_account)
    )
    cursor.execute(
        "UPDATE accounts SET balance = balance + %s WHERE id = %s",
        (amount, to_account)
    )

    return TransactionResult.SUCCESS

# 调用
result = execute_serializable_transaction(
    lambda cursor: transfer_money(cursor, 123, 456, 100.00)
)
javascript
const { Pool } = require("pg");
const pool = new Pool();

const TransactionResult = {
  SUCCESS: "success",
  RETRY_NEEDED: "retry",
  BUSINESS_ERROR: "business_error",
  FATAL_ERROR: "fatal",
};

async function executeSerializableTransaction(operationFunc, maxRetries = 5) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const client = await pool.connect();

    try {
      await client.query("BEGIN ISOLATION LEVEL SERIALIZABLE");

      const result = await operationFunc(client);

      if (result === TransactionResult.SUCCESS) {
        await client.query("COMMIT");
        return result;
      } else {
        await client.query("ROLLBACK");
        return result;
      }
    } catch (error) {
      await client.query("ROLLBACK");

      // 检查是否是序列化失败
      if (error.code === "40001") {
        console.warn(`Serialization failure on attempt ${attempt + 1}:`, error.message);

        if (attempt < maxRetries - 1) {
          // 指数退避
          const sleepTime = Math.pow(2, attempt) * 100;
          await new Promise((resolve) => setTimeout(resolve, sleepTime));
          continue;
        }
      }

      console.error("Transaction error:", error);
      return TransactionResult.FATAL_ERROR;
    } finally {
      client.release();
    }
  }

  return TransactionResult.FATAL_ERROR;
}

// 使用示例
async function transferMoney(client, fromAccount, toAccount, amount) {
  // 检查余额
  const balanceResult = await client.query("SELECT balance FROM accounts WHERE id = $1 FOR UPDATE", [fromAccount]);

  if (balanceResult.rows[0].balance < amount) {
    return TransactionResult.BUSINESS_ERROR;
  }

  // 执行转账
  await client.query("UPDATE accounts SET balance = balance - $1 WHERE id = $2", [amount, fromAccount]);

  await client.query("UPDATE accounts SET balance = balance + $1 WHERE id = $2", [amount, toAccount]);

  return TransactionResult.SUCCESS;
}

// 调用示例
(async () => {
  const result = await executeSerializableTransaction((client) => transferMoney(client, 123, 456, 100.0));

  console.log("Transaction result:", result);
})();

隔离级别选择指南

决策流程图

场景对比分析

应用场景推荐隔离级别理由注意事项
电商网站读取已提交高并发,可容忍轻微不一致关键操作使用更高级别
银行系统可序列化严格的一致性要求实现重试机制
数据分析可重复读需要分析期间数据一致避免长时间事务
内容管理读取已提交读多写少,一致性要求不高编辑冲突可通过应用层处理
库存管理可重复读/可序列化避免超卖等问题根据业务重要性选择

性能影响对比

sql
-- 创建测试表
CREATE TABLE test_performance (
    id SERIAL PRIMARY KEY,
    value INTEGER,
    updated_at TIMESTAMP DEFAULT NOW()
);

-- 插入测试数据
INSERT INTO test_performance (value)
SELECT generate_series(1, 10000);

-- 读取已提交性能测试
\timing on
BEGIN; -- 默认读取已提交
SELECT COUNT(*), AVG(value) FROM test_performance;
UPDATE test_performance SET value = value + 1 WHERE id <= 100;
COMMIT;
\timing off
sql
\timing on
BEGIN ISOLATION LEVEL REPEATABLE READ;
SELECT COUNT(*), AVG(value) FROM test_performance;
UPDATE test_performance SET value = value + 1 WHERE id <= 100;
COMMIT;
\timing off
sql
\timing on
BEGIN ISOLATION LEVEL SERIALIZABLE;
SELECT COUNT(*), AVG(value) FROM test_performance;
UPDATE test_performance SET value = value + 1 WHERE id <= 100;
COMMIT;
\timing off

性能测试结果示例

隔离级别平均执行时间内存使用锁开销并发度
读取已提交100ms最小最高
可重复读120ms中等中等
可序列化150ms最大中等

特殊情况和注意事项

序列 (Sequence) 的特殊行为

WARNING

⚠️ 序列的事务行为

PostgreSQL 中的序列操作有特殊的事务行为:

  • 序列更改立即对所有事务可见
  • 事务回滚不会恢复序列值
  • 这可能导致序列值的"空洞"
sql
-- 序列行为示例
CREATE SEQUENCE order_seq;

-- 事务 1
BEGIN ISOLATION LEVEL SERIALIZABLE;
SELECT nextval('order_seq'); -- 返回 1
ROLLBACK; -- 回滚,但序列值不会恢复

-- 事务 2
BEGIN;
SELECT nextval('order_seq'); -- 返回 2,而不是 1
COMMIT;

MVCC 架构的影响

PostgreSQL 的多版本并发控制 (MVCC) 影响隔离级别的实现:

死锁检测和处理

sql
-- 死锁场景模拟
-- 会话 A
BEGIN ISOLATION LEVEL SERIALIZABLE;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
-- 等待...
UPDATE accounts SET balance = balance + 100 WHERE id = 2;

-- 会话 B (同时执行)
BEGIN ISOLATION LEVEL SERIALIZABLE;
UPDATE accounts SET balance = balance - 50 WHERE id = 2;
-- 等待...
UPDATE accounts SET balance = balance + 50 WHERE id = 1;
-- DEADLOCK DETECTED

INFO

💡 死锁处理

PostgreSQL 会自动检测死锁并中止其中一个事务。应用程序应该:

  1. 捕获死锁错误 (SQLSTATE 40P01)
  2. 重试被中止的事务
  3. 考虑调整事务顺序以减少死锁

实践建议总结

开发最佳实践

  1. 默认选择读取已提交:适用于大多数 Web 应用
  2. 谨慎使用可序列化:仅在严格一致性要求时使用
  3. 实现重试机制:处理序列化失败和死锁
  4. 保持事务简短:减少锁持有时间
  5. 避免用户交互:不要在事务中等待用户输入

监控和调试

sql
-- 查看当前锁情况
SELECT
    pg_stat_activity.pid,
    pg_stat_activity.usename,
    pg_locks.mode,
    pg_locks.locktype,
    pg_locks.relation::regclass,
    pg_stat_activity.query
FROM pg_locks
JOIN pg_stat_activity ON pg_locks.pid = pg_stat_activity.pid
WHERE NOT pg_locks.granted
ORDER BY pg_stat_activity.query_start;
sql
-- 查看当前会话的隔离级别
SHOW transaction_isolation;

-- 查看所有活动事务的隔离级别
SELECT
    pid,
    usename,
    state,
    query_start,
    query
FROM pg_stat_activity
WHERE state = 'active'
  AND pid != pg_backend_pid();
sql
-- 查看序列化冲突统计
SELECT
    schemaname,
    tablename,
    n_conflict_tablespace,
    n_conflict_lock,
    n_conflict_snapshot,
    n_conflict_bufferpin,
    n_conflict_startup_deadlock
FROM pg_stat_database_conflicts;

配置参数调优

sql
-- 事务相关配置
SET default_transaction_isolation = 'read committed';
SET statement_timeout = '30s';
SET lock_timeout = '10s';
SET idle_in_transaction_session_timeout = '5min';

-- 序列化相关配置
SET max_pred_locks_per_transaction = 64;
SET max_pred_locks_per_relation = -2;
SET max_pred_locks_per_page = 2;

通过深入理解 PostgreSQL 的事务隔离机制,开发者能够在数据一致性和系统性能之间找到最佳平衡点,构建更加可靠和高效的数据库应用程序。选择合适的隔离级别不仅影响数据的正确性,也直接关系到系统的并发处理能力和用户体验。