数据库事务:保证数据一致性的关键

深入理解数据库事务的 ACID 特性,学习在 Go 中实现事务、并发控制和错误处理

数据库事务:保证数据一致性的关键

在数据库操作中,我们经常需要执行多个相关的操作,这些操作要么全部成功,要么全部失败。事务(Transaction)就是用来保证这种原子性的机制。

本文将深入探讨数据库事务的原理和在 Go 中的实现。

什么是事务?

事务是一组数据库操作的逻辑单元,具有以下 ACID 特性:

  • 原子性(Atomicity):事务中的操作要么全部成功,要么全部失败
  • 一致性(Consistency):事务执行前后,数据库保持一致状态
  • 隔离性(Isolation):并发事务之间互不干扰
  • 持久性(Durability):事务提交后,数据永久保存

基础事务操作

package main

import (
    "context"
    "database/sql"
    "log"
    
    _ "github.com/go-sql-driver/mysql"
)

func main() {
    db, err := sql.Open("mysql", "user:password@/dbname")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()
    
    // 开始事务
    tx, err := db.Begin()
    if err != nil {
        log.Fatal(err)
    }
    
    // 执行操作
    _, err = tx.Exec("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    if err != nil {
        tx.Rollback()
        log.Fatal(err)
    }
    
    _, err = tx.Exec("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    if err != nil {
        tx.Rollback()
        log.Fatal(err)
    }
    
    // 提交事务
    err = tx.Commit()
    if err != nil {
        log.Fatal(err)
    }
    
    log.Println("Transaction committed")
}

使用 defer 确保回滚

func transfer(db *sql.DB, fromID, toID int, amount float64) (err error) {
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    
    // 确保在函数退出时回滚(如果未提交)
    defer func() {
        if err != nil {
            tx.Rollback()
            return
        }
        err = tx.Commit()
    }()
    
    // 扣款
    _, err = tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, fromID)
    if err != nil {
        return err
    }
    
    // 检查余额
    var balance float64
    err = tx.QueryRow("SELECT balance FROM accounts WHERE id = ?", fromID).Scan(&balance)
    if err != nil {
        return err
    }
    if balance < 0 {
        return fmt.Errorf("insufficient balance")
    }
    
    // 入账
    _, err = tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, toID)
    if err != nil {
        return err
    }
    
    return nil
}

事务与 Context

func transferWithContext(ctx context.Context, db *sql.DB, fromID, toID int, amount float64) error {
    // 使用 context 开始事务
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    
    // 执行操作(带 context)
    _, err = tx.ExecContext(ctx, 
        "UPDATE accounts SET balance = balance - ? WHERE id = ?", 
        amount, fromID)
    if err != nil {
        return err
    }
    
    _, err = tx.ExecContext(ctx, 
        "UPDATE accounts SET balance = balance + ? WHERE id = ?", 
        amount, toID)
    if err != nil {
        return err
    }
    
    return tx.Commit()
}

事务隔离级别

// 设置隔离级别
tx, err := db.BeginTx(ctx, &sql.TxOptions{
    Isolation: sql.LevelSerializable,
})

// 常用隔离级别
// LevelReadUncommitted - 读未提交
// LevelReadCommitted - 读已提交
// LevelRepeatableRead - 可重复读
// LevelSerializable - 串行化

隔离级别与并发问题

// 脏读(Dirty Read)
// 事务 A 读取了事务 B 未提交的数据

// 不可重复读(Non-repeatable Read)
// 事务 A 两次读取同一数据,结果不同(事务 B 修改了数据)

// 幻读(Phantom Read)
// 事务 A 两次查询,结果集不同(事务 B 插入了新数据)

乐观锁与悲观锁

悲观锁

// 使用 SELECT ... FOR UPDATE
func updateWithPessimisticLock(ctx context.Context, db *sql.DB, id int, amount float64) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    
    // 锁定行
    var balance float64
    err = tx.QueryRowContext(ctx, 
        "SELECT balance FROM accounts WHERE id = ? FOR UPDATE", id).
        Scan(&balance)
    if err != nil {
        return err
    }
    
    // 更新
    newBalance := balance + amount
    _, err = tx.ExecContext(ctx, 
        "UPDATE accounts SET balance = ? WHERE id = ?", 
        newBalance, id)
    if err != nil {
        return err
    }
    
    return tx.Commit()
}

乐观锁

// 使用版本号
func updateWithOptimisticLock(ctx context.Context, db *sql.DB, id int, amount float64) error {
    for i := 0; i < 3; i++ { // 最多重试 3 次
        // 读取当前值和版本号
        var balance float64
        var version int
        err := db.QueryRowContext(ctx, 
            "SELECT balance, version FROM accounts WHERE id = ?", id).
            Scan(&balance, &version)
        if err != nil {
            return err
        }
        
        // 更新(带版本号检查)
        newBalance := balance + amount
        result, err := db.ExecContext(ctx, 
            "UPDATE accounts SET balance = ?, version = version + 1 WHERE id = ? AND version = ?", 
            newBalance, id, version)
        if err != nil {
            return err
        }
        
        rows, err := result.RowsAffected()
        if err != nil {
            return err
        }
        
        if rows > 0 {
            return nil // 更新成功
        }
        
        // 版本号不匹配,重试
        time.Sleep(100 * time.Millisecond)
    }
    
    return fmt.Errorf("failed to update after retries")
}

嵌套事务(Savepoint)

