限流与熔断:构建高可用分布式系统的防护机制

系统讲解限流与熔断两大核心防护机制,涵盖令牌桶、滑动窗口、熔断器状态机等算法原理,提供Redis、Go、Java等多语言实战代码与生产环境最佳实践。

引言

在高并发场景下,系统可能面临流量突增、下游服务故障等问题。限流和熔断是保护系统的两道防线:限流防止系统被流量压垮,熔断防止故障级联扩散。

本文将深入分析这两大核心机制的原理与实现,提供可直接应用于生产环境的代码示例。

限流算法详解

固定窗口计数器

最简单的限流算法,在固定时间窗口内限制请求数量。

// 固定窗口限流实现
type FixedWindowLimiter struct {
    limit     int
    window    time.Duration
    counter   int
    windowKey time.Time
    mutex     sync.Mutex
}

func (f *FixedWindowLimiter) Allow() bool {
    f.mutex.Lock()
    defer f.mutex.Unlock()
    
    now := time.Now()
    windowStart := now.Truncate(f.window)
    
    // 窗口切换,重置计数器
    if windowStart.After(f.windowKey) {
        f.counter = 0
        f.windowKey = windowStart
    }
    
    if f.counter >= f.limit {
        return false
    }
    
    f.counter++
    return true
}

问题:存在窗口边界突发流量问题。例如,限制每分钟100次请求,可能在59秒和00秒各发送100次请求,实际1秒内达到200次。

滑动窗口计数器

通过细分窗口解决边界问题,提供更平滑的限流效果。

// 滑动窗口限流实现
type SlidingWindowLimiter struct {
    limit       int
    window      time.Duration
    subWindows  int
    counters    []int
    windowStart time.Time
    mutex       sync.Mutex
}

func (s *SlidingWindowLimiter) Allow() bool {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    
    now := time.Now()
    s.cleanupOldWindows(now)
    
    total := 0
    for _, count := range s.counters {
        total += count
    }
    
    if total >= s.limit {
        return false
    }
    
    s.incrementCurrentWindow(now)
    return true
}

func (s *SlidingWindowLimiter) cleanupOldWindows(now time.Time) {
    subWindowDuration := s.window / time.Duration(s.subWindows)
    cutoff := now.Add(-s.window)
    
    // 移除过期窗口的计数
    for i, start := range s.windowStarts {
        if start.Before(cutoff) {
            s.counters[i] = 0
        }
    }
}

令牌桶算法

以固定速率生成令牌,请求需要获取令牌才能执行,允许一定程度的突发流量。

// 令牌桶限流实现
type TokenBucketLimiter struct {
    capacity     float64
    tokens       float64
    refillRate   float64 // 每秒生成的令牌数
    lastRefill   time.Time
    mutex        sync.Mutex
}

func (t *TokenBucketLimiter) Allow() bool {
    t.mutex.Lock()
    defer t.mutex.Unlock()
    
    t.refill()
    
    if t.tokens < 1 {
        return false
    }
    
    t.tokens--
    return true
}

func (t *TokenBucketLimiter) refill() {
    now := time.Now()
    elapsed := now.Sub(t.lastRefill).Seconds()
    t.tokens = math.Min(t.capacity, t.tokens+elapsed*t.refillRate)
    t.lastRefill = now
}

// 支持突发流量的令牌桶
func NewTokenBucketWithBurst(capacity int, refillRate float64) *TokenBucketLimiter {
    return &TokenBucketLimiter{
        capacity:   float64(capacity),
        tokens:     float64(capacity), // 初始满桶,允许突发
        refillRate: refillRate,
        lastRefill: time.Now(),
    }
}

漏桶算法

以恒定速率处理请求,平滑突发流量,适合需要稳定输出速率的场景。

// 漏桶限流实现
type LeakyBucketLimiter struct {
    capacity    int
    water       int
    leakRate    float64 // 每秒漏出的请求数
    lastLeak    time.Time
    mutex       sync.Mutex
}

func (l *LeakyBucketLimiter) Allow() bool {
    l.mutex.Lock()
    defer l.mutex.Unlock()
    
    l.leak()
    
    if l.water >= l.capacity {
        return false
    }
    
    l.water++
    return true
}

func (l *LeakyBucketLimiter) leak() {
    now := time.Now()
    elapsed := now.Sub(l.lastLeak).Seconds()
    leaked := int(elapsed * l.leakRate)
    
    l.water = max(0, l.water-leaked)
    l.lastLeak = now
}

分布式限流(Redis实现)

使用Redis实现跨服务实例的分布式限流。

-- Redis Lua脚本:令牌桶限流
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local last_time = redis.call('hget', key, 'last_time')
local tokens = redis.call('hget', key, 'tokens')

if not last_time then
    last_time = now
    tokens = capacity
else
    last_time = tonumber(last_time)
    tokens = tonumber(tokens)
    
    -- 计算新增令牌
    local delta = math.max(0, now - last_time)
    tokens = math.min(capacity, tokens + delta * rate)
end

local allowed = 0
if tokens >= requested then
    tokens = tokens - requested
    allowed = 1
end

redis.call('hset', key, 'tokens', tokens)
redis.call('hset', key, 'last_time', now)
redis.call('expire', key, 60)

return allowed
// Go调用Redis限流
func (r *RedisRateLimiter) Allow(ctx context.Context, key string, limit int, window time.Duration) (bool, error) {
    now := time.Now().Unix()
    rate := float64(limit) / window.Seconds()
    
    result, err := r.client.Eval(ctx, tokenBucketScript, []string{key}, 
        limit, rate, now, 1).Int()
    
    if err != nil {
        return false, err
    }
    
    return result == 1, nil
}

