缓存策略与设计模式:从本地缓存到分布式缓存的完整指南

系统讲解缓存设计的核心策略与最佳实践,涵盖Cache-Aside、Write-Through、Write-Behind等模式,深入分析缓存穿透、击穿、雪崩等问题的解决方案,提供Redis、本地缓存的实战代码。

引言

缓存是提升系统性能的关键技术,但不当的缓存设计可能导致数据不一致、缓存穿透等问题。本文将系统介绍缓存的核心策略和设计模式。

缓存策略分类

Cache-Aside(旁路缓存)

最常用的缓存策略,应用代码负责缓存的读写。

type CacheAsideRepository struct {
    cache *redis.Client
    db    *sql.DB
}

// 读取:先查缓存,未命中则查数据库并回填缓存
func (r *CacheAsideRepository) GetUser(ctx context.Context, userID string) (*User, error) {
    cacheKey := fmt.Sprintf("user:%s", userID)
    
    // 1. 查询缓存
    cachedData, err := r.cache.Get(ctx, cacheKey).Bytes()
    if err == nil {
        // 缓存命中
        var user User
        json.Unmarshal(cachedData, &user)
        return &user, nil
    }
    
    // 2. 缓存未命中,查询数据库
    user, err := r.getUserFromDB(ctx, userID)
    if err != nil {
        return nil, err
    }
    
    // 3. 回填缓存(设置过期时间)
    data, _ := json.Marshal(user)
    r.cache.Set(ctx, cacheKey, data, 10*time.Minute)
    
    return user, nil
}

// 写入:更新数据库,删除缓存
func (r *CacheAsideRepository) UpdateUser(ctx context.Context, user *User) error {
    // 1. 更新数据库
    if err := r.updateUserInDB(ctx, user); err != nil {
        return err
    }
    
    // 2. 删除缓存(而非更新缓存)
    cacheKey := fmt.Sprintf("user:%s", user.ID)
    r.cache.Del(ctx, cacheKey)
    
    return nil
}

优点

  • 实现简单,灵活性高
  • 缓存与数据库解耦

缺点

  • 读操作可能触发缓存回填,首次读取较慢
  • 写入后删除缓存可能导致短暂不一致

Write-Through(穿透式缓存)

缓存层负责同步写入数据库。

type WriteThroughCache struct {
    cache *redis.Client
    db    *sql.DB
}

func (c *WriteThroughCache) GetUser(ctx context.Context, userID string) (*User, error) {
    cacheKey := fmt.Sprintf("user:%s", userID)
    
    // 1. 查询缓存
    cachedData, err := c.cache.Get(ctx, cacheKey).Bytes()
    if err == nil {
        var user User
        json.Unmarshal(cachedData, &user)
        return &user, nil
    }
    
    // 2. 缓存未命中,查询数据库
    user, err := c.getUserFromDB(ctx, userID)
    if err != nil {
        return nil, err
    }
    
    // 3. 回填缓存(由缓存层负责)
    data, _ := json.Marshal(user)
    c.cache.Set(ctx, cacheKey, data, 0)  // 不设置过期时间
    
    return user, nil
}

func (c *WriteThroughCache) UpdateUser(ctx context.Context, user *User) error {
    cacheKey := fmt.Sprintf("user:%s", user.ID)
    data, _ := json.Marshal(user)
    
    // 1. 更新缓存
    c.cache.Set(ctx, cacheKey, data, 0)
    
    // 2. 同步更新数据库(由缓存层负责)
    return c.updateUserInDB(ctx, user)
}

优点

  • 数据一致性强
  • 应用代码简化

缺点

  • 写入延迟增加(需要同步写数据库)
  • 缓存层复杂度增加

Write-Behind(异步写回)

缓存层异步批量写入数据库。

type WriteBehindCache struct {
    cache       *redis.Client
    db          *sql.DB
    writeBuffer chan *WriteOperation
}

type WriteOperation struct {
    Key   string
    Value []byte
    Op    string  // "set" or "del"
}

