Skip to content

PostgreSQL MVCC 序列化失败处理

概述

在 PostgreSQL 的多版本并发控制(MVCC)机制中,可重复读(REPEATABLE READ)和可序列化(SERIALIZABLE)隔离级别可能会产生序列化错误,这是为了防止序列化异常而设计的机制。理解和正确处理这些错误对于开发高并发应用程序至关重要。

核心概念

序列化失败的本质

序列化失败是数据库在检测到可能破坏事务一致性的并发操作时抛出的错误。这些错误确保了数据库的 ACID 特性,特别是隔离性(Isolation)。

错误类型和处理策略

1. 序列化失败错误

这是最常见的并发冲突错误类型。

错误特征

  • SQLSTATE 代码: 40001 (serialization_failure)
  • 重试策略: 建议无条件重试
  • 发生场景: 可重复读和可序列化隔离级别

实际示例

场景: 两个并发事务同时更新银行账户余额

sql
-- 会话 A
BEGIN ISOLATION LEVEL REPEATABLE READ;
SELECT balance FROM accounts WHERE id = 1; -- 返回 1000
-- ... 一些业务逻辑处理时间 ...
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
sql
-- 会话 B (并发执行)
BEGIN ISOLATION LEVEL REPEATABLE READ;
SELECT balance FROM accounts WHERE id = 1; -- 返回 1000
-- ... 一些业务逻辑处理时间 ...
UPDATE accounts SET balance = balance - 200 WHERE id = 1;

结果分析:

  • 其中一个事务会成功提交
  • 另一个事务会收到 serialization_failure 错误
  • 失败的事务需要完整重试

WARNING

重要提醒序列化失败错误应该无条件重试,因为这是正常的并发控制机制。

2. 死锁检测错误

死锁是两个或多个事务相互等待对方释放资源的情况。

错误特征

  • SQLSTATE 代码: 40P01 (deadlock_detected)
  • 重试策略: 建议重试
  • 发生场景: 任何隔离级别

实际示例

死锁场景:

sql
-- 会话 A
BEGIN;
UPDATE table1 SET value = 'A' WHERE id = 1;
-- 等待会话 B 释放 table2 的锁
UPDATE table2 SET value = 'A' WHERE id = 1;
sql
-- 会话 B
BEGIN;
UPDATE table2 SET value = 'B' WHERE id = 1;
-- 等待会话 A 释放 table1 的锁
UPDATE table1 SET value = 'B' WHERE id = 1;

死锁检测流程:

3. 唯一约束冲突错误

在特定情况下,唯一约束冲突实际上是序列化失败的表现。

错误特征

  • SQLSTATE 代码: 23505 (unique_violation)
  • 重试策略: 需要谨慎判断是否重试
  • 发生场景: 并发插入相同的唯一值

实际示例

业务场景: 用户注册系统中的用户名唯一性检查

sql
-- 应用程序逻辑
-- 1. 检查用户名是否存在
SELECT COUNT(*) FROM users WHERE username = 'newuser';
-- 返回 0,表示用户名可用

-- 2. 插入新用户(可能与其他会话冲突)
INSERT INTO users (username, email) VALUES ('newuser', '[email protected]');
-- 可能抛出 unique_violation 错误

问题分析:

  • 两个应用程序实例可能同时检查到用户名可用
  • 两者都尝试插入相同的用户名
  • 第二个插入操作会收到唯一约束冲突错误

TIP

最佳实践对于这种情况,可以重试,但需要在应用程序逻辑中处理,而不是简单的自动重试。

4. 排除约束冲突错误

排除约束是 PostgreSQL 特有的约束类型,用于防止重叠值。

错误特征

  • SQLSTATE 代码: 23P01 (exclusion_violation)
  • 重试策略: 需要谨慎判断
  • 发生场景: 并发插入重叠的排除约束值

实际示例

业务场景: 会议室预订系统

sql
-- 创建带有排除约束的表
CREATE TABLE room_bookings (
    room_id INTEGER,
    booking_time TSRANGE,
    EXCLUDE USING GIST (room_id WITH =, booking_time WITH &&)
);

