数据库事务隔离级别与并发控制:从理论到实践

深入讲解数据库事务ACID特性与四种隔离级别,详解脏读、不可重复读、幻读问题,对比乐观锁与悲观锁的实现策略,提供PostgreSQL/MySQL实战案例与死锁排查方法。

引言

事务是数据库保证数据一致性的核心机制。在高并发场景下,如何平衡数据一致性与系统性能是每个后端工程师必须面对的挑战。本文将深入讲解事务隔离级别、并发控制策略及生产环境最佳实践。

事务ACID特性

Atomicity(原子性): 事务要么全部成功,要么全部失败
    └─ 实现:Undo Log(MySQL)、WAL(PostgreSQL)

Consistency(一致性): 事务执行前后数据保持一致
    └─ 实现:约束、触发器、应用逻辑

Isolation(隔离性): 并发事务互不干扰
    └─ 实现:锁机制、MVCC

Durability(持久性): 事务提交后数据永久保存
    └─ 实现:Redo Log(MySQL)、WAL(PostgreSQL)

四种隔离级别

隔离级别脏读不可重复读幻读性能数据库默认
Read Uncommitted最高-
Read CommittedPostgreSQL/Oracle
Repeatable ReadMySQL
Serializable最低-

三种并发问题

脏读(Dirty Read):
事务A读取到事务B未提交的数据
┌─────────┐                ┌─────────┐
│ 事务A   │                │ 事务B   │
├─────────┤                ├─────────┤
│         │  UPDATE=100    │         │
│         │ ◀───────────── │ BEGIN   │
│ READ=100│                │         │
│         │  ROLLBACK      │         │
│         │ ◀───────────── │         │
│ 读到已回滚的数据 ✗       │         │
└─────────┘                └─────────┘

不可重复读(Non-Repeatable Read):
事务A两次读取同一行数据不一致
┌─────────┐                ┌─────────┐
│ 事务A   │                │ 事务B   │
├─────────┤                ├─────────┤
│ READ=100│                │         │
│         │  UPDATE=200    │         │
│         │ ◀───────────── │ COMMIT  │
│ READ=200│                │         │
│ 两次读取不一致 ✗         │         │
└─────────┘                └─────────┘

幻读(Phantom Read):
事务A两次查询发现多出或少了记录
┌─────────┐                ┌─────────┐
│ 事务A   │                │ 事务B   │
├─────────┤                ├─────────┤
│SELECT * │                │         │
│ 得到5行 │                │         │
│         │  INSERT新行    │         │
│         │ ◀───────────── │ COMMIT  │
│SELECT * │                │         │
│ 得到6行 │                │         │
│ 出现幻读 ✗               │         │
└─────────┘                └─────────┘

MVCC(多版本并发控制)

实现原理

MVCC核心思想:
每行数据保存多个版本,通过版本链实现非阻塞读

行记录结构:
┌──────────────────────────────────────────┐
│ DB_TRX_ID (事务ID) | DB_ROLL_PTR (回滚指针) │
├──────────────────────────────────────────┤
│              当前版本数据                  │
└──────────────────────────────────────────┘
              ↓ (回滚指针)
┌──────────────────────────────────────────┐
│         上一个版本(Undo Log)             │
└──────────────────────────────────────────┘

Read View(读视图):
- m_ids: 生成Read View时活跃的事务ID列表
- min_trx_id: m_ids中最小的
- max_trx_id: 下一个将分配的事务ID
- creator_trx_id: 创建该Read View的事务ID

可见性判断:
if (version.trx_id == creator_trx_id) 可见
else if (version.trx_id < min_trx_id) 可见(已提交)
else if (version.trx_id >= max_trx_id) 不可见(未开始)
else if (version.trx_id in m_ids) 不可见(进行中)
else 可见(已提交)

Read Committed vs Repeatable Read

Read Committed(Oracle/PostgreSQL默认):
- 每次SELECT生成新的Read View
- 能看到其他事务已提交的变更
- 存在不可重复读问题

Repeatable Read(MySQL默认):
- 事务第一次SELECT生成Read View
- 整个事务期间使用同一个Read View
- 避免不可重复读
- MySQL通过Next-Key Lock避免幻读

锁机制详解

MySQL锁类型

-- 共享锁(S锁):读锁,允许其他事务读取
SELECT * FROM users WHERE id = 1 LOCK IN SHARE MODE;

-- 排他锁(X锁):写锁,阻止其他事务读写
SELECT * FROM users WHERE id = 1 FOR UPDATE;

-- 表级锁
LOCK TABLE users WRITE;  -- 排他表锁
LOCK TABLE users READ;   -- 共享表锁
UNLOCK TABLES;

-- 意向锁(InnoDB自动管理)
-- IS锁:意向共享锁(准备给行加S锁)
-- IX锁:意向排他锁(准备给行加X锁)