func NewWriteBehindCache(cache *redis.Client, db *sql.DB) *WriteBehindCache {
    c := &WriteBehindCache{
        cache:       cache,
        db:          db,
        writeBuffer: make(chan *WriteOperation, 10000),
    }
    
    // 启动后台写入线程
    go c.backgroundWriter()
    
    return c
}

func (c *WriteBehindCache) UpdateUser(ctx context.Context, user *User) error {
    cacheKey := fmt.Sprintf("user:%s", user.ID)
    data, _ := json.Marshal(user)
    
    // 1. 立即更新缓存
    c.cache.Set(ctx, cacheKey, data, 0)
    
    // 2. 异步写入数据库
    c.writeBuffer <- &WriteOperation{
        Key:   cacheKey,
        Value: data,
        Op:    "set",
    }
    
    return nil
}

func (c *WriteBehindCache) backgroundWriter() {
    ticker := time.NewTicker(1 * time.Second)
    batch := make([]*WriteOperation, 0, 100)
    
    for {
        select {
        case op := <-c.writeBuffer:
            batch = append(batch, op)
            
            // 批量写入
            if len(batch) >= 100 {
                c.flushBatch(batch)
                batch = batch[:0]
            }
            
        case <-ticker.C:
            // 定时刷新
            if len(batch) > 0 {
                c.flushBatch(batch)
                batch = batch[:0]
            }
        }
    }
}

func (c *WriteBehindCache) flushBatch(batch []*WriteOperation) {
    // 批量写入数据库
    tx, _ := c.db.Begin()
    for _, op := range batch {
        switch op.Op {
        case "set":
            var user User
            json.Unmarshal(op.Value, &user)
            c.updateUserInDB(tx, &user)
        case "del":
            c.deleteUserInDB(tx, op.Key)
        }
    }
    tx.Commit()
}

优点

  • 写入性能极高(批量异步)
  • 减少数据库压力

缺点

  • 数据可能丢失(进程崩溃时)
  • 最终一致性,不适合强一致场景

缓存问题与解决方案

缓存穿透(Cache Penetration)

查询不存在的数据,导致请求直接打到数据库。

// 解决方案1:缓存空值
func (r *Repository) GetUserWithNullCache(ctx context.Context, userID string) (*User, error) {
    cacheKey := fmt.Sprintf("user:%s", userID)
    
    cachedData, err := r.cache.Get(ctx, cacheKey).Result()
    if err == nil {
        if cachedData == "NULL" {
            return nil, ErrUserNotFound
        }
        var user User
        json.Unmarshal([]byte(cachedData), &user)
        return &user, nil
    }
    
    user, err := r.getUserFromDB(ctx, userID)
    if err == ErrUserNotFound {
        // 缓存空值(较短过期时间)
        r.cache.Set(ctx, cacheKey, "NULL", 5*time.Minute)
        return nil, err
    }
    
    data, _ := json.Marshal(user)
    r.cache.Set(ctx, cacheKey, data, 10*time.Minute)
    return user, nil
}

// 解决方案2:布隆过滤器
type BloomFilter struct {
    bits    []bool
    hashFns []hash.Hash64
    size    uint
}

func NewBloomFilter(size uint, numHashFns int) *BloomFilter {
    bf := &BloomFilter{
        bits: make([]bool, size),
        size: size,
    }
    
    for i := 0; i < numHashFns; i++ {
        bf.hashFns = append(bf.hashFns, fnv.New64a())
    }
    
    return bf
}

func (bf *BloomFilter) Add(key string) {
    for _, h := range bf.hashFns {
        h.Reset()
        h.Write([]byte(key))
        idx := h.Sum64() % uint64(bf.size)
        bf.bits[idx] = true
    }
}

func (bf *BloomFilter) Contains(key string) bool {
    for _, h := range bf.hashFns {
        h.Reset()
        h.Write([]byte(key))
        idx := h.Sum64() % uint64(bf.size)
        if !bf.bits[idx] {
            return false
        }
    }
    return true
}