-- 并发预订可能导致冲突
INSERT INTO room_bookings (room_id, booking_time)
VALUES (1, '[2024-01-01 10:00, 2024-01-01 12:00)');

事务重试最佳实践

1. 完整事务重试原则

必须重试**完整的事务**,包括所有业务逻辑、SQL 语句和数据值的选择。

错误的重试方式

python
# ❌ 错误示例:只重试失败的 SQL 语句
def transfer_money_wrong(from_account, to_account, amount):
    try:
        conn.execute("BEGIN")
        balance = conn.execute("SELECT balance FROM accounts WHERE id = %s", [from_account])
        if balance[0] >= amount:
            conn.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s",
                        [amount, from_account])
            conn.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s",
                        [amount, to_account])  # 只重试这一句 - 错误!
            conn.execute("COMMIT")
    except SerializationFailure:
        # 只重试最后一条 SQL - 这是错误的!
        retry_last_statement()

正确的重试方式

python
# ✅ 正确示例:重试完整事务
def transfer_money_correct(from_account, to_account, amount, max_retries=5):
    for attempt in range(max_retries):
        try:
            with conn.begin():  # 自动处理事务
                # 重新读取当前状态
                balance = conn.execute(
                    "SELECT balance FROM accounts WHERE id = %s",
                    [from_account]
                ).fetchone()[0]

                if balance < amount:
                    raise InsufficientFunds("余额不足")

                # 执行转账操作
                conn.execute(
                    "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    [amount, from_account]
                )
                conn.execute(
                    "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                    [amount, to_account]
                )

                # 记录交易日志
                conn.execute(
                    "INSERT INTO transactions (from_account, to_account, amount, timestamp) "
                    "VALUES (%s, %s, %s, NOW())",
                    [from_account, to_account, amount]
                )

                return True  # 成功完成

        except SerializationFailure:
            if attempt == max_retries - 1:
                raise  # 最后一次尝试失败,抛出异常
            time.sleep(0.1 * (2 ** attempt))  # 指数退避
            continue
        except DeadlockDetected:
            if attempt == max_retries - 1:
                raise
            time.sleep(0.05 * (2 ** attempt))  # 死锁重试间隔更短
            continue

    return False

2. 智能重试策略

不同类型的错误需要不同的重试策略:

python
class TransactionRetryHandler:
    def __init__(self, max_retries=5):
        self.max_retries = max_retries

    def execute_with_retry(self, transaction_func, *args, **kwargs):
        for attempt in range(self.max_retries):
            try:
                return transaction_func(*args, **kwargs)

            except SerializationFailure:
                # 序列化失败 - 无条件重试
                self._handle_serialization_failure(attempt)

            except DeadlockDetected:
                # 死锁 - 重试,但使用更短的等待时间
                self._handle_deadlock(attempt)

            except UniqueViolation as e:
                # 唯一约束冲突 - 需要判断是否应该重试
                if self._should_retry_unique_violation(e):
                    self._handle_unique_violation(attempt)
                else:
                    raise  # 不重试,直接抛出

            except ExclusionViolation as e:
                # 排除约束冲突 - 类似唯一约束处理
                if self._should_retry_exclusion_violation(e):
                    self._handle_exclusion_violation(attempt)
                else:
                    raise

        raise MaxRetriesExceeded(f"事务在 {self.max_retries} 次尝试后仍然失败")

    def _handle_serialization_failure(self, attempt):
        if attempt >= self.max_retries - 1:
            raise
        # 指数退避:0.1s, 0.2s, 0.4s, 0.8s...
        wait_time = 0.1 * (2 ** attempt)
        time.sleep(wait_time)

    def _handle_deadlock(self, attempt):
        if attempt >= self.max_retries - 1:
            raise
        # 死锁重试间隔较短
        wait_time = 0.05 * (2 ** attempt)
        time.sleep(wait_time)

    def _should_retry_unique_violation(self, error):
        # 根据业务逻辑判断是否应该重试
        # 例如:如果是自动生成的 ID 冲突,可以重试
        # 如果是用户输入的唯一字段冲突,不应该重试
        return "auto_generated" in str(error)