func nestedTransaction(ctx context.Context, db *sql.DB) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    
    // 外层事务操作
    _, err = tx.ExecContext(ctx, "INSERT INTO orders (user_id) VALUES (1)")
    if err != nil {
        return err
    }
    
    // 创建保存点
    _, err = tx.ExecContext(ctx, "SAVEPOINT sp1")
    if err != nil {
        return err
    }
    
    // 内层事务操作
    _, err = tx.ExecContext(ctx, "INSERT INTO order_items (order_id, product_id) VALUES (1, 1)")
    if err != nil {
        // 回滚到保存点
        tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT sp1")
        log.Println("Rolled back to savepoint")
    }
    
    // 继续外层事务
    _, err = tx.ExecContext(ctx, "UPDATE users SET order_count = order_count + 1 WHERE id = 1")
    if err != nil {
        return err
    }
    
    return tx.Commit()
}

实战:订单系统

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "time"
)

type OrderService struct {
    db *sql.DB
}

func NewOrderService(db *sql.DB) *OrderService {
    return &OrderService{db: db}
}

// CreateOrder 创建订单(事务)
func (s *OrderService) CreateOrder(ctx context.Context, userID int, items []OrderItem) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    
    // 1. 创建订单
    var orderID int
    err = tx.QueryRowContext(ctx, 
        "INSERT INTO orders (user_id, created_at) VALUES (?, ?) RETURNING id",
        userID, time.Now()).
        Scan(&orderID)
    if err != nil {
        return fmt.Errorf("create order: %w", err)
    }
    
    // 2. 创建订单项并扣减库存
    totalAmount := 0.0
    for _, item := range items {
        // 检查库存(悲观锁)
        var stock int
        var price float64
        err = tx.QueryRowContext(ctx, 
            "SELECT stock, price FROM products WHERE id = ? FOR UPDATE",
            item.ProductID).
            Scan(&stock, &price)
        if err != nil {
            return fmt.Errorf("check stock: %w", err)
        }
        
        if stock < item.Quantity {
            return fmt.Errorf("insufficient stock for product %d", item.ProductID)
        }
        
        // 扣减库存
        _, err = tx.ExecContext(ctx, 
            "UPDATE products SET stock = stock - ? WHERE id = ?",
            item.Quantity, item.ProductID)
        if err != nil {
            return fmt.Errorf("update stock: %w", err)
        }
        
        // 创建订单项
        amount := price * float64(item.Quantity)
        _, err = tx.ExecContext(ctx, 
            "INSERT INTO order_items (order_id, product_id, quantity, price, amount) VALUES (?, ?, ?, ?, ?)",
            orderID, item.ProductID, item.Quantity, price, amount)
        if err != nil {
            return fmt.Errorf("create order item: %w", err)
        }
        
        totalAmount += amount
    }
    
    // 3. 更新订单总金额
    _, err = tx.ExecContext(ctx, 
        "UPDATE orders SET total_amount = ? WHERE id = ?",
        totalAmount, orderID)
    if err != nil {
        return fmt.Errorf("update order amount: %w", err)
    }
    
    // 4. 记录用户购买历史
    _, err = tx.ExecContext(ctx, 
        "INSERT INTO purchase_history (user_id, order_id, amount) VALUES (?, ?, ?)",
        userID, orderID, totalAmount)
    if err != nil {
        return fmt.Errorf("record purchase: %w", err)
    }
    
    return tx.Commit()
}

type OrderItem struct {
    ProductID int
    Quantity  int
}

func main() {
    db, err := sql.Open("mysql", "user:password@/shop")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()
    
    service := NewOrderService(db)
    ctx := context.Background()
    
    items := []OrderItem{
        {ProductID: 1, Quantity: 2},
        {ProductID: 2, Quantity: 1},
    }
    
    err = service.CreateOrder(ctx, 123, items)
    if err != nil {
        log.Printf("Create order failed: %v", err)
        return
    }
    
    log.Println("Order created successfully")
}

事务最佳实践

1. 保持事务简短

// ❌ 不好:事务中包含耗时操作
func badTransaction(db *sql.DB) error {
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()
    
    _, err = tx.Exec("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    if err != nil {
        return err
    }
    
    // 耗时的外部调用
    err = callExternalService() // 可能很慢
    if err != nil {
        return err
    }
    
    _, err = tx.Exec("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    if err != nil {
        return err
    }
    
    return tx.Commit()
}

// ✅ 好:事务只包含数据库操作
func goodTransaction(db *sql.DB) error {
    // 先执行外部调用
    err := callExternalService()
    if err != nil {
        return err
    }
    
    // 再执行事务
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()
    
    _, err = tx.Exec("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    if err != nil {
        return err
    }
    
    _, err = tx.Exec("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    if err != nil {
        return err
    }
    
    return tx.Commit()
}

2. 处理死锁

func handleDeadlock(db *sql.DB, maxRetries int) error {
    for i := 0; i < maxRetries; i++ {
        err := doTransaction(db)
        if err == nil {
            return nil
        }
        
        // 检查是否是死锁错误
        if isDeadlockError(err) {
            log.Printf("Deadlock detected, retry %d/%d", i+1, maxRetries)
            time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
            continue
        }
        
        return err
    }
    
    return fmt.Errorf("max retries exceeded")
}

func isDeadlockError(err error) bool {
    // MySQL: Error 1213
    // PostgreSQL: Error 40P01
    return strings.Contains(err.Error(), "deadlock")
}

3. 使用连接池

db.SetMaxOpenConns(25)
db.SetMaxIdleConns(5)
db.SetConnMaxLifetime(5 * time.Minute)

总结

事务是保证数据一致性的关键机制:

  1. ACID 特性:原子性、一致性、隔离性、持久性
  2. 隔离级别:从低到高依次为读未提交、读已提交、可重复读、串行化
  3. 并发控制:悲观锁(SELECT FOR UPDATE)和乐观锁(版本号)
  4. 最佳实践:保持事务简短、处理死锁、使用连接池

记住:事务虽然强大,但也会带来性能开销。要在一致性和性能之间找到平衡。

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页