熔断器模式

熔断器状态机

熔断器包含三个状态:关闭(Closed)、打开(Open)、半开(Half-Open)。

// 熔断器状态机实现
type CircuitBreaker struct {
    failureThreshold int
    successThreshold int
    timeout          time.Duration
    
    state           State
    failures        int
    successes       int
    lastFailureTime time.Time
    mutex           sync.RWMutex
}

type State int

const (
    StateClosed State = iota
    StateOpen
    StateHalfOpen
)

func (c *CircuitBreaker) Allow() bool {
    c.mutex.RLock()
    defer c.mutex.RUnlock()
    
    switch c.state {
    case StateClosed:
        return true
    case StateOpen:
        // 检查是否到达超时时间,转为半开状态
        if time.Since(c.lastFailureTime) > c.timeout {
            c.mutex.RUnlock()
            c.mutex.Lock()
            c.state = StateHalfOpen
            c.mutex.Unlock()
            c.mutex.RLock()
            return true
        }
        return false
    case StateHalfOpen:
        return true
    }
    return false
}

func (c *CircuitBreaker) RecordSuccess() {
    c.mutex.Lock()
    defer c.mutex.Unlock()
    
    switch c.state {
    case StateClosed:
        c.failures = 0
    case StateHalfOpen:
        c.successes++
        if c.successes >= c.successThreshold {
            c.state = StateClosed
            c.failures = 0
            c.successes = 0
        }
    }
}

func (c *CircuitBreaker) RecordFailure() {
    c.mutex.Lock()
    defer c.mutex.Unlock()
    
    c.lastFailureTime = time.Now()
    
    switch c.state {
    case StateClosed:
        c.failures++
        if c.failures >= c.failureThreshold {
            c.state = StateOpen
        }
    case StateHalfOpen:
        c.state = StateOpen
        c.successes = 0
    }
}

熔断器使用示例

// 带熔断器的HTTP调用
type ServiceClient struct {
    breaker *CircuitBreaker
    client  *http.Client
}

func (s *ServiceClient) Call(ctx context.Context, req *http.Request) (*http.Response, error) {
    if !s.breaker.Allow() {
        return nil, ErrCircuitOpen
    }
    
    resp, err := s.client.Do(req.WithContext(ctx))
    
    if err != nil {
        s.breaker.RecordFailure()
        return nil, err
    }
    
    if resp.StatusCode >= 500 {
        s.breaker.RecordFailure()
    } else {
        s.breaker.RecordSuccess()
    }
    
    return resp, nil
}

高级熔断策略

支持多维度熔断(按接口、按错误类型)。

// 多维度熔断器
type MultiDimensionBreaker struct {
    breakers map[string]*CircuitBreaker
    
    // 熔断条件配置
    failureThreshold int
    errorTypes       []error // 特定错误类型触发熔断
    statusCodes      []int   // 特定状态码触发熔断
}

func (m *MultiDimensionBreaker) RecordResult(key string, err error, statusCode int) {
    breaker := m.getOrCreateBreaker(key)
    
    // 检查是否需要熔断
    shouldFail := false
    
    if err != nil {
        for _, errType := range m.errorTypes {
            if errors.Is(err, errType) {
                shouldFail = true
                break
            }
        }
    }
    
    for _, code := range m.statusCodes {
        if statusCode == code {
            shouldFail = true
            break
        }
    }
    
    if shouldFail {
        breaker.RecordFailure()
    } else {
        breaker.RecordSuccess()
    }
}

限流与熔断的组合使用

多层防护架构

// 多层防护中间件
func ProtectionMiddleware(next http.Handler) http.Handler {
    rateLimiter := NewTokenBucketLimiter(1000, 100)
    circuitBreaker := NewCircuitBreaker(5, 3, 30*time.Second)
    
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // 第一层:限流
        if !rateLimiter.Allow() {
            w.WriteHeader(http.StatusTooManyRequests)
            w.Write([]byte("Rate limit exceeded"))
            return
        }
        
        // 第二层:熔断检查
        if !circuitBreaker.Allow() {
            w.WriteHeader(http.StatusServiceUnavailable)
            w.Write([]byte("Service temporarily unavailable"))
            return
        }
        
        // 执行实际请求
        recorder := &statusRecorder{ResponseWriter: w, statusCode: 200}
        next.ServeHTTP(recorder, r)
        
        // 记录结果
        circuitBreaker.RecordResult(recorder.statusCode)
    })
}

生产环境最佳实践

限流配置建议

  1. 分级限流:全局限流 + 用户级限流 + 接口级限流
  2. 动态调整:根据系统负载动态调整限流阈值
  3. 优雅降级:限流时返回友好提示,而非直接拒绝

熔断配置建议

  1. 合理设置阈值:失败率阈值建议50%-70%,避免过于敏感
  2. 超时时间:熔断超时建议10-30秒,平衡恢复速度与稳定性
  3. 监控告警:熔断触发时发送告警,及时排查问题

总结

限流和熔断是构建高可用系统的两大核心机制:

  • 限流:保护系统不被流量压垮,常用算法包括固定窗口、滑动窗口、令牌桶、漏桶
  • 熔断:防止故障级联扩散,通过状态机实现自动恢复

在生产环境中,建议组合使用限流和熔断,构建多层防护体系,同时配合监控告警,及时发现和处理问题。

延伸阅读

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页