锁的粒度

表级锁:
┌─────────────────────────┐
│         users表          │
│  ┌─────┐ ┌─────┐       │
│  │ Row1│ │ Row2│ ...   │  整个表被锁定
│  └─────┘ └─────┘       │
└─────────────────────────┘

行级锁:
┌─────────────────────────┐
│         users表          │
│  ┌─────┐ ┌─────┐       │
│  │ Row1│ │ Row2│ ...   │  只锁定特定行
│  │ 🔒  │ │     │       │
│  └─────┘ └─────┘       │
└─────────────────────────┘

间隙锁(Gap Lock):
┌─────────────────────────┐
│  id=1  │  gap  │  id=5  │
│        │  🔒   │        │  锁定范围,防止幻读
└─────────────────────────┘

Next-Key Lock(行锁+间隙锁):
锁定记录本身及其前面的间隙

并发控制策略

悲观锁实现

// 悲观锁:假设冲突会发生,提前加锁
type PessimisticLockRepository struct {
    db *sql.DB
}

func (r *PessimisticLockRepository) UpdateUserBalance(
    ctx context.Context, 
    userID int64, 
    amount float64,
) error {
    tx, err := r.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 使用FOR UPDATE加排他锁
    var balance float64
    err = tx.QueryRowContext(ctx,
        "SELECT balance FROM users WHERE id = ? FOR UPDATE",
        userID,
    ).Scan(&balance)
    if err != nil {
        return err
    }

    // 更新余额
    newBalance := balance + amount
    _, err = tx.ExecContext(ctx,
        "UPDATE users SET balance = ? WHERE id = ?",
        newBalance, userID,
    )
    if err != nil {
        return err
    }

    return tx.Commit()
}

乐观锁实现

// 乐观锁:假设冲突不会发生,提交时检查
type OptimisticLockRepository struct {
    db *sql.DB
}

func (r *OptimisticLockRepository) UpdateUserWithVersion(
    ctx context.Context,
    user *User,
) error {
    const maxRetries = 3
    
    for attempt := 0; attempt < maxRetries; attempt++ {
        result, err := r.db.ExecContext(ctx,
            `UPDATE users 
             SET name = ?, email = ?, version = version + 1 
             WHERE id = ? AND version = ?`,
            user.Name, user.Email, user.ID, user.Version,
        )
        if err != nil {
            return err
        }

        rowsAffected, err := result.RowsAffected()
        if err != nil {
            return err
        }

        if rowsAffected > 0 {
            user.Version++
            return nil  // 更新成功
        }

        // 版本冲突,重新读取最新数据
        err = r.db.QueryRowContext(ctx,
            "SELECT name, email, version FROM users WHERE id = ?",
            user.ID,
        ).Scan(&user.Name, &user.Email, &user.Version)
        if err != nil {
            return err
        }

        // 重试
        time.Sleep(time.Duration(attempt*100) * time.Millisecond)
    }

    return errors.New("max retries exceeded")
}

版本号 vs CAS

-- 方式1:版本号(推荐)
ALTER TABLE users ADD COLUMN version INT DEFAULT 0;

UPDATE users 
SET balance = 100, version = version + 1 
WHERE id = 1 AND version = 5;

-- 方式2:CAS(Compare-And-Swap)
UPDATE users 
SET balance = 100 
WHERE id = 1 AND balance = 90;  -- 只有余额是90时才更新

-- 方式3:更新时间戳
UPDATE users 
SET balance = 100, updated_at = NOW() 
WHERE id = 1 AND updated_at = '2026-01-15 10:00:00';

死锁预防与排查

死锁示例

-- 死锁场景
-- 事务A
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;  -- 锁住id=1
-- 等待...
UPDATE accounts SET balance = balance + 100 WHERE id = 2;  -- 需要id=2的锁

-- 事务B
BEGIN;
UPDATE accounts SET balance = balance - 50 WHERE id = 2;   -- 锁住id=2
-- 等待...
UPDATE accounts SET balance = balance + 50 WHERE id = 1;   -- 需要id=1的锁

-- 结果:互相等待,死锁!

死锁预防策略

