分布式锁实现方案:Redis、ZooKeeper与etcd的实战对比

深入讲解分布式锁的核心概念与实现方案,涵盖Redis Redlock算法、ZooKeeper临时节点锁、etcd租约锁等主流实现,提供多语言代码示例、死锁防护与性能优化策略。

引言

在分布式系统中,多个节点可能需要互斥访问共享资源。分布式锁是实现这种互斥访问的关键机制,但其实现比单机锁复杂得多,需要考虑网络分区、节点故障等问题。

本文将系统介绍分布式锁的设计原理和主流实现方案。

分布式锁的核心要求

理想的分布式锁应满足:

1. 互斥性(Mutual Exclusion)
   - 同一时刻只有一个客户端持有锁

2. 不会死锁(Deadlock Free)
   - 客户端崩溃或网络异常时,锁能自动释放

3. 容错性(Fault Tolerance)
   - 部分节点故障时,锁服务仍可用

4. 性能(Performance)
   - 加锁/解锁操作延迟低
   - 支持高并发竞争

Redis分布式锁

基础实现(单节点)

type RedisLock struct {
    client    *redis.Client
    key       string
    value     string
    expire    time.Duration
}

func NewRedisLock(client *redis.Client, key string, expire time.Duration) *RedisLock {
    return &RedisLock{
        client: client,
        key:    key,
        value:  uuid.New().String(),  // 唯一标识,防止误删
        expire: expire,
    }
}

func (l *RedisLock) Lock(ctx context.Context) (bool, error) {
    // SET key value NX PX expire
    // NX: 仅当key不存在时设置
    // PX: 毫秒级过期时间
    ok, err := l.client.SetNX(ctx, l.key, l.value, l.expire).Result()
    if err != nil {
        return false, err
    }
    return ok, nil
}

func (l *RedisLock) Unlock(ctx context.Context) error {
    // 使用Lua脚本确保原子性:只删除自己持有的锁
    script := `
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
    `
    
    result, err := l.client.Eval(ctx, script, []string{l.key}, l.value).Int()
    if err != nil {
        return err
    }
    
    if result == 0 {
        return fmt.Errorf("lock not held or already expired")
    }
    
    return nil
}

// 使用示例
func processOrder(ctx context.Context, orderID string) error {
    lock := NewRedisLock(redisClient, fmt.Sprintf("lock:order:%s", orderID), 30*time.Second)
    
    locked, err := lock.Lock(ctx)
    if err != nil {
        return err
    }
    if !locked {
        return fmt.Errorf("failed to acquire lock")
    }
    
    defer lock.Unlock(ctx)
    
    // 处理订单逻辑
    return doProcessOrder(ctx, orderID)
}

锁续期(Watchdog机制)

type RedisLockWithWatchdog struct {
    *RedisLock
    stopWatchdog chan struct{}
}

func (l *RedisLockWithWatchdog) Lock(ctx context.Context) (bool, error) {
    locked, err := l.RedisLock.Lock(ctx)
    if err != nil || !locked {
        return locked, err
    }
    
    // 启动看门狗,定期续期
    l.stopWatchdog = make(chan struct{})
    go l.watchdog(ctx)
    
    return true, nil
}

func (l *RedisLockWithWatchdog) watchdog(ctx context.Context) {
    // 每过期时间的1/3续期一次
    ticker := time.NewTicker(l.expire / 3)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            // 续期
            script := `
                if redis.call("get", KEYS[1]) == ARGV[1] then
                    return redis.call("pexpire", KEYS[1], ARGV[2])
                else
                    return 0
                end
            `
            l.client.Eval(ctx, script, []string{l.key}, l.value, l.expire.Milliseconds())
            
        case <-l.stopWatchdog:
            return
        case <-ctx.Done():
            return
        }
    }
}

func (l *RedisLockWithWatchdog) Unlock(ctx context.Context) error {
    // 停止看门狗
    if l.stopWatchdog != nil {
        close(l.stopWatchdog)
    }
    
    return l.RedisLock.Unlock(ctx)
}

Redlock算法(多节点)

// Redlock实现:多个独立Redis节点
type Redlock struct {
    nodes    []*redis.Client
    key      string
    value    string
    expire   time.Duration
    quorum   int  // 需要获得锁的最小节点数
}

func NewRedlock(nodes []*redis.Client, key string, expire time.Duration) *Redlock {
    return &Redlock{
        nodes:  nodes,
        key:    key,
        value:  uuid.New().String(),
        expire: expire,
        quorum: len(nodes)/2 + 1,  // 多数派
    }
}

