引言
在高并发场景下,系统可能面临流量突增、下游服务故障等问题。限流和熔断是保护系统的两道防线:限流防止系统被流量压垮,熔断防止故障级联扩散。
本文将深入分析这两大核心机制的原理与实现,提供可直接应用于生产环境的代码示例。
限流算法详解
固定窗口计数器
最简单的限流算法,在固定时间窗口内限制请求数量。
// 固定窗口限流实现
type FixedWindowLimiter struct {
limit int
window time.Duration
counter int
windowKey time.Time
mutex sync.Mutex
}
func (f *FixedWindowLimiter) Allow() bool {
f.mutex.Lock()
defer f.mutex.Unlock()
now := time.Now()
windowStart := now.Truncate(f.window)
// 窗口切换,重置计数器
if windowStart.After(f.windowKey) {
f.counter = 0
f.windowKey = windowStart
}
if f.counter >= f.limit {
return false
}
f.counter++
return true
}
问题:存在窗口边界突发流量问题。例如,限制每分钟100次请求,可能在59秒和00秒各发送100次请求,实际1秒内达到200次。
滑动窗口计数器
通过细分窗口解决边界问题,提供更平滑的限流效果。
// 滑动窗口限流实现
type SlidingWindowLimiter struct {
limit int
window time.Duration
subWindows int
counters []int
windowStart time.Time
mutex sync.Mutex
}
func (s *SlidingWindowLimiter) Allow() bool {
s.mutex.Lock()
defer s.mutex.Unlock()
now := time.Now()
s.cleanupOldWindows(now)
total := 0
for _, count := range s.counters {
total += count
}
if total >= s.limit {
return false
}
s.incrementCurrentWindow(now)
return true
}
func (s *SlidingWindowLimiter) cleanupOldWindows(now time.Time) {
subWindowDuration := s.window / time.Duration(s.subWindows)
cutoff := now.Add(-s.window)
// 移除过期窗口的计数
for i, start := range s.windowStarts {
if start.Before(cutoff) {
s.counters[i] = 0
}
}
}
令牌桶算法
以固定速率生成令牌,请求需要获取令牌才能执行,允许一定程度的突发流量。
// 令牌桶限流实现
type TokenBucketLimiter struct {
capacity float64
tokens float64
refillRate float64 // 每秒生成的令牌数
lastRefill time.Time
mutex sync.Mutex
}
func (t *TokenBucketLimiter) Allow() bool {
t.mutex.Lock()
defer t.mutex.Unlock()
t.refill()
if t.tokens < 1 {
return false
}
t.tokens--
return true
}
func (t *TokenBucketLimiter) refill() {
now := time.Now()
elapsed := now.Sub(t.lastRefill).Seconds()
t.tokens = math.Min(t.capacity, t.tokens+elapsed*t.refillRate)
t.lastRefill = now
}
// 支持突发流量的令牌桶
func NewTokenBucketWithBurst(capacity int, refillRate float64) *TokenBucketLimiter {
return &TokenBucketLimiter{
capacity: float64(capacity),
tokens: float64(capacity), // 初始满桶,允许突发
refillRate: refillRate,
lastRefill: time.Now(),
}
}
漏桶算法
以恒定速率处理请求,平滑突发流量,适合需要稳定输出速率的场景。
// 漏桶限流实现
type LeakyBucketLimiter struct {
capacity int
water int
leakRate float64 // 每秒漏出的请求数
lastLeak time.Time
mutex sync.Mutex
}
func (l *LeakyBucketLimiter) Allow() bool {
l.mutex.Lock()
defer l.mutex.Unlock()
l.leak()
if l.water >= l.capacity {
return false
}
l.water++
return true
}
func (l *LeakyBucketLimiter) leak() {
now := time.Now()
elapsed := now.Sub(l.lastLeak).Seconds()
leaked := int(elapsed * l.leakRate)
l.water = max(0, l.water-leaked)
l.lastLeak = now
}
分布式限流(Redis实现)
使用Redis实现跨服务实例的分布式限流。
-- Redis Lua脚本:令牌桶限流
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local last_time = redis.call('hget', key, 'last_time')
local tokens = redis.call('hget', key, 'tokens')
if not last_time then
last_time = now
tokens = capacity
else
last_time = tonumber(last_time)
tokens = tonumber(tokens)
-- 计算新增令牌
local delta = math.max(0, now - last_time)
tokens = math.min(capacity, tokens + delta * rate)
end
local allowed = 0
if tokens >= requested then
tokens = tokens - requested
allowed = 1
end
redis.call('hset', key, 'tokens', tokens)
redis.call('hset', key, 'last_time', now)
redis.call('expire', key, 60)
return allowed
// Go调用Redis限流
func (r *RedisRateLimiter) Allow(ctx context.Context, key string, limit int, window time.Duration) (bool, error) {
now := time.Now().Unix()
rate := float64(limit) / window.Seconds()
result, err := r.client.Eval(ctx, tokenBucketScript, []string{key},
limit, rate, now, 1).Int()
if err != nil {
return false, err
}
return result == 1, nil
}
熔断器模式
熔断器状态机
熔断器包含三个状态:关闭(Closed)、打开(Open)、半开(Half-Open)。
// 熔断器状态机实现
type CircuitBreaker struct {
failureThreshold int
successThreshold int
timeout time.Duration
state State
failures int
successes int
lastFailureTime time.Time
mutex sync.RWMutex
}
type State int
const (
StateClosed State = iota
StateOpen
StateHalfOpen
)
func (c *CircuitBreaker) Allow() bool {
c.mutex.RLock()
defer c.mutex.RUnlock()
switch c.state {
case StateClosed:
return true
case StateOpen:
// 检查是否到达超时时间,转为半开状态
if time.Since(c.lastFailureTime) > c.timeout {
c.mutex.RUnlock()
c.mutex.Lock()
c.state = StateHalfOpen
c.mutex.Unlock()
c.mutex.RLock()
return true
}
return false
case StateHalfOpen:
return true
}
return false
}
func (c *CircuitBreaker) RecordSuccess() {
c.mutex.Lock()
defer c.mutex.Unlock()
switch c.state {
case StateClosed:
c.failures = 0
case StateHalfOpen:
c.successes++
if c.successes >= c.successThreshold {
c.state = StateClosed
c.failures = 0
c.successes = 0
}
}
}
func (c *CircuitBreaker) RecordFailure() {
c.mutex.Lock()
defer c.mutex.Unlock()
c.lastFailureTime = time.Now()
switch c.state {
case StateClosed:
c.failures++
if c.failures >= c.failureThreshold {
c.state = StateOpen
}
case StateHalfOpen:
c.state = StateOpen
c.successes = 0
}
}
熔断器使用示例
// 带熔断器的HTTP调用
type ServiceClient struct {
breaker *CircuitBreaker
client *http.Client
}
func (s *ServiceClient) Call(ctx context.Context, req *http.Request) (*http.Response, error) {
if !s.breaker.Allow() {
return nil, ErrCircuitOpen
}
resp, err := s.client.Do(req.WithContext(ctx))
if err != nil {
s.breaker.RecordFailure()
return nil, err
}
if resp.StatusCode >= 500 {
s.breaker.RecordFailure()
} else {
s.breaker.RecordSuccess()
}
return resp, nil
}
高级熔断策略
支持多维度熔断(按接口、按错误类型)。
// 多维度熔断器
type MultiDimensionBreaker struct {
breakers map[string]*CircuitBreaker
// 熔断条件配置
failureThreshold int
errorTypes []error // 特定错误类型触发熔断
statusCodes []int // 特定状态码触发熔断
}
func (m *MultiDimensionBreaker) RecordResult(key string, err error, statusCode int) {
breaker := m.getOrCreateBreaker(key)
// 检查是否需要熔断
shouldFail := false
if err != nil {
for _, errType := range m.errorTypes {
if errors.Is(err, errType) {
shouldFail = true
break
}
}
}
for _, code := range m.statusCodes {
if statusCode == code {
shouldFail = true
break
}
}
if shouldFail {
breaker.RecordFailure()
} else {
breaker.RecordSuccess()
}
}
限流与熔断的组合使用
多层防护架构
// 多层防护中间件
func ProtectionMiddleware(next http.Handler) http.Handler {
rateLimiter := NewTokenBucketLimiter(1000, 100)
circuitBreaker := NewCircuitBreaker(5, 3, 30*time.Second)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// 第一层:限流
if !rateLimiter.Allow() {
w.WriteHeader(http.StatusTooManyRequests)
w.Write([]byte("Rate limit exceeded"))
return
}
// 第二层:熔断检查
if !circuitBreaker.Allow() {
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("Service temporarily unavailable"))
return
}
// 执行实际请求
recorder := &statusRecorder{ResponseWriter: w, statusCode: 200}
next.ServeHTTP(recorder, r)
// 记录结果
circuitBreaker.RecordResult(recorder.statusCode)
})
}
生产环境最佳实践
限流配置建议
- 分级限流:全局限流 + 用户级限流 + 接口级限流
- 动态调整:根据系统负载动态调整限流阈值
- 优雅降级:限流时返回友好提示,而非直接拒绝
熔断配置建议
- 合理设置阈值:失败率阈值建议50%-70%,避免过于敏感
- 超时时间:熔断超时建议10-30秒,平衡恢复速度与稳定性
- 监控告警:熔断触发时发送告警,及时排查问题
总结
限流和熔断是构建高可用系统的两大核心机制:
- 限流:保护系统不被流量压垮,常用算法包括固定窗口、滑动窗口、令牌桶、漏桶
- 熔断:防止故障级联扩散,通过状态机实现自动恢复
在生产环境中,建议组合使用限流和熔断,构建多层防护体系,同时配合监控告警,及时发现和处理问题。
延伸阅读
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。