引言
事务是数据库保证数据一致性的核心机制。在高并发场景下,如何平衡数据一致性与系统性能是每个后端工程师必须面对的挑战。本文将深入讲解事务隔离级别、并发控制策略及生产环境最佳实践。
事务ACID特性
Atomicity(原子性): 事务要么全部成功,要么全部失败
└─ 实现:Undo Log(MySQL)、WAL(PostgreSQL)
Consistency(一致性): 事务执行前后数据保持一致
└─ 实现:约束、触发器、应用逻辑
Isolation(隔离性): 并发事务互不干扰
└─ 实现:锁机制、MVCC
Durability(持久性): 事务提交后数据永久保存
└─ 实现:Redo Log(MySQL)、WAL(PostgreSQL)
四种隔离级别
| 隔离级别 | 脏读 | 不可重复读 | 幻读 | 性能 | 数据库默认 |
|---|---|---|---|---|---|
| Read Uncommitted | ✗ | ✗ | ✗ | 最高 | - |
| Read Committed | ✓ | ✗ | ✗ | 高 | PostgreSQL/Oracle |
| Repeatable Read | ✓ | ✓ | ✗ | 中 | MySQL |
| Serializable | ✓ | ✓ | ✓ | 最低 | - |
三种并发问题
脏读(Dirty Read):
事务A读取到事务B未提交的数据
┌─────────┐ ┌─────────┐
│ 事务A │ │ 事务B │
├─────────┤ ├─────────┤
│ │ UPDATE=100 │ │
│ │ ◀───────────── │ BEGIN │
│ READ=100│ │ │
│ │ ROLLBACK │ │
│ │ ◀───────────── │ │
│ 读到已回滚的数据 ✗ │ │
└─────────┘ └─────────┘
不可重复读(Non-Repeatable Read):
事务A两次读取同一行数据不一致
┌─────────┐ ┌─────────┐
│ 事务A │ │ 事务B │
├─────────┤ ├─────────┤
│ READ=100│ │ │
│ │ UPDATE=200 │ │
│ │ ◀───────────── │ COMMIT │
│ READ=200│ │ │
│ 两次读取不一致 ✗ │ │
└─────────┘ └─────────┘
幻读(Phantom Read):
事务A两次查询发现多出或少了记录
┌─────────┐ ┌─────────┐
│ 事务A │ │ 事务B │
├─────────┤ ├─────────┤
│SELECT * │ │ │
│ 得到5行 │ │ │
│ │ INSERT新行 │ │
│ │ ◀───────────── │ COMMIT │
│SELECT * │ │ │
│ 得到6行 │ │ │
│ 出现幻读 ✗ │ │
└─────────┘ └─────────┘
MVCC(多版本并发控制)
实现原理
MVCC核心思想:
每行数据保存多个版本,通过版本链实现非阻塞读
行记录结构:
┌──────────────────────────────────────────┐
│ DB_TRX_ID (事务ID) | DB_ROLL_PTR (回滚指针) │
├──────────────────────────────────────────┤
│ 当前版本数据 │
└──────────────────────────────────────────┘
↓ (回滚指针)
┌──────────────────────────────────────────┐
│ 上一个版本(Undo Log) │
└──────────────────────────────────────────┘
Read View(读视图):
- m_ids: 生成Read View时活跃的事务ID列表
- min_trx_id: m_ids中最小的
- max_trx_id: 下一个将分配的事务ID
- creator_trx_id: 创建该Read View的事务ID
可见性判断:
if (version.trx_id == creator_trx_id) 可见
else if (version.trx_id < min_trx_id) 可见(已提交)
else if (version.trx_id >= max_trx_id) 不可见(未开始)
else if (version.trx_id in m_ids) 不可见(进行中)
else 可见(已提交)
Read Committed vs Repeatable Read
Read Committed(Oracle/PostgreSQL默认):
- 每次SELECT生成新的Read View
- 能看到其他事务已提交的变更
- 存在不可重复读问题
Repeatable Read(MySQL默认):
- 事务第一次SELECT生成Read View
- 整个事务期间使用同一个Read View
- 避免不可重复读
- MySQL通过Next-Key Lock避免幻读
锁机制详解
MySQL锁类型
-- 共享锁(S锁):读锁,允许其他事务读取
SELECT * FROM users WHERE id = 1 LOCK IN SHARE MODE;
-- 排他锁(X锁):写锁,阻止其他事务读写
SELECT * FROM users WHERE id = 1 FOR UPDATE;
-- 表级锁
LOCK TABLE users WRITE; -- 排他表锁
LOCK TABLE users READ; -- 共享表锁
UNLOCK TABLES;
-- 意向锁(InnoDB自动管理)
-- IS锁:意向共享锁(准备给行加S锁)
-- IX锁:意向排他锁(准备给行加X锁)
锁的粒度
表级锁:
┌─────────────────────────┐
│ users表 │
│ ┌─────┐ ┌─────┐ │
│ │ Row1│ │ Row2│ ... │ 整个表被锁定
│ └─────┘ └─────┘ │
└─────────────────────────┘
行级锁:
┌─────────────────────────┐
│ users表 │
│ ┌─────┐ ┌─────┐ │
│ │ Row1│ │ Row2│ ... │ 只锁定特定行
│ │ 🔒 │ │ │ │
│ └─────┘ └─────┘ │
└─────────────────────────┘
间隙锁(Gap Lock):
┌─────────────────────────┐
│ id=1 │ gap │ id=5 │
│ │ 🔒 │ │ 锁定范围,防止幻读
└─────────────────────────┘
Next-Key Lock(行锁+间隙锁):
锁定记录本身及其前面的间隙
并发控制策略
悲观锁实现
// 悲观锁:假设冲突会发生,提前加锁
type PessimisticLockRepository struct {
db *sql.DB
}
func (r *PessimisticLockRepository) UpdateUserBalance(
ctx context.Context,
userID int64,
amount float64,
) error {
tx, err := r.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// 使用FOR UPDATE加排他锁
var balance float64
err = tx.QueryRowContext(ctx,
"SELECT balance FROM users WHERE id = ? FOR UPDATE",
userID,
).Scan(&balance)
if err != nil {
return err
}
// 更新余额
newBalance := balance + amount
_, err = tx.ExecContext(ctx,
"UPDATE users SET balance = ? WHERE id = ?",
newBalance, userID,
)
if err != nil {
return err
}
return tx.Commit()
}
乐观锁实现
// 乐观锁:假设冲突不会发生,提交时检查
type OptimisticLockRepository struct {
db *sql.DB
}
func (r *OptimisticLockRepository) UpdateUserWithVersion(
ctx context.Context,
user *User,
) error {
const maxRetries = 3
for attempt := 0; attempt < maxRetries; attempt++ {
result, err := r.db.ExecContext(ctx,
`UPDATE users
SET name = ?, email = ?, version = version + 1
WHERE id = ? AND version = ?`,
user.Name, user.Email, user.ID, user.Version,
)
if err != nil {
return err
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return err
}
if rowsAffected > 0 {
user.Version++
return nil // 更新成功
}
// 版本冲突,重新读取最新数据
err = r.db.QueryRowContext(ctx,
"SELECT name, email, version FROM users WHERE id = ?",
user.ID,
).Scan(&user.Name, &user.Email, &user.Version)
if err != nil {
return err
}
// 重试
time.Sleep(time.Duration(attempt*100) * time.Millisecond)
}
return errors.New("max retries exceeded")
}
版本号 vs CAS
-- 方式1:版本号(推荐)
ALTER TABLE users ADD COLUMN version INT DEFAULT 0;
UPDATE users
SET balance = 100, version = version + 1
WHERE id = 1 AND version = 5;
-- 方式2:CAS(Compare-And-Swap)
UPDATE users
SET balance = 100
WHERE id = 1 AND balance = 90; -- 只有余额是90时才更新
-- 方式3:更新时间戳
UPDATE users
SET balance = 100, updated_at = NOW()
WHERE id = 1 AND updated_at = '2026-01-15 10:00:00';
死锁预防与排查
死锁示例
-- 死锁场景
-- 事务A
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1; -- 锁住id=1
-- 等待...
UPDATE accounts SET balance = balance + 100 WHERE id = 2; -- 需要id=2的锁
-- 事务B
BEGIN;
UPDATE accounts SET balance = balance - 50 WHERE id = 2; -- 锁住id=2
-- 等待...
UPDATE accounts SET balance = balance + 50 WHERE id = 1; -- 需要id=1的锁
-- 结果:互相等待,死锁!
死锁预防策略
// 策略1:固定顺序访问资源
func Transfer(ctx context.Context, fromID, toID int64, amount float64) error {
// 始终按ID大小顺序加锁
if fromID > toID {
fromID, toID = toID, fromID
amount = -amount
}
tx, _ := db.BeginTx(ctx, nil)
defer tx.Rollback()
// 先锁小ID,再锁大ID
tx.ExecContext(ctx, "SELECT * FROM accounts WHERE id = ? FOR UPDATE", fromID)
tx.ExecContext(ctx, "SELECT * FROM accounts WHERE id = ? FOR UPDATE", toID)
// 执行转账
tx.ExecContext(ctx, "UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, fromID)
tx.ExecContext(ctx, "UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, toID)
return tx.Commit()
}
// 策略2:设置锁超时
func UpdateWithTimeout(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
tx, _ := db.BeginTx(ctx, nil)
defer tx.Rollback()
// 超时会自动释放锁
_, err := tx.ExecContext(ctx, "SELECT * FROM users WHERE id = 1 FOR UPDATE")
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
return errors.New("lock timeout")
}
return err
}
return tx.Commit()
}
死锁排查(MySQL)
-- 查看最近的死锁信息
SHOW ENGINE INNODB STATUS;
-- 查看当前锁等待
SELECT
r.trx_id waiting_trx_id,
r.trx_mysql_thread_id waiting_thread,
r.trx_query waiting_query,
b.trx_id blocking_trx_id,
b.trx_mysql_thread_id blocking_thread,
b.trx_query blocking_query
FROM information_schema.innodb_lock_waits w
JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id
JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id;
-- 杀死阻塞的事务
KILL <blocking_thread>;
-- 设置死锁检测超时(默认50秒)
SET innodb_lock_wait_timeout = 10;
死锁排查(PostgreSQL)
-- 查看锁等待
SELECT
blocked_locks.pid AS blocked_pid,
blocked_activity.usename AS blocked_user,
blocking_locks.pid AS blocking_pid,
blocking_activity.usename AS blocking_user,
blocked_activity.query AS blocked_statement,
blocking_activity.query AS blocking_statement
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks ON blocking_locks.locktype = blocked_locks.locktype
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;
-- 设置语句超时
SET statement_timeout = '10s';
-- 设置锁超时
SET lock_timeout = '5s';
实战案例:库存扣减
问题分析
电商库存扣减的并发问题:
库存剩余:10件
并发请求:
┌─────────┐ ┌─────────┐ ┌─────────┐
│ 用户A │ │ 用户B │ │ 用户C │
│ 购买5件 │ │ 购买3件 │ │ 购买4件 │
└─────────┘ └─────────┘ └─────────┘
问题:超卖(5+3+4=12 > 10)
解决方案对比
-- 方案1:悲观锁(简单但性能差)
BEGIN;
SELECT stock FROM products WHERE id = 1 FOR UPDATE;
-- 应用层检查stock >= quantity
UPDATE products SET stock = stock - 5 WHERE id = 1;
COMMIT;
-- 方案2:原子更新(推荐)
UPDATE products
SET stock = stock - 5
WHERE id = 1 AND stock >= 5;
-- 通过affected rows判断是否成功
-- 方案3:乐观锁(高并发场景)
UPDATE products
SET stock = stock - 5, version = version + 1
WHERE id = 1 AND stock >= 5 AND version = 100;
-- 失败则重试
-- 方案4:预扣减+Redis(超高并发)
-- 先在Redis中扣减,异步同步到数据库
Redis预扣减实现
func DeductStock(ctx context.Context, productID int64, quantity int) error {
redisKey := fmt.Sprintf("stock:%d", productID)
// Lua脚本保证原子性
script := redis.NewScript(`
local stock = tonumber(redis.call('GET', KEYS[1]))
if stock == nil then
return -1 -- 商品不存在
end
if stock < tonumber(ARGV[1]) then
return 0 -- 库存不足
end
redis.call('DECRBY', KEYS[1], ARGV[1])
return 1 -- 扣减成功
`)
result, err := script.Run(ctx, rdb, []string{redisKey}, quantity).Int()
if err != nil {
return err
}
switch result {
case -1:
return errors.New("product not found")
case 0:
return errors.New("insufficient stock")
case 1:
// Redis扣减成功,发送MQ消息异步更新数据库
mq.Publish("stock.deduct", map[string]interface{}{
"product_id": productID,
"quantity": quantity,
})
return nil
}
return errors.New("unknown error")
}
总结
事务隔离级别选择指南:
| 场景 | 推荐隔离级别 | 并发策略 |
|---|---|---|
| 金融交易 | Repeatable Read | 悲观锁 |
| 电商库存 | Read Committed | 乐观锁+原子更新 |
| 社交Feed | Read Committed | 乐观锁 |
| 日志记录 | Read Uncommitted | 无锁 |
| 报表统计 | Read Committed | MVCC快照读 |
关键原则:
- 选择合适的隔离级别:不要盲目使用最高级别
- 优先使用乐观锁:减少锁竞争,提升并发性能
- 控制事务范围:事务越小,锁持有时间越短
- 预防死锁:固定资源访问顺序,设置超时
- 监控锁等待:及时发现和解决锁竞争问题
延伸阅读
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。