func (r *Redlock) Lock(ctx context.Context) (bool, error) {
    startTime := time.Now()
    
    // 尝试在所有节点上加锁
    successCount := 0
    for _, node := range r.nodes {
        locked, err := node.SetNX(ctx, r.key, r.value, r.expire).Result()
        if err == nil && locked {
            successCount++
        }
    }
    
    // 计算实际有效时间
    elapsedTime := time.Since(startTime)
    validTime := r.expire - elapsedTime
    
    // 检查是否达到quorum且有效时间足够
    if successCount >= r.quorum && validTime > 0 {
        return true, nil
    }
    
    // 未达到quorum,释放所有已获得的锁
    r.Unlock(ctx)
    return false, nil
}

func (r *Redlock) Unlock(ctx context.Context) {
    script := `
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
    `
    
    // 尝试在所有节点上释放锁
    for _, node := range r.nodes {
        node.Eval(ctx, script, []string{r.key}, r.value)
    }
}

Redlock争议:Martin Kleppmann指出Redlock在时钟跳跃、GC暂停等场景下可能失效。Antirez(Redis作者)进行了反驳。实际使用中需要权衡。

ZooKeeper分布式锁

临时顺序节点实现

type ZKLock struct {
    conn      *zk.Conn
    lockPath  string
    lockNode  string
}

func NewZKLock(conn *zk.Conn, lockPath string) *ZKLock {
    return &ZKLock{
        conn:     conn,
        lockPath: lockPath,
    }
}

func (l *ZKLock) Lock(ctx context.Context) error {
    // 确保锁目录存在
    exists, _, err := l.conn.Exists(l.lockPath)
    if err != nil {
        return err
    }
    if !exists {
        l.conn.Create(l.lockPath, []byte{}, 0, zk.WorldACL(zk.PermAll))
    }
    
    // 创建临时顺序节点
    node, err := l.conn.CreateProtectedEphemeralSequential(
        l.lockPath+"/lock-",
        []byte{},
        zk.WorldACL(zk.PermAll),
    )
    if err != nil {
        return err
    }
    l.lockNode = node
    
    // 检查是否获得锁
    return l.tryAcquireLock(ctx)
}