// 策略1:固定顺序访问资源
func Transfer(ctx context.Context, fromID, toID int64, amount float64) error {
    // 始终按ID大小顺序加锁
    if fromID > toID {
        fromID, toID = toID, fromID
        amount = -amount
    }
    
    tx, _ := db.BeginTx(ctx, nil)
    defer tx.Rollback()
    
    // 先锁小ID,再锁大ID
    tx.ExecContext(ctx, "SELECT * FROM accounts WHERE id = ? FOR UPDATE", fromID)
    tx.ExecContext(ctx, "SELECT * FROM accounts WHERE id = ? FOR UPDATE", toID)
    
    // 执行转账
    tx.ExecContext(ctx, "UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, fromID)
    tx.ExecContext(ctx, "UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, toID)
    
    return tx.Commit()
}

// 策略2:设置锁超时
func UpdateWithTimeout(ctx context.Context) error {
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()
    
    tx, _ := db.BeginTx(ctx, nil)
    defer tx.Rollback()
    
    // 超时会自动释放锁
    _, err := tx.ExecContext(ctx, "SELECT * FROM users WHERE id = 1 FOR UPDATE")
    if err != nil {
        if ctx.Err() == context.DeadlineExceeded {
            return errors.New("lock timeout")
        }
        return err
    }
    
    return tx.Commit()
}

死锁排查(MySQL)

-- 查看最近的死锁信息
SHOW ENGINE INNODB STATUS;

-- 查看当前锁等待
SELECT 
    r.trx_id waiting_trx_id,
    r.trx_mysql_thread_id waiting_thread,
    r.trx_query waiting_query,
    b.trx_id blocking_trx_id,
    b.trx_mysql_thread_id blocking_thread,
    b.trx_query blocking_query
FROM information_schema.innodb_lock_waits w
JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id
JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id;

-- 杀死阻塞的事务
KILL <blocking_thread>;

-- 设置死锁检测超时(默认50秒)
SET innodb_lock_wait_timeout = 10;

死锁排查(PostgreSQL)

-- 查看锁等待
SELECT 
    blocked_locks.pid AS blocked_pid,
    blocked_activity.usename AS blocked_user,
    blocking_locks.pid AS blocking_pid,
    blocking_activity.usename AS blocking_user,
    blocked_activity.query AS blocked_statement,
    blocking_activity.query AS blocking_statement
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks ON blocking_locks.locktype = blocked_locks.locktype
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;

-- 设置语句超时
SET statement_timeout = '10s';

-- 设置锁超时
SET lock_timeout = '5s';

实战案例:库存扣减

问题分析

电商库存扣减的并发问题:

库存剩余:10件

并发请求:
┌─────────┐  ┌─────────┐  ┌─────────┐
│ 用户A   │  │ 用户B   │  │ 用户C   │
│ 购买5件 │  │ 购买3件 │  │ 购买4件 │
└─────────┘  └─────────┘  └─────────┘

问题:超卖(5+3+4=12 > 10)

解决方案对比

-- 方案1:悲观锁(简单但性能差)
BEGIN;
SELECT stock FROM products WHERE id = 1 FOR UPDATE;
-- 应用层检查stock >= quantity
UPDATE products SET stock = stock - 5 WHERE id = 1;
COMMIT;

-- 方案2:原子更新(推荐)
UPDATE products 
SET stock = stock - 5 
WHERE id = 1 AND stock >= 5;
-- 通过affected rows判断是否成功

-- 方案3:乐观锁(高并发场景)
UPDATE products 
SET stock = stock - 5, version = version + 1 
WHERE id = 1 AND stock >= 5 AND version = 100;
-- 失败则重试

-- 方案4:预扣减+Redis(超高并发)
-- 先在Redis中扣减,异步同步到数据库

Redis预扣减实现

func DeductStock(ctx context.Context, productID int64, quantity int) error {
    redisKey := fmt.Sprintf("stock:%d", productID)
    
    // Lua脚本保证原子性
    script := redis.NewScript(`
        local stock = tonumber(redis.call('GET', KEYS[1]))
        if stock == nil then
            return -1  -- 商品不存在
        end
        if stock < tonumber(ARGV[1]) then
            return 0   -- 库存不足
        end
        redis.call('DECRBY', KEYS[1], ARGV[1])
        return 1       -- 扣减成功
    `)
    
    result, err := script.Run(ctx, rdb, []string{redisKey}, quantity).Int()
    if err != nil {
        return err
    }
    
    switch result {
    case -1:
        return errors.New("product not found")
    case 0:
        return errors.New("insufficient stock")
    case 1:
        // Redis扣减成功,发送MQ消息异步更新数据库
        mq.Publish("stock.deduct", map[string]interface{}{
            "product_id": productID,
            "quantity":   quantity,
        })
        return nil
    }
    
    return errors.New("unknown error")
}

总结

事务隔离级别选择指南:

场景推荐隔离级别并发策略
金融交易Repeatable Read悲观锁
电商库存Read Committed乐观锁+原子更新
社交FeedRead Committed乐观锁
日志记录Read Uncommitted无锁
报表统计Read CommittedMVCC快照读

关键原则:

  1. 选择合适的隔离级别:不要盲目使用最高级别
  2. 优先使用乐观锁:减少锁竞争,提升并发性能
  3. 控制事务范围:事务越小,锁持有时间越短
  4. 预防死锁:固定资源访问顺序,设置超时
  5. 监控锁等待:及时发现和解决锁竞争问题

延伸阅读

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页