3. 性能优化策略

减少序列化冲突

sql
-- ❌ 容易产生冲突的做法
BEGIN ISOLATION LEVEL REPEATABLE READ;
SELECT * FROM large_table WHERE category = 'electronics';
-- ... 长时间的业务逻辑处理 ...
UPDATE products SET price = price * 1.1 WHERE category = 'electronics';
COMMIT;

-- ✅ 减少冲突的做法
BEGIN ISOLATION LEVEL REPEATABLE READ;
-- 只读取必要的数据
SELECT id, price FROM products WHERE category = 'electronics' AND price > 0;
-- 尽快处理并提交
UPDATE products SET price = price * 1.1 WHERE category = 'electronics' AND price > 0;
COMMIT;

事务执行时间优化

python
def optimized_batch_update():
    # 将大批量操作分成小批次
    batch_size = 100
    products = get_products_to_update()

    for i in range(0, len(products), batch_size):
        batch = products[i:i + batch_size]

        retry_handler.execute_with_retry(
            update_product_batch,
            batch
        )

def update_product_batch(product_batch):
    with conn.begin():
        for product in product_batch:
            conn.execute(
                "UPDATE products SET price = %s WHERE id = %s",
                [product.new_price, product.id]
            )

监控和诊断

1. 错误统计查询

监控序列化失败的频率:

sql
-- 查看最近的序列化错误统计
SELECT
    datname,
    temp_files,
    temp_bytes,
    deadlocks,
    checksum_failures,
    checksum_last_failure
FROM pg_stat_database
WHERE datname = current_database();

-- 监控长时间运行的事务
SELECT
    pid,
    usename,
    application_name,
    client_addr,
    backend_start,
    xact_start,
    query_start,
    state,
    query
FROM pg_stat_activity
WHERE state IN ('idle in transaction', 'active')
    AND xact_start < NOW() - INTERVAL '1 minute'
ORDER BY xact_start;

2. 锁等待分析

sql
-- 查看当前的锁等待情况
SELECT
    blocked_locks.pid AS blocked_pid,
    blocked_activity.usename AS blocked_user,
    blocking_locks.pid AS blocking_pid,
    blocking_activity.usename AS blocking_user,
    blocked_activity.query AS blocked_statement,
    blocking_activity.query AS current_statement_in_blocking_process,
    blocked_activity.application_name AS blocked_application,
    blocking_activity.application_name AS blocking_application
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks
    ON blocking_locks.locktype = blocked_locks.locktype
    AND blocking_locks.DATABASE IS NOT DISTINCT FROM blocked_locks.DATABASE
    AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
    AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
    AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
    AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
    AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
    AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
    AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
    AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
    AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.GRANTED;

错误类型总结表

错误类型SQLSTATE重试建议常见场景处理策略
序列化失败40001无条件重试可重复读/可序列化事务冲突指数退避重试
死锁检测40P01建议重试多个事务循环等待资源短间隔重试
唯一约束冲突23505谨慎重试并发插入相同唯一值业务逻辑判断
排除约束冲突23P01谨慎重试并发插入重叠排除值业务逻辑判断

关键要点

INFO

核心原则

  1. 完整事务重试: 重试整个事务,包括所有业务逻辑
  2. 无自动重试: PostgreSQL 不提供自动重试,需要应用程序实现
  3. 错误分类处理: 不同错误类型需要不同的重试策略
  4. 性能考虑: 减少事务执行时间和冲突概率

WARNING

注意事项

  • 序列化失败不保证重试后一定成功
  • 高并发环境下可能需要多次重试
  • 预备事务可能导致长时间的等待
  • 监控和优化应用程序的事务设计

通过理解和正确实现序列化失败处理机制,可以构建出既保证数据一致性又具有良好性能的高并发 PostgreSQL 应用程序。