API网关限流与熔断实战:保护后端服务的最后一道防线

深入讲解API网关层面的限流与熔断机制,包括多维度限流策略(IP、用户、接口)、分布式限流实现、熔断器状态机、服务降级方案,提供Kong、Nginx、Go的实战配置与代码示例。

引言

API网关是后端服务的入口,也是保护系统的第一道防线。在高并发场景下,合理的限流和熔断策略能够防止系统过载,保证核心服务的可用性。本文将深入讲解网关层面的限流与熔断实战方案。

限流策略设计

多维度限流

限流维度:
┌─────────────────────────────────────────┐
│ 1. IP限流:防止恶意爬虫、DDoS攻击        │
│    └─ 单IP:100次/分钟                   │
│                                         │
│ 2. 用户限流:防止单个用户滥用            │
│    └─ 普通用户:1000次/小时              │
│    └─ VIP用户:5000次/小时               │
│                                         │
│ 3. 接口限流:保护特定服务                │
│    └─ 登录接口:10次/分钟                │
│    └─ 查询接口:100次/秒                 │
│    └─ 写入接口:50次/秒                  │
│                                         │
│ 4. 全局限流:系统整体保护                │
│    └─ 总QPS:10000                       │
└─────────────────────────────────────────┘

限流算法对比

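| 算法 | 原理 | 优点 | 缺点 | 适用场景 |
| --- | --- | --- | --- | --- |
| 固定窗口 | 固定时间窗口计数 | 实现简单 | 边界突发 | 精度要求不高 |
| 滑动窗口 | 滑动时间窗口 | 平滑 | 实现复杂 | 需要精确控制 |
| 令牌桶 | 固定速率生成令牌 | 允许突发 | 内存占用 | 允许一定突发 |
| 漏桶 | 固定速率处理 | 严格限速 | 无法突发 | 严格限速场景 |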

Redis分布式限流

令牌桶算法实现

package ratelimit

import (
    "context"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

// TokenBucketLimiter is a token-bucket rate limiter whose per-key bucket state
// (remaining tokens and last refill time) is stored in a Redis hash, so several
// gateway instances can share the same buckets.
type TokenBucketLimiter struct {
    rdb       *redis.Client
    rate      int // tokens added per second
    capacity  int // maximum number of tokens a bucket can hold
    keyPrefix string
}

// NewTokenBucketLimiter returns a limiter that refills rate tokens per second up
// to capacity and stores each bucket under "<keyPrefix>:<key>".
func NewTokenBucketLimiter(rdb *redis.Client, rate, capacity int, keyPrefix string) *TokenBucketLimiter {
    return &TokenBucketLimiter{
        rdb:       rdb,
        rate:      rate,
        capacity:  capacity,
        keyPrefix: keyPrefix,
    }
}

// minBucketTTLSeconds is the shortest lifetime given to an idle bucket key.
const minBucketTTLSeconds = 60

// tokenBucketScript refills and consumes tokens for one bucket in a single
// atomic Redis script execution. It is built once instead of on every call.
//
//   KEYS[1] bucket key
//   ARGV[1] capacity
//   ARGV[2] refill rate (tokens per second)
//   ARGV[3] requested tokens
//   ARGV[4] caller's current time (Unix seconds)
//   ARGV[5] key TTL (seconds)
//
// It returns 1 when the request is allowed and 0 when it is rejected.
var tokenBucketScript = redis.NewScript(`
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local rate = tonumber(ARGV[2])
        local requested = tonumber(ARGV[3])
        local now = tonumber(ARGV[4])
        local ttl = tonumber(ARGV[5])

        local bucket = redis.call('HMGET', key, 'tokens', 'last_time')
        local tokens = tonumber(bucket[1])
        local last_time = tonumber(bucket[2])

        -- First request for this key (or incomplete state): start with a full bucket.
        if tokens == nil or last_time == nil then
            tokens = capacity
            last_time = now
        end

        -- Refill. The timestamp comes from the calling process, so it can be
        -- behind the stored one when instances' clocks differ; clamp at zero so
        -- a backwards clock never removes tokens.
        local elapsed = math.max(0, now - last_time)
        tokens = math.min(capacity, tokens + elapsed * rate)

        local allowed = 0
        if tokens >= requested then
            tokens = tokens - requested
            allowed = 1
        end

        -- Never move last_time backwards, otherwise already-credited time would
        -- be credited again on the next call.
        redis.call('HSET', key, 'tokens', tokens, 'last_time', math.max(now, last_time))
        redis.call('EXPIRE', key, ttl)
        return allowed
`)

// bucketTTLSeconds returns how long an idle bucket key must be kept. The key may
// only expire once an untouched bucket would have refilled completely; expiring
// earlier would hand out a full bucket too soon, because a missing key is
// treated as full.
func (l *TokenBucketLimiter) bucketTTLSeconds() int {
    ttl := minBucketTTLSeconds
    if l.rate > 0 {
        // ceil(capacity / rate) seconds to refill from empty, plus one second of slack.
        if refill := (l.capacity+l.rate-1)/l.rate + 1; refill > ttl {
            ttl = refill
        }
    }
    return ttl
}

// Allow tries to take tokens tokens from the bucket identified by key.
//
// It returns true when the bucket held enough tokens (which are then consumed)
// and false when it did not. A non-nil error means the Redis script could not be
// executed; the boolean is false in that case.
func (l *TokenBucketLimiter) Allow(ctx context.Context, key string, tokens int) (bool, error) {
    bucketKey := fmt.Sprintf("%s:%s", l.keyPrefix, key)

    now := time.Now().Unix()
    result, err := tokenBucketScript.Run(ctx, l.rdb, []string{bucketKey},
        l.capacity, l.rate, tokens, now, l.bucketTTLSeconds()).Int()
    if err != nil {
        return false, fmt.Errorf("token bucket %q: %w", bucketKey, err)
    }

    return result == 1, nil
}

// MultiDimensionLimiter is a usage example that chains four token-bucket
// limiters: per client IP, per user, per API path, and one global bucket.
type MultiDimensionLimiter struct {
    ipLimiter     *TokenBucketLimiter
    userLimiter   *TokenBucketLimiter
    apiLimiter    *TokenBucketLimiter
    globalLimiter *TokenBucketLimiter
}

// NewMultiDimensionLimiter builds the four limiters on top of rdb with fixed
// example rates.
func NewMultiDimensionLimiter(rdb *redis.Client) *MultiDimensionLimiter {
    return &MultiDimensionLimiter{
        ipLimiter:     NewTokenBucketLimiter(rdb, 100, 200, "rl:ip"),         // 100 tokens/s, capacity 200
        userLimiter:   NewTokenBucketLimiter(rdb, 50, 100, "rl:user"),        // 50 tokens/s, capacity 100
        apiLimiter:    NewTokenBucketLimiter(rdb, 200, 500, "rl:api"),        // 200 tokens/s, capacity 500
        globalLimiter: NewTokenBucketLimiter(rdb, 10000, 20000, "rl:global"), // 10000 tokens/s, capacity 20000
    }
}

// Check runs the IP, user, API and global limiters in that order and stops at
// the first one that does not allow the request. The user limiter is skipped
// when userID is empty.
//
// It returns (true, "") when every limiter allowed the request. Otherwise it
// returns false and a reason:
//   - "<dimension> rate limit exceeded" when the bucket was empty;
//   - "<dimension> rate limiter unavailable: <error>" when the limiter itself
//     failed. The request is still rejected (fail closed), but the reason no
//     longer claims the client exceeded a limit. This reason embeds the
//     underlying error and is meant for logs rather than client responses.
//
// Tokens already taken from earlier dimensions are not returned when a later
// dimension rejects the request.
func (l *MultiDimensionLimiter) Check(ctx context.Context, ip, userID, apiPath string) (bool, string) {
    dimensions := []struct {
        name    string
        limiter *TokenBucketLimiter
        key     string
        skip    bool
    }{
        {"IP", l.ipLimiter, ip, false},
        {"User", l.userLimiter, userID, userID == ""},
        {"API", l.apiLimiter, apiPath, false},
        {"Global", l.globalLimiter, "global", false},
    }

    for _, d := range dimensions {
        if d.skip {
            continue
        }
        allowed, err := d.limiter.Allow(ctx, d.key, 1)
        if err != nil {
            return false, fmt.Sprintf("%s rate limiter unavailable: %v", d.name, err)
        }
        if !allowed {
            return false, d.name + " rate limit exceeded"
        }
    }

    return true, ""
}

滑动窗口实现

// SlidingWindowLimiter is a sliding-window rate limiter that records each
// accepted request as a member of a Redis sorted set scored by its timestamp.
type SlidingWindowLimiter struct {
    rdb       *redis.Client
    window    time.Duration // length of the sliding window
    limit     int           // maximum accepted requests inside one window
    keyPrefix string
}

// NewSlidingWindowLimiter returns a limiter that accepts at most limit requests
// per window and stores each key's history under "<keyPrefix>:<key>".
func NewSlidingWindowLimiter(rdb *redis.Client, window time.Duration, limit int, keyPrefix string) *SlidingWindowLimiter {
    return &SlidingWindowLimiter{
        rdb:       rdb,
        window:    window,
        limit:     limit,
        keyPrefix: keyPrefix,
    }
}

// minWindowTTLMillis is the shortest lifetime given to an idle window key.
const minWindowTTLMillis = 60000

// slidingWindowScript trims, counts and (if allowed) records a request in a
// single atomic Redis script execution. It is built once instead of on every call.
//
//   KEYS[1] sorted-set key
//   ARGV[1] window start (Unix milliseconds); older entries are removed
//   ARGV[2] caller's current time (Unix milliseconds), used as the score
//   ARGV[3] request limit per window
//   ARGV[4] key TTL (milliseconds)
//   ARGV[5] caller-supplied nonce used to build a unique member
//
// It returns 1 when the request is allowed and 0 when it is rejected.
var slidingWindowScript = redis.NewScript(`
        local key = KEYS[1]
        local window_start = tonumber(ARGV[1])
        local now = tonumber(ARGV[2])
        local limit = tonumber(ARGV[3])
        local ttl_ms = tonumber(ARGV[4])
        local nonce = ARGV[5]

        -- Drop requests that have left the window.
        redis.call('ZREMRANGEBYSCORE', key, 0, window_start)

        -- Count requests still inside the window.
        local count = redis.call('ZCARD', key)
        if count >= limit then
            return 0
        end

        -- Record this request. The member must be unique: ZADD with an existing
        -- member only updates its score, which would under-count requests that
        -- arrive in the same millisecond.
        redis.call('ZADD', key, now, nonce .. '-' .. count)
        redis.call('PEXPIRE', key, ttl_ms)
        return 1
`)

// Allow reports whether one more request for key fits into the current window
// and, if so, records it.
//
// A non-nil error means the Redis script could not be executed; the boolean is
// false in that case.
func (l *SlidingWindowLimiter) Allow(ctx context.Context, key string) (bool, error) {
    windowKey := fmt.Sprintf("%s:%s", l.keyPrefix, key)

    current := time.Now()
    now := current.UnixMilli()
    windowStart := now - l.window.Milliseconds()

    // The key has to outlive the window, otherwise the recorded history is lost
    // while it still counts and the limit can be exceeded.
    ttl := l.window.Milliseconds()
    if ttl < minWindowTTLMillis {
        ttl = minWindowTTLMillis
    }

    // The nanosecond timestamp is passed as an opaque string; the script never
    // converts it to a number, so no precision is lost.
    result, err := slidingWindowScript.Run(ctx, l.rdb, []string{windowKey},
        windowStart, now, l.limit, ttl, current.UnixNano()).Int()
    if err != nil {
        return false, fmt.Errorf("sliding window %q: %w", windowKey, err)
    }

    return result == 1, nil
}

Kong网关限流配置

Rate Limiting插件

# kong-rate-limit.yaml
# KongPlugin resource configuring Kong's rate-limiting plugin: 100 requests per
# minute and 1000 per hour, limited by client IP, using the redis policy with
# the Redis instance at redis-master:6379.
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: rate-limit-plugin
config:
  minute: 100
  hour: 1000
  limit_by: ip
  policy: redis
  redis_host: redis-master
  redis_port: 6379
  # NOTE(review): presumably lets requests through when Redis is unreachable — confirm against the plugin docs.
  fault_tolerant: true
  hide_client_headers: false
plugin: rate-limiting
---
# Apply to a specific route
# NOTE(review): verify that KongIngress accepts a `plugins` list under `route`;
# plugins are usually attached through the `konghq.com/plugins` annotation on
# the Ingress/Service — confirm against the Kong Ingress Controller version in use.
apiVersion: configuration.konghq.com/v1
kind: KongIngress
metadata:
  name: api-ingress
route:
  protocols:
    - https
  methods:
    - GET
    - POST
  plugins:
    - rate-limit-plugin

Request Termination(熔断降级)

# kong-request-termination.yaml
# KongPlugin resource configuring the request-termination plugin to answer with
# HTTP 503 and a JSON message.
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: circuit-breaker
config:
  status_code: 503
  message: "Service temporarily unavailable, please try again later"
  content_type: application/json
plugin: request-termination
---
# Intended trigger: enable when the backend error rate exceeds a threshold.
# NOTE(review): nothing in this manifest measures error rates; the plugin is
# presumably switched on by an operator or external automation — confirm.
apiVersion: configuration.konghq.com/v1
kind: KongClusterPlugin
metadata:
  name: global-circuit-breaker
config:
  status_code: 503
  message: "System under maintenance"
plugin: request-termination
# Enable/disable dynamically through Kong Manager or the API

熔断器实现

状态机设计

熔断器状态机:
┌──────────┐    失败率>阈值    ┌──────────┐
│  CLOSED  │ ───────────────▶ │   OPEN   │
│ (正常)   │                  │ (熔断)   │
└──────────┘                  └──────────┘
     ▲                             │
     │                             │ 超时时间到
     │                             ▼
     │                        ┌──────────┐
     └──────────────────────── │HALF-OPEN │
          成功                 │ (半开)   │
                              └──────────┘
                                   │
                                   │ 失败
                                   └─────▶ OPEN

Go熔断器实现

package circuitbreaker

import (
    "context"
    "errors"
    "sync"
    "time"
)

// State identifies the position of a CircuitBreaker in its state machine.
type State int

const (
    // StateClosed lets every request through and counts consecutive failures.
    StateClosed State = iota
    // StateHalfOpen lets requests through as probes after the open timeout.
    StateHalfOpen
    // StateOpen rejects requests until the timeout has elapsed.
    StateOpen
)

// ErrCircuitOpen is returned by Execute when the breaker is open and the
// request was rejected without being run. Compare with errors.Is.
var ErrCircuitOpen = errors.New("circuit breaker is open")

// CircuitBreaker is a three-state (closed / open / half-open) circuit breaker.
// All mutable fields are guarded by mu; a CircuitBreaker must not be copied.
type CircuitBreaker struct {
    name             string
    failureThreshold int           // consecutive failures in closed state that open the breaker
    successThreshold int           // successes in half-open state that close the breaker
    timeout          time.Duration // how long the breaker stays open before probing

    mu            sync.RWMutex
    state         State
    failureCount  int
    successCount  int
    lastStateTime time.Time // time of the most recent state transition
}

// NewCircuitBreaker returns a breaker in the closed state.
func NewCircuitBreaker(name string, failureThreshold, successThreshold int, timeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        name:             name,
        failureThreshold: failureThreshold,
        successThreshold: successThreshold,
        timeout:          timeout,
        state:            StateClosed,
        lastStateTime:    time.Now(),
    }
}

// Execute runs fn if the breaker currently admits requests and records its
// outcome. It returns ErrCircuitOpen without calling fn when the breaker is
// open; otherwise it returns whatever fn returned. ctx is accepted for API
// symmetry and is not consulted here.
func (cb *CircuitBreaker) Execute(ctx context.Context, fn func() error) error {
    if !cb.allowRequest() {
        return ErrCircuitOpen
    }

    err := fn()
    cb.recordResult(err)
    return err
}

// allowRequest reports whether a request may proceed, moving the breaker from
// open to half-open once the timeout has elapsed.
//
// The state check and the transition happen under one exclusive lock. Releasing
// a read lock and re-acquiring a write lock in between would let another
// goroutine change the state first, and the stale transition would then
// overwrite that newer state and reset its counters.
func (cb *CircuitBreaker) allowRequest() bool {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    switch cb.state {
    case StateClosed, StateHalfOpen:
        return true
    case StateOpen:
        if time.Since(cb.lastStateTime) > cb.timeout {
            cb.transitionTo(StateHalfOpen)
            return true
        }
        return false
    default:
        return false
    }
}

// recordResult feeds the outcome of one executed request into the state machine.
func (cb *CircuitBreaker) recordResult(err error) {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err != nil {
        cb.onFailure()
        return
    }
    cb.onSuccess()
}

// onFailure handles a failed request. The caller must hold cb.mu.
func (cb *CircuitBreaker) onFailure() {
    switch cb.state {
    case StateClosed:
        cb.failureCount++
        if cb.failureCount >= cb.failureThreshold {
            cb.transitionTo(StateOpen)
        }
    case StateHalfOpen:
        // A failed probe re-opens the breaker immediately.
        cb.transitionTo(StateOpen)
    }
}

// onSuccess handles a successful request. The caller must hold cb.mu.
func (cb *CircuitBreaker) onSuccess() {
    switch cb.state {
    case StateClosed:
        // Failures are counted consecutively; any success resets the streak.
        cb.failureCount = 0
    case StateHalfOpen:
        cb.successCount++
        if cb.successCount >= cb.successThreshold {
            cb.transitionTo(StateClosed)
        }
    }
}

// transitionTo switches to state, stamps the transition time and clears both
// counters. The caller must hold cb.mu.
func (cb *CircuitBreaker) transitionTo(state State) {
    cb.state = state
    cb.lastStateTime = time.Now()
    cb.failureCount = 0
    cb.successCount = 0
}

// GetState returns the current state.
func (cb *CircuitBreaker) GetState() State {
    cb.mu.RLock()
    defer cb.mu.RUnlock()
    return cb.state
}

Nginx熔断配置

# nginx.conf
# Passive failure detection (max_fails / fail_timeout), a backup server for
# degradation, a JSON fallback response for upstream errors, and active health
# checks (NGINX Plus only).
upstream backend {
    # Shared-memory zone; the health_check directive below requires the
    # upstream group to live in shared memory.
    zone backend 64k;

    # A server is considered unavailable for 30s after 3 failed attempts within 30s.
    server backend1.example.com:8080 max_fails=3 fail_timeout=30s;
    server backend2.example.com:8080 max_fails=3 fail_timeout=30s;
    server backend3.example.com:8080 backup;  # fallback server, used only when the primary servers are unavailable
}

server {
    listen 80;

    location /api/ {
        proxy_pass http://backend;

        # Timeouts
        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 10s;

        # Error handling: replace upstream 5xx responses with the fallback below.
        proxy_intercept_errors on;
        error_page 500 502 503 504 /50x.html;
    }

    # Fallback response. This location must be declared at server level: nginx
    # rejects a nested location whose path is outside its parent ("/api/").
    location = /50x.html {
        internal;
        # Content type is set with default_type because add_header without
        # "always" is not applied to 503 responses.
        default_type application/json;
        return 503 '{"error": "Service temporarily unavailable", "retry_after": 30}';
    }

    # Active health checks (NGINX Plus only): probe GET /health on port 8080
    # every 5s; 3 failures mark a server unhealthy, 2 passes mark it healthy.
    location /health {
        proxy_pass http://backend;
        health_check interval=5s fails=3 passes=2 uri=/health port=8080;
    }
}

服务降级策略

降级方案设计

降级策略:
1. 返回缓存数据
   └─ 场景:读接口、非实时数据
   └─ 实现:Redis缓存 + 过期时间延长

2. 返回默认值
   └─ 场景:推荐、广告等非核心功能
   └─ 实现:配置默认响应

3. 简化功能
   └─ 场景:复杂查询、聚合接口
   └─ 实现:关闭部分功能,返回基础数据

4. 异步处理
   └─ 场景:写入操作
   └─ 实现:写入MQ,异步处理

降级实现示例

package degradation

import (
    "context"
    "encoding/json"
    "errors"
    "time"
)

// DegradationStrategy is a fallback that can produce a result when the primary
// call has failed. Execute returns the fallback value, or an error when this
// strategy cannot provide one.
type DegradationStrategy interface {
    Execute(ctx context.Context) (any, error)
}

// CacheDegradation is a fallback strategy that answers from the cache entry
// stored under key.
type CacheDegradation struct {
    cache  Cache
    key    string
    maxAge time.Duration
}

// Execute returns the cached value for d.key, or the cache's error when the
// lookup fails. An entry older than d.maxAge is still returned; it only
// produces a warning log line.
func (d *CacheDegradation) Execute(ctx context.Context) (interface{}, error) {
    entry, err := d.cache.Get(ctx, d.key)
    if err != nil {
        return nil, err
    }

    // Staleness is reported but tolerated while degraded.
    if age := time.Since(entry.UpdatedAt); age > d.maxAge {
        log.Warn("Using stale cache data")
    }

    return entry.Value, nil
}

// DefaultValueDegradation is a fallback strategy that always answers with a
// fixed, preconfigured value.
type DefaultValueDegradation struct {
    value interface{}
}

// Execute returns the configured value and never fails.
func (d *DefaultValueDegradation) Execute(ctx context.Context) (interface{}, error) {
    fallback := d.value
    return fallback, nil
}

// ServiceWithDegradation wraps a primary call with an ordered list of fallback
// strategies.
type ServiceWithDegradation struct {
    primary    func(ctx context.Context) (interface{}, error)
    strategies []DegradationStrategy
}

// Call invokes the primary function and returns its result on success. When the
// primary fails, the failure is logged and the strategies are tried in order;
// the first one that succeeds supplies the result. If none succeeds, a generic
// error is returned.
func (s *ServiceWithDegradation) Call(ctx context.Context) (interface{}, error) {
    primaryResult, primaryErr := s.primary(ctx)
    if primaryErr == nil {
        return primaryResult, nil
    }

    log.Errorf("Primary service failed: %v, trying degradation", primaryErr)

    // Walk the fallbacks in declaration order, skipping those that fail.
    for i := range s.strategies {
        fallback, fallbackErr := s.strategies[i].Execute(ctx)
        if fallbackErr != nil {
            continue
        }
        return fallback, nil
    }

    return nil, errors.New("all degradation strategies failed")
}

// getUserProfile is a usage example: it fetches a user profile from the user
// service and, when that fails, falls back first to the cache entry
// "user:<userID>" and then to a placeholder profile flagged as degraded.
//
// It returns an error when the call and every fallback fail, or when the
// winning result is not a *UserProfile.
func getUserProfile(ctx context.Context, userID string) (*UserProfile, error) {
    service := &ServiceWithDegradation{
        primary: func(ctx context.Context) (interface{}, error) {
            return callUserService(ctx, userID)
        },
        strategies: []DegradationStrategy{
            // Strategy 1: cached profile.
            &CacheDegradation{
                cache:  redisCache,
                key:    "user:" + userID,
                maxAge: 24 * time.Hour, // cache entries up to 24 hours old count as fresh
            },
            // Strategy 2: placeholder profile.
            &DefaultValueDegradation{
                value: &UserProfile{
                    ID:         userID,
                    Name:       "Unknown",
                    Avatar:     "/default-avatar.png",
                    IsDegraded: true,
                },
            },
        },
    }

    result, err := service.Call(ctx)
    if err != nil {
        return nil, err
    }

    // The result travels as interface{}; a strategy (for example the cache) may
    // hand back something other than *UserProfile, and an unchecked assertion
    // would panic in that case.
    profile, ok := result.(*UserProfile)
    if !ok {
        return nil, errors.New("user profile: unexpected result type")
    }

    return profile, nil
}

监控与告警

Prometheus指标

package metrics

import (
    "github.com/prometheus/client_golang/prometheus"
)

// Metrics exported by the protection layer.
var (
    // RateLimitTotal counts rate-limit checks.
    // Labels: type (ip/user/api) and result (allowed/rejected).
    RateLimitTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "rate_limit_total",
            Help: "Total number of rate limit checks",
        },
        []string{"type", "result"},
    )

    // CircuitBreakerState exposes the current state of each breaker,
    // labelled by breaker name.
    CircuitBreakerState = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "circuit_breaker_state",
            Help: "Current state of circuit breaker (0=closed, 1=half-open, 2=open)",
        },
        []string{"name"},
    )

    // CircuitBreakerRequests counts requests that went through a breaker,
    // labelled by breaker name and result.
    CircuitBreakerRequests = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "circuit_breaker_requests_total",
            Help: "Total number of requests through circuit breaker",
        },
        []string{"name", "result"},
    )

    // DegradationTotal counts degradation triggers, labelled by service and
    // strategy.
    DegradationTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "degradation_total",
            Help: "Total number of degradation triggers",
        },
        []string{"service", "strategy"},
    )
)

// init registers all collectors with the default Prometheus registry.
func init() {
    prometheus.MustRegister(
        RateLimitTotal,
        CircuitBreakerState,
        CircuitBreakerRequests,
        DegradationTotal,
    )
}

告警规则

# prometheus-alerts.yaml
# Alerting rules for rate limiting, circuit breaking and degradation.
groups:
  - name: protection
    rules:
      # Rate limiting: share of rejected checks per type above 30% over 5 minutes, sustained for 2 minutes
      - alert: HighRateLimitRejection
        expr: |
          sum(rate(rate_limit_total{result="rejected"}[5m])) by (type) 
          / 
          sum(rate(rate_limit_total[5m])) by (type) 
          > 0.3
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High rate limit rejection rate"
          description: "{{ $labels.type }} rejection rate is {{ $value | humanizePercentage }}"
      
      # Circuit breaker: state gauge has been 2 (open) for 1 minute
      - alert: CircuitBreakerOpen
        expr: circuit_breaker_state == 2
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Circuit breaker is open"
          description: "Circuit breaker {{ $labels.name }} has been open for more than 1 minute"
      
      # Degradation: more than 10 degradation triggers per second per service, sustained for 5 minutes
      - alert: HighDegradationRate
        expr: |
          sum(rate(degradation_total[5m])) by (service) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High degradation rate"
          description: "Service {{ $labels.service }} is being degraded frequently"

总结

限流与熔断最佳实践

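| 场景 | 限流策略 | 熔断策略 | 降级策略 |
| --- | --- | --- | --- |
| 登录接口 | IP+用户双维度,严格限制 | 快速失败 | 返回错误提示 |
| 查询接口 | 宽松限流,允许突发 | 超时熔断 | 返回缓存 |
| 写入接口 | 严格限流 | 熔断后异步写入 | MQ异步处理 |
| 核心接口 | 多维度保护 | 半开探测 | 简化功能 |
| 非核心接口 | 可关闭 | 直接熔断 | 返回默认值 |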

关键原则

  1. 多层防护:网关限流 + 服务熔断 + 应用降级
  2. 精细化控制:按IP、用户、接口多维度限流
  3. 快速失败:熔断器打开后立即拒绝请求
  4. 优雅降级:保证核心功能可用
  5. 监控告警:实时监控限流、熔断、降级情况
  6. 动态调整:根据流量动态调整限流阈值

延伸阅读

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页