func (l *ZKLock) tryAcquireLock(ctx context.Context) error {
    for {
        // 获取所有子节点
        children, _, err := l.conn.Children(l.lockPath)
        if err != nil {
            return err
        }
        
        // 排序
        sort.Strings(children)
        
        // 检查自己是否是最小节点
        myNodeName := path.Base(l.lockNode)
        if len(children) > 0 && children[0] == myNodeName {
            // 获得锁
            return nil
        }
        
        // 找到前一个节点
        var prevNode string
        for i, child := range children {
            if child == myNodeName && i > 0 {
                prevNode = children[i-1]
                break
            }
        }
        
        if prevNode == "" {
            return fmt.Errorf("previous node not found")
        }
        
        // 监听前一个节点的删除事件
        prevPath := l.lockPath + "/" + prevNode
        watch, _, err := l.conn.ExistsW(prevPath)
        if err != nil {
            return err
        }
        
        select {
        case <-watch:
            // 前一个节点被删除,重新检查
            continue
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

func (l *ZKLock) Unlock() error {
    if l.lockNode != "" {
        return l.conn.Delete(l.lockNode, -1)
    }
    return nil
}

优点

  • 强一致性(ZAB协议)
  • 自动释放(会话超时)
  • 避免羊群效应(只监听前一个节点)

缺点

  • 性能低于Redis
  • 需要维护ZooKeeper集群

etcd分布式锁

基于Lease的实现

type EtcdLock struct {
    client   *clientv3.Client
    key      string
    lease    clientv3.Lease
    leaseID  clientv3.LeaseID
    kv       clientv3.KV
}

func NewEtcdLock(client *clientv3.Client, key string, ttl int64) (*EtcdLock, error) {
    lease := clientv3.NewLease(client)
    
    // 创建租约
    leaseResp, err := lease.Grant(context.Background(), ttl)
    if err != nil {
        return nil, err
    }
    
    return &EtcdLock{
        client:  client,
        key:     key,
        lease:   lease,
        leaseID: leaseResp.ID,
        kv:      clientv3.NewKV(client),
    }, nil
}

func (l *EtcdLock) Lock(ctx context.Context) error {
    // 使用事务确保原子性
    cmp := clientv3.Compare(clientv3.CreateRevision(l.key), "=", 0)
    
    // 如果key不存在,创建它
    put := clientv3.OpPut(l.key, "locked", clientv3.WithLease(l.leaseID))
    
    // 如果key已存在,获取当前值
    get := clientv3.OpGet(l.key)
    
    resp, err := l.kv.Txn(ctx).If(cmp).Then(put).Else(get).Commit()
    if err != nil {
        return err
    }
    
    if !resp.Succeeded {
        // 锁已被占用,等待释放
        return l.waitForLock(ctx)
    }
    
    // 启动keepalive
    keepAliveCh, err := l.lease.KeepAlive(ctx, l.leaseID)
    if err != nil {
        return err
    }
    
    go func() {
        for range keepAliveCh {
            // 消费keepalive响应
        }
    }()
    
    return nil
}

func (l *EtcdLock) waitForLock(ctx context.Context) error {
    // 监听key的删除事件
    watchCh := l.client.Watch(ctx, l.key)
    
    for watchResp := range watchCh {
        for _, event := range watchResp.Events {
            if event.Type == mvccpb.DELETE {
                // 锁被释放,尝试获取
                return l.Lock(ctx)
            }
        }
    }
    
    return ctx.Err()
}

func (l *EtcdLock) Unlock(ctx context.Context) error {
    // 删除key
    _, err := l.kv.Delete(ctx, l.key)
    if err != nil {
        return err
    }
    
    // 释放租约
    _, err = l.lease.Revoke(ctx, l.leaseID)
    return err
}

使用etcd官方lock库

import "go.etcd.io/etcd/client/v3/concurrency"

type EtcdLockV2 struct {
    client   *clientv3.Client
    session  *concurrency.Session
    mutex    *concurrency.Mutex
}

func NewEtcdLockV2(client *clientv3.Client, key string) (*EtcdLockV2, error) {
    // 创建会话(带TTL)
    session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
    if err != nil {
        return nil, err
    }
    
    return &EtcdLockV2{
        client:  client,
        session: session,
        mutex:   concurrency.NewMutex(session, key),
    }, nil
}

func (l *EtcdLockV2) Lock(ctx context.Context) error {
    return l.mutex.Lock(ctx)
}

func (l *EtcdLockV2) Unlock(ctx context.Context) error {
    err := l.mutex.Unlock(ctx)
    l.session.Close()
    return err
}

性能对比

func BenchmarkDistributedLocks(b *testing.B) {
    // Redis锁
    redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    b.Run("Redis", func(b *testing.B) {
        for i := 0; i < b.N; i++ {
            lock := NewRedisLock(redisClient, "bench:lock", 10*time.Second)
            lock.Lock(context.Background())
            lock.Unlock(context.Background())
        }
    })
    
    // ZooKeeper锁
    zkConn, _, _ := zk.Connect([]string{"localhost:2181"}, time.Second)
    b.Run("ZooKeeper", func(b *testing.B) {
        for i := 0; i < b.N; i++ {
            lock := NewZKLock(zkConn, "/bench/lock")
            lock.Lock(context.Background())
            lock.Unlock()
        }
    })
    
    // etcd锁
    etcdClient, _ := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
    b.Run("etcd", func(b *testing.B) {
        for i := 0; i < b.N; i++ {
            lock, _ := NewEtcdLockV2(etcdClient, "/bench/lock")
            lock.Lock(context.Background())
            lock.Unlock(context.Background())
        }
    })
}

// 结果(单次加锁+解锁):
// Redis:     ~2-5 ms
// ZooKeeper: ~10-20 ms
// etcd:      ~5-10 ms

最佳实践

锁粒度设计

// 粗粒度锁(性能差,但简单)
func processAllOrders() {
    lock := NewRedisLock(client, "lock:orders", 30*time.Second)
    lock.Lock(ctx)
    defer lock.Unlock(ctx)
    
    // 处理所有订单
}

// 细粒度锁(性能好,但复杂)
func processOrder(orderID string) {
    lock := NewRedisLock(client, fmt.Sprintf("lock:order:%s", orderID), 30*time.Second)
    lock.Lock(ctx)
    defer lock.Unlock(ctx)
    
    // 只处理特定订单
}

超时与重试

func acquireLockWithRetry(ctx context.Context, lock *RedisLock, maxRetries int) error {
    for i := 0; i < maxRetries; i++ {
        locked, err := lock.Lock(ctx)
        if err != nil {
            return err
        }
        if locked {
            return nil
        }
        
        // 指数退避重试
        backoff := time.Duration(math.Pow(2, float64(i))) * 100 * time.Millisecond
        select {
        case <-time.After(backoff):
            continue
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    
    return fmt.Errorf("failed to acquire lock after %d retries", maxRetries)
}

总结

分布式锁方案选择:

方案一致性性能复杂度适用场景
Redis最终一致高并发、允许短暂不一致
ZooKeeper强一致对一致性要求高
etcd强一致中高Kubernetes生态、云原生

关键原则:

  • 锁必须设置超时,防止死锁
  • 使用唯一标识,防止误删
  • 考虑网络分区和节点故障
  • 根据业务场景选择合适的方案

延伸阅读

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页