// 使用布隆过滤器
func (r *Repository) GetUserWithBloomFilter(ctx context.Context, userID string) (*User, error) {
    // 快速检查:布隆过滤器说一定不存在
    if !r.bloomFilter.Contains(userID) {
        return nil, ErrUserNotFound
    }
    
    // 可能存在,继续查询
    return r.GetUser(ctx, userID)
}

缓存击穿(Cache Breakdown)

热点key过期瞬间,大量并发请求同时查询数据库。

// 解决方案1:互斥锁(singleflight)
import "golang.org/x/sync/singleflight"

type CacheWithSingleflight struct {
    cache *redis.Client
    db    *sql.DB
    group singleflight.Group
}

func (c *CacheWithSingleflight) GetUser(ctx context.Context, userID string) (*User, error) {
    cacheKey := fmt.Sprintf("user:%s", userID)
    
    // 查询缓存
    cachedData, err := c.cache.Get(ctx, cacheKey).Bytes()
    if err == nil {
        var user User
        json.Unmarshal(cachedData, &user)
        return &user, nil
    }
    
    // 缓存未命中,使用singleflight防止并发穿透
    result, err, _ := c.group.Do(cacheKey, func() (interface{}, error) {
        // 再次检查缓存(双重检查)
        cachedData, err := c.cache.Get(ctx, cacheKey).Bytes()
        if err == nil {
            var user User
            json.Unmarshal(cachedData, &user)
            return &user, nil
        }
        
        // 查询数据库
        user, err := c.getUserFromDB(ctx, userID)
        if err != nil {
            return nil, err
        }
        
        // 回填缓存
        data, _ := json.Marshal(user)
        c.cache.Set(ctx, cacheKey, data, 10*time.Minute)
        
        return user, nil
    })
    
    if err != nil {
        return nil, err
    }
    
    return result.(*User), nil
}

// 解决方案2:永不过期 + 异步更新
func (c *Cache) GetUserWithAsyncRefresh(ctx context.Context, userID string) (*User, error) {
    cacheKey := fmt.Sprintf("user:%s", userID)
    
    cachedData, err := c.cache.Get(ctx, cacheKey).Bytes()
    if err == nil {
        var user User
        json.Unmarshal(cachedData, &user)
        
        // 检查是否需要异步刷新
        ttl, _ := c.cache.TTL(ctx, cacheKey).Result()
        if ttl < 2*time.Minute {
            // 异步刷新缓存
            go c.refreshUserCache(ctx, userID)
        }
        
        return &user, nil
    }
    
    // 缓存未命中,同步查询并填充
    user, err := c.getUserFromDB(ctx, userID)
    if err != nil {
        return nil, err
    }
    
    data, _ := json.Marshal(user)
    c.cache.Set(ctx, cacheKey, data, 0)  // 永不过期
    
    return user, nil
}

缓存雪崩(Cache Avalanche)

大量key同时过期,导致数据库压力激增。

// 解决方案:过期时间添加随机值
func (r *Repository) SetCacheWithRandomTTL(ctx context.Context, key string, value interface{}) error {
    data, _ := json.Marshal(value)
    
    // 基础过期时间 + 随机偏移
    baseTTL := 10 * time.Minute
    randomOffset := time.Duration(rand.Intn(300)) * time.Second  // 0-5分钟随机
    ttl := baseTTL + randomOffset
    
    return r.cache.Set(ctx, key, data, ttl).Err()
}

// 解决方案:多级缓存
type MultiLevelCache struct {
    l1 *sync.Map      // 本地缓存(永不过期,LRU淘汰)
    l2 *redis.Client  // Redis缓存(带过期时间)
}

func (c *MultiLevelCache) Get(ctx context.Context, key string) ([]byte, error) {
    // L1缓存
    if value, ok := c.l1.Load(key); ok {
        return value.([]byte), nil
    }
    
    // L2缓存
    value, err := c.l2.Get(ctx, key).Bytes()
    if err == nil {
        // 回填L1
        c.l1.Store(key, value)
        return value, nil
    }
    
    return nil, ErrCacheMiss
}

缓存设计模式

批量查询优化

// 批量查询,减少网络往返
func (r *Repository) GetUsers(ctx context.Context, userIDs []string) (map[string]*User, error) {
    result := make(map[string]*User)
    missedIDs := make([]string, 0)
    
    // 1. 批量查询缓存
    cacheKeys := make([]string, len(userIDs))
    for i, id := range userIDs {
        cacheKeys[i] = fmt.Sprintf("user:%s", id)
    }
    
    cachedValues, err := r.cache.MGet(ctx, cacheKeys...).Result()
    if err != nil {
        return nil, err
    }
    
    // 2. 解析缓存结果
    for i, value := range cachedValues {
        if value == nil {
            missedIDs = append(missedIDs, userIDs[i])
            continue
        }
        
        var user User
        json.Unmarshal([]byte(value.(string)), &user)
        result[userIDs[i]] = &user
    }
    
    // 3. 查询未命中的数据
    if len(missedIDs) > 0 {
        missedUsers, err := r.getUsersFromDB(ctx, missedIDs)
        if err != nil {
            return nil, err
        }
        
        // 4. 批量回填缓存
        pipe := r.cache.Pipeline()
        for _, user := range missedUsers {
            cacheKey := fmt.Sprintf("user:%s", user.ID)
            data, _ := json.Marshal(user)
            pipe.Set(ctx, cacheKey, data, 10*time.Minute)
            result[user.ID] = user
        }
        pipe.Exec(ctx)
    }
    
    return result, nil
}

热点数据预加载

type HotDataCache struct {
    cache *redis.Client
    db    *sql.DB
}

func (c *HotDataCache) StartPreload(ctx context.Context) {
    ticker := time.NewTicker(5 * time.Minute)
    
    go func() {
        for range ticker.C {
            c.preloadHotData(ctx)
        }
    }()
}

func (c *HotDataCache) preloadHotData(ctx context.Context) {
    // 查询热门商品
    hotProducts, _ := c.db.QueryContext(ctx, `
        SELECT id, name, price, stock 
        FROM products 
        WHERE is_hot = true 
        ORDER BY sales DESC 
        LIMIT 100
    `)
    
    // 批量写入缓存
    pipe := c.cache.Pipeline()
    for hotProducts.Next() {
        var product Product
        hotProducts.Scan(&product.ID, &product.Name, &product.Price, &product.Stock)
        
        cacheKey := fmt.Sprintf("product:%s", product.ID)
        data, _ := json.Marshal(product)
        pipe.Set(ctx, cacheKey, data, 30*time.Minute)
    }
    pipe.Exec(ctx)
}

缓存监控

type CacheMetrics struct {
    hitCount  int64
    missCount int64
}

func (m *CacheMetrics) RecordHit() {
    atomic.AddInt64(&m.hitCount, 1)
}

func (m *CacheMetrics) RecordMiss() {
    atomic.AddInt64(&m.missCount, 1)
}

func (m *CacheMetrics) HitRate() float64 {
    hits := atomic.LoadInt64(&m.hitCount)
    misses := atomic.LoadInt64(&m.missCount)
    total := hits + misses
    
    if total == 0 {
        return 0
    }
    
    return float64(hits) / float64(total)
}

// Prometheus指标
var (
    cacheHits = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "cache_hits_total",
            Help: "Total number of cache hits",
        },
        []string{"cache_name"},
    )
    
    cacheMisses = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "cache_misses_total",
            Help: "Total number of cache misses",
        },
        []string{"cache_name"},
    )
)

总结

缓存策略选择指南:

策略一致性性能复杂度适用场景
Cache-Aside最终一致大多数场景
Write-Through强一致对一致性要求高
Write-Behind最终一致极高写密集、允许短暂不一致

关键原则:

  • 防止缓存穿透(空值缓存、布隆过滤器)
  • 防止缓存击穿(互斥锁、永不过期)
  • 防止缓存雪崩(随机过期时间、多级缓存)
  • 监控缓存命中率,持续优化

延伸阅读

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页