Go 与消息队列:Kafka 和 RabbitMQ 实战

全面介绍 Go 语言中的消息队列实战,包括 Kafka 生产者与消费者、Kafka Streams、RabbitMQ 集成、AMQP 协议、消息模式、错误处理和死信队列

Go 与消息队列:Kafka 和 RabbitMQ 实战

凌晨三点,你的电商系统正在做促销活动。突然,订单量暴增 10 倍。数据库扛不住了,CPU 飙到 100%,订单服务开始超时,支付回调丢失,库存扣减不一致……整个系统像多米诺骨牌一样倒下了。

问题出在哪里?答案是:你的服务之间耦合太紧了。订单服务同步调用支付、库存、通知,任何一个环节变慢都会拖垮整条链路。

这就是消息队列大显身手的地方。它就像邮局——你不需要亲自跑到朋友家门口送信,只要把信投进邮筒,邮局会负责送达。发送方和接收方完全解耦,即使接收方暂时不在,消息也不会丢失。

今天,我们就来深入学习 Go 语言中两个最流行的消息队列:KafkaRabbitMQ,从基础概念到生产级实战。

消息队列的核心概念

在写代码之前,我们先建立对消息队列的整体认知。

为什么需要消息队列?

消息队列解决三大核心问题:

1. 异步处理

没有消息队列时,用户下单的流程是这样的:

用户请求 → 创建订单(50ms) → 扣减库存(80ms) → 发起支付(120ms) → 发送通知(60ms) → 返回响应
总耗时:310ms

引入消息队列后:

用户请求 → 创建订单(50ms) → 发送消息到队列(5ms) → 返回响应
总耗时:55ms

后台异步消费:
  Worker A: 扣减库存
  Worker B: 发起支付
  Worker C: 发送通知

响应时间从 310ms 降到 55ms,用户体验直接起飞。

2. 流量削峰

促销活动时每秒涌入 10000 个订单,但数据库每秒只能处理 2000 次写入。没有消息队列,系统直接崩溃。有了消息队列,请求先进队列,后台按数据库能承受的速率慢慢消费。

3. 系统解耦

订单服务只管往队列里扔消息,不关心谁在消费、消费多快、是否在线。库存服务、支付服务、通知服务各自独立运行,独立扩缩容。

消息模型

消息队列有两种基本模型:

点对点模型(Queue):一条消息只被一个消费者消费。

Producer ──> [Message Queue] ──> Consumer A
                                Consumer B (竞争消费,只有一个能拿到)

发布/订阅模型(Pub/Sub):一条消息被多个订阅者消费。

Publisher ──> [Topic] ──> Subscriber A (收到完整副本)
                         Subscriber B (收到完整副本)
                         Subscriber C (收到完整副本)

Kafka 和 RabbitMQ 都支持这两种模型,但设计哲学不同:

特性KafkaRabbitMQ
消息存储持久化日志,可重放消费后删除
消费模型拉(Pull)推(Push)
吞吐量极高(百万/秒)高(万级/秒)
延迟毫秒级微秒级
消息顺序分区内有序单队列有序
典型场景日志、事件流、大数据任务队列、工作流

Kafka:分布式事件流平台

Kafka 最初由 LinkedIn 开发,如今已成为大数据和事件驱动架构的基石。它不是简单的消息队列,而是一个分布式事件流平台

Kafka 核心概念

                    ┌─────────────────────────────┐
                    │       Kafka Cluster          │
                    │                              │
Producer ──写入──>  │  Topic: orders               │
                    │  ┌──────┐ ┌──────┐ ┌──────┐ │
                    │  │Partition│ │Partition│ │Partition│ │
                    │  │  0     │ │  1     │ │  2     │ │
                    │  └──────┘ └──────┘ └──────┘ │
                    │                              │
Consumer Group ──读取──>                           │
  ┌─────────┐        │                              │
  │Consumer A│──读──> │  Partition 0                 │
  │Consumer B│──读──> │  Partition 1                 │
  │Consumer C│──读──> │  Partition 2                 │
  └─────────┘        │                              │
                    └─────────────────────────────┘
  • Topic:消息的逻辑分类(类似数据库表)
  • Partition:Topic 的物理分片,是并行度的基本单位
  • Offset:每条消息在 Partition 中的唯一序号
  • Consumer Group:一组消费者共同消费一个 Topic

安装和启动 Kafka

# 使用 Docker 快速启动(推荐)
docker-compose up -d

# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 3
    depends_on:
      - zookeeper

Kafka 生产者:发送消息

Go 生态中最优秀的 Kafka 客户端是 segmentio/kafka-go,它的设计简洁、API 友好。

go get github.com/segmentio/kafka-go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

// OrderEvent 订单事件
type OrderEvent struct {
    OrderID    string    `json:"order_id"`
    UserID     string    `json:"user_id"`
    ProductID  string    `json:"product_id"`
    Amount     float64   `json:"amount"`
    Status     string    `json:"status"`
    CreatedAt  time.Time `json:"created_at"`
}

// KafkaProducer 封装 Kafka 生产者
type KafkaProducer struct {
    writer *kafka.Writer
}

func NewKafkaProducer(brokers []string, topic string) *KafkaProducer {
    return &KafkaProducer{
        writer: &kafka.Writer{
            Addr:     kafka.TCP(brokers...),
            Topic:    topic,
            Balancer: &kafka.LeastBytes{}, // 按字节数负载均衡到分区
            // 生产环境推荐配置
            BatchSize:    100,              // 批量发送
            BatchTimeout: 50 * time.Millisecond,
            MaxAttempts:  3,                // 最大重试次数
            RequiredAcks: kafka.RequireAll, // 所有副本确认
            Compression:  kafka.Snappy,     // 消息压缩
        },
    }
}

func (p *KafkaProducer) Send(ctx context.Context, key string, event OrderEvent) error {
    data, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("failed to marshal event: %w", err)
    }

    msg := kafka.Message{
        Key:   []byte(key),  // Key 相同的消息会进入同一个分区,保证顺序
        Value: data,
        Time:  time.Now(),
        Headers: []kafka.Header{
            {Key: "event-type", Value: []byte("order.created")},
            {Key: "version", Value: []byte("1.0")},
        },
    }

    if err := p.writer.WriteMessages(ctx, msg); err != nil {
        return fmt.Errorf("failed to write message: %w", err)
    }

    log.Printf("📨 Sent order event: %s (key: %s)", event.OrderID, key)
    return nil
}

func (p *KafkaProducer) Close() error {
    return p.writer.Close()
}

func main() {
    producer := NewKafkaProducer(
        []string{"localhost:9092"},
        "orders",
    )
    defer producer.Close()

    ctx := context.Background()

    // 模拟发送订单事件
    orders := []OrderEvent{
        {OrderID: "ORD-001", UserID: "user-1", ProductID: "prod-A", Amount: 99.9, Status: "created", CreatedAt: time.Now()},
        {OrderID: "ORD-002", UserID: "user-2", ProductID: "prod-B", Amount: 199.0, Status: "created", CreatedAt: time.Now()},
        {OrderID: "ORD-003", UserID: "user-1", ProductID: "prod-C", Amount: 59.9, Status: "created", CreatedAt: time.Now()},
    }

    for _, order := range orders {
        if err := producer.Send(ctx, order.UserID, order); err != nil {
            log.Printf("Failed to send: %v", err)
        }
        time.Sleep(100 * time.Millisecond)
    }

    log.Println("All orders sent!")
}

Kafka 消费者:处理消息

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

type OrderEvent struct {
    OrderID   string    `json:"order_id"`
    UserID    string    `json:"user_id"`
    ProductID string    `json:"product_id"`
    Amount    float64   `json:"amount"`
    Status    string    `json:"status"`
    CreatedAt time.Time `json:"created_at"`
}

// KafkaConsumer 封装 Kafka 消费者
type KafkaConsumer struct {
    reader *kafka.Reader
}

func NewKafkaConsumer(brokers []string, topic, groupID string) *KafkaConsumer {
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:        brokers,
        Topic:          topic,
        GroupID:        groupID,
        MinBytes:       10e3,          // 最小批量大小 10KB
        MaxBytes:       10e6,          // 最大批量大小 10MB
        CommitInterval: time.Second,   // 每秒提交一次偏移量
        StartOffset:    kafka.LastOffset, // 从最新消息开始消费
        // 生产环境:从最早消息开始,避免丢失
        // StartOffset: kafka.FirstOffset,
    })

    return &KafkaConsumer{reader: r}
}

func (c *KafkaConsumer) Consume(ctx context.Context, handler func(OrderEvent) error) error {
    for {
        msg, err := c.reader.FetchMessage(ctx)
        if err != nil {
            if ctx.Err() != nil {
                return ctx.Err()
            }
            log.Printf("Error fetching message: %v", err)
            continue
        }

        var event OrderEvent
        if err := json.Unmarshal(msg.Value, &event); err != nil {
            log.Printf("Failed to unmarshal message: %v", err)
            // 提交偏移量,跳过坏消息(生产环境应发送到死信队列)
            c.reader.CommitMessages(ctx, msg)
            continue
        }

        // 处理消息
        log.Printf("📬 Processing order: %s (partition: %d, offset: %d)",
            event.OrderID, msg.Partition, msg.Offset)

        if err := handler(event); err != nil {
            log.Printf("Failed to process order %s: %v", event.OrderID, err)
            // 重试逻辑或发送到死信队列
            // 这里先不提交偏移量,下次会重新消费
            continue
        }

        // 处理成功,提交偏移量
        if err := c.reader.CommitMessages(ctx, msg); err != nil {
            log.Printf("Failed to commit offset: %v", err)
        }
    }
}

func (c *KafkaConsumer) Close() error {
    return c.reader.Close()
}

func main() {
    consumer := NewKafkaConsumer(
        []string{"localhost:9092"},
        "orders",
        "inventory-service", // 消费者组 ID
    )
    defer consumer.Close()

    ctx := context.Background()

    log.Println("Inventory service started, waiting for orders...")

    err := consumer.Consume(ctx, func(event OrderEvent) error {
        log.Printf("📦 Deducting stock for product %s (order: %s)",
            event.ProductID, event.OrderID)

        // 模拟库存扣减
        time.Sleep(50 * time.Millisecond)

        log.Printf("✅ Stock deducted for order %s", event.OrderID)
        return nil
    })

    if err != nil {
        log.Fatalf("Consumer error: %v", err)
    }
}

多消费者组并行消费

Kafka 的一个强大特性是多个消费者组可以独立消费同一个 Topic:

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 优雅关闭
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    var wg sync.WaitGroup

    // 启动库存扣减消费者
    wg.Add(1)
    go func() {
        defer wg.Done()
        consumer := NewKafkaConsumer(
            []string{"localhost:9092"},
            "orders",
            "inventory-group", // 独立的消费者组
        )
        defer consumer.Close()

        consumer.Consume(ctx, func(event OrderEvent) error {
            log.Printf("[Inventory] Processing order %s", event.OrderID)
            return deductStock(event)
        })
    }()

    // 启动支付消费者
    wg.Add(1)
    go func() {
        defer wg.Done()
        consumer := NewKafkaConsumer(
            []string{"localhost:9092"},
            "orders",
            "payment-group", // 不同的消费者组
        )
        defer consumer.Close()

        consumer.Consume(ctx, func(event OrderEvent) error {
            log.Printf("[Payment] Processing order %s", event.OrderID)
            return processPayment(event)
        })
    }()

    // 启动通知消费者
    wg.Add(1)
    go func() {
        defer wg.Done()
        consumer := NewKafkaConsumer(
            []string{"localhost:9092"},
            "orders",
            "notification-group",
        )
        defer consumer.Close()

        consumer.Consume(ctx, func(event OrderEvent) error {
            log.Printf("[Notification] Sending confirmation for order %s", event.OrderID)
            return sendNotification(event)
        })
    }()

    // 等待关闭信号
    <-sigChan
    log.Println("Shutting down consumers...")
    cancel()
    wg.Wait()
    log.Println("All consumers stopped")
}

Kafka 事件溯源模式

Kafka 的消息是持久化的,支持重新消费。这使得事件溯源(Event Sourcing)成为可能——你的系统状态可以通过重放所有事件来重建。

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

// AccountEvent 账户事件
type AccountEvent struct {
    EventID   string    `json:"event_id"`
    AccountID string    `json:"account_id"`
    Type      string    `json:"type"` // deposit, withdrawal, transfer
    Amount    float64   `json:"amount"`
    Timestamp time.Time `json:"timestamp"`
}

// AccountState 账户状态(通过事件重建)
type AccountState struct {
    AccountID string  `json:"account_id"`
    Balance   float64 `json:"balance"`
    Version   int     `json:"version"` // 事件版本号
}

// EventStore 基于 Kafka 的事件存储
type EventStore struct {
    writer *kafka.Writer
}

func NewEventStore(brokers []string) *EventStore {
    return &EventStore{
        writer: &kafka.Writer{
            Addr:         kafka.TCP(brokers...),
            Topic:        "account-events",
            Balancer:     &kafka.Hash{}, // 按 key 哈希分区
            RequiredAcks: kafka.RequireAll,
        },
    }
}

func (es *EventStore) Append(ctx context.Context, event AccountEvent) error {
    data, _ := json.Marshal(event)
    return es.writer.WriteMessages(ctx, kafka.Message{
        Key:   []byte(event.AccountID),
        Value: data,
    })
}

// RebuildState 从事件流重建账户状态
func (es *EventStore) RebuildState(ctx context.Context, brokers []string, accountID string) (*AccountState, error) {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:     brokers,
        Topic:       "account-events",
        Partition:   0, // 简化示例,实际应根据分区策略确定
        StartOffset: kafka.FirstOffset, // 从第一条消息开始
    })
    defer reader.Close()

    state := &AccountState{
        AccountID: accountID,
        Balance:   0,
    }

    for {
        msg, err := reader.ReadMessage(ctx)
        if err != nil {
            break // 读完所有消息
        }

        var event AccountEvent
        if err := json.Unmarshal(msg.Value, &event); err != nil {
            continue
        }

        // 只处理目标账户的事件
        if event.AccountID != accountID {
            continue
        }

        // 应用事件到状态
        switch event.Type {
        case "deposit":
            state.Balance += event.Amount
        case "withdrawal":
            state.Balance -= event.Amount
        }
        state.Version++

        log.Printf("Applied event: %s %s %.2f (balance: %.2f)",
            event.Type, event.AccountID, event.Amount, state.Balance)
    }

    return state, nil
}

func main() {
    ctx := context.Background()
    brokers := []string{"localhost:9092"}

    store := NewEventStore(brokers)

    // 记录一系列账户事件
    events := []AccountEvent{
        {EventID: "evt-1", AccountID: "acc-001", Type: "deposit", Amount: 1000.00, Timestamp: time.Now()},
        {EventID: "evt-2", AccountID: "acc-001", Type: "withdrawal", Amount: 200.00, Timestamp: time.Now()},
        {EventID: "evt-3", AccountID: "acc-001", Type: "deposit", Amount: 500.00, Timestamp: time.Now()},
        {EventID: "evt-4", AccountID: "acc-001", Type: "withdrawal", Amount: 100.00, Timestamp: time.Now()},
    }

    for _, event := range events {
        if err := store.Append(ctx, event); err != nil {
            log.Printf("Failed to append event: %v", err)
        }
    }

    // 等待消息写入完成
    time.Sleep(2 * time.Second)

    // 重建账户状态
    state, err := store.RebuildState(ctx, brokers, "acc-001")
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("\nRebuilt state: %+v\n", state)
    // 输出:Balance: 1200.00, Version: 4
    // (1000 - 200 + 500 - 100 = 1200)
}

RabbitMQ:灵活的消息中间件

如果说 Kafka 是一个高性能的日志流平台,那么 RabbitMQ 就是一个功能丰富的消息路由引擎。RabbitMQ 基于 AMQP(Advanced Message Queuing Protocol)协议,提供了灵活的消息路由、多种交换机类型和完善的消息确认机制。

AMQP 协议核心概念

                           ┌──────────────────────────────┐
                           │        RabbitMQ Broker        │
                           │                               │
Producer ──消息──> Exchange ──路由──> Queue A ──> Consumer 1│
                    │                   Queue B ──> Consumer 2│
                    │                               │
              Binding Key                     Ack/Nack
                           └──────────────────────────────┘
  • Exchange(交换机):接收消息并根据规则路由到队列
  • Queue(队列):存储消息直到被消费者消费
  • Binding(绑定):Exchange 和 Queue 之间的关联规则
  • Routing Key(路由键):消息携带的路由标识

RabbitMQ 的四种交换机

Direct Exchange:  routing_key = "order.pay" ──> 精确匹配绑定键
Fanout Exchange:  忽略 routing_key ──> 广播到所有绑定的队列
Topic Exchange:   routing_key = "order.*" ──> 通配符匹配
Headers Exchange: 根据消息头属性路由(很少使用)

安装和启动 RabbitMQ

# Docker 启动(带管理界面)
docker run -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management

# 管理界面:http://localhost:15672
# 用户名/密码:guest/guest

RabbitMQ 生产者

Go 中常用的 RabbitMQ 客户端是 amqp091-go(官方维护的 AMQP 客户端)。

go get github.com/rabbitmq/amqp091-go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

// RabbitProducer RabbitMQ 生产者
type RabbitProducer struct {
    conn    *amqp.Connection
    channel *amqp.Channel
}

func NewRabbitProducer(url string) (*RabbitProducer, error) {
    conn, err := amqp.Dial(url)
    if err != nil {
        return nil, fmt.Errorf("failed to connect: %w", err)
    }

    ch, err := conn.Channel()
    if err != nil {
        conn.Close()
        return nil, fmt.Errorf("failed to open channel: %w", err)
    }

    return &RabbitProducer{conn: conn, channel: ch}, nil
}

// DeclareExchange 声明交换机
func (p *RabbitProducer) DeclareExchange(name, kind string) error {
    return p.channel.ExchangeDeclare(
        name,    // 交换机名称
        kind,    // 类型:direct, fanout, topic, headers
        true,    // durable: 持久化
        false,   // auto-deleted
        false,   // internal
        false,   // no-wait
        nil,     // arguments
    )
}

// Publish 发送消息
func (p *RabbitProducer) Publish(ctx context.Context, exchange, routingKey string, body interface{}) error {
    data, err := json.Marshal(body)
    if err != nil {
        return err
    }

    return p.channel.PublishWithContext(ctx,
        exchange,   // 交换机
        routingKey, // 路由键
        false,      // mandatory
        false,      // immediate
        amqp.Publishing{
            ContentType:  "application/json",
            Body:         data,
            DeliveryMode: amqp.Persistent, // 持久化消息
            Timestamp:    time.Now(),
            MessageId:    fmt.Sprintf("msg-%d", time.Now().UnixNano()),
            Headers: amqp.Table{
                "producer":    "order-service",
                "retry-count": int32(0),
            },
        },
    )
}

func (p *RabbitProducer) Close() {
    p.channel.Close()
    p.conn.Close()
}

func main() {
    producer, err := NewRabbitProducer("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    ctx := context.Background()

    // 声明交换机
    producer.DeclareExchange("orders", "topic")

    // 发送不同类型的订单事件
    events := []struct {
        RoutingKey string
        Body       map[string]interface{}
    }{
        {
            RoutingKey: "order.created",
            Body:       map[string]interface{}{"order_id": "ORD-001", "amount": 99.9},
        },
        {
            RoutingKey: "order.paid",
            Body:       map[string]interface{}{"order_id": "ORD-001", "payment_id": "PAY-001"},
        },
        {
            RoutingKey: "order.shipped",
            Body:       map[string]interface{}{"order_id": "ORD-001", "tracking": "SF1234567"},
        },
    }

    for _, event := range events {
        if err := producer.Publish(ctx, "orders", event.RoutingKey, event.Body); err != nil {
            log.Printf("Failed to publish: %v", err)
        } else {
            log.Printf("📨 Published: %s", event.RoutingKey)
        }
    }
}

RabbitMQ 消费者

package main

import (
    "encoding/json"
    "log"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

// RabbitConsumer RabbitMQ 消费者
type RabbitConsumer struct {
    conn    *amqp.Connection
    channel *amqp.Channel
}

func NewRabbitConsumer(url string) (*RabbitConsumer, error) {
    conn, err := amqp.Dial(url)
    if err != nil {
        return nil, err
    }

    ch, err := conn.Channel()
    if err != nil {
        conn.Close()
        return nil, err
    }

    // 设置预取数量:一次只给消费者一条未确认的消息
    ch.Qos(1, 0, false)

    return &RabbitConsumer{conn: conn, channel: ch}, nil
}

// Consume 开始消费
func (c *RabbitConsumer) Consume(queueName string, handler func([]byte) error) error {
    msgs, err := c.channel.Consume(
        queueName, // 队列名
        "",        // 消费者标签
        false,     // auto-ack: 手动确认
        false,     // exclusive
        false,     // no-local
        false,     // no-wait
        nil,       // args
    )
    if err != nil {
        return err
    }

    for msg := range msgs {
        log.Printf("📬 Received message from %s: %s", queueName, msg.RoutingKey)

        if err := handler(msg.Body); err != nil {
            log.Printf("❌ Processing failed: %v", err)
            // 拒绝消息,重新入队
            msg.Nack(false, true)
            continue
        }

        // 确认消息
        msg.Ack(false)
    }

    return nil
}

func (c *RabbitConsumer) Close() {
    c.channel.Close()
    c.conn.Close()
}

func main() {
    consumer, err := NewRabbitConsumer("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // 声明队列并绑定到交换机
    queue, err := consumer.channel.QueueDeclare(
        "inventory-queue", // 队列名
        true,              // durable
        false,             // delete when unused
        false,             // exclusive
        false,             // no-wait
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }

    // 绑定到 orders 交换机,只接收 order.created 事件
    consumer.channel.QueueBind(
        queue.Name,
        "order.created", // 只接收订单创建事件
        "orders",        // 交换机
        false,
        nil,
    )

    log.Println("Inventory consumer waiting for messages...")

    consumer.Consume(queue.Name, func(body []byte) error {
        var event map[string]interface{}
        json.Unmarshal(body, &event)

        log.Printf("📦 Deducting stock for order: %v", event["order_id"])
        time.Sleep(100 * time.Millisecond) // 模拟处理
        log.Printf("✅ Stock deducted successfully")

        return nil
    })
}

消息模式实战

工作队列模式(Work Queues)

多个 Worker 竞争消费同一个队列,实现任务的分布式处理:

package main

import (
    "fmt"
    "log"
    "math/rand"
    "os"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

func startWorker(id int, url string) {
    conn, err := amqp.Dial(url)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    // 公平调度:一次只分发一条消息
    ch.Qos(1, 0, false)

    msgs, err := ch.Consume("task-queue", "", false, false, false, false, nil)
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Worker %d started", id)

    for msg := range msgs {
        task := string(msg.Body)
        log.Printf("Worker %d processing: %s", id, task)

        // 模拟不同任务的处理时间
        duration := time.Duration(rand.Intn(5)+1) * time.Second
        time.Sleep(duration)

        log.Printf("Worker %d done: %s (took %v)", id, task, duration)
        msg.Ack(false)
    }
}

func main() {
    url := "amqp://guest:guest@localhost:5672/"

    if len(os.Args) > 1 && os.Args[1] == "producer" {
        // 生产者:发送任务
        conn, _ := amqp.Dial(url)
        defer conn.Close()
        ch, _ := conn.Channel()
        defer ch.Close()

        ch.QueueDeclare("task-queue", true, false, false, false, nil)

        for i := 1; i <= 20; i++ {
            task := fmt.Sprintf("Task #%d", i)
            ch.PublishWithContext(context.Background(), "", "task-queue", false, false,
                amqp.Publishing{
                    Body:         []byte(task),
                    DeliveryMode: amqp.Persistent,
                })
            log.Printf("Sent: %s", task)
            time.Sleep(500 * time.Millisecond)
        }
    } else {
        // 消费者:作为 Worker 运行
        workerID := rand.Intn(100)
        startWorker(workerID, url)
    }
}

发布/订阅模式(Pub/Sub)

使用 Fanout 交换机将消息广播给所有订阅者:

package main

import (
    "context"
    "log"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

// Publisher 发布者
func publishNews(url string) {
    conn, _ := amqp.Dial(url)
    defer conn.Close()
    ch, _ := conn.Channel()
    defer ch.Close()

    // 声明 Fanout 交换机
    ch.ExchangeDeclare("news", "fanout", true, false, false, false, nil)

    headlines := []string{
        "Breaking: Go 1.23 released with major improvements",
        "Tech: Kubernetes 2.0 announces new architecture",
        "Sports: World Cup 2026 venues confirmed",
    }

    for _, headline := range headlines {
        ch.PublishWithContext(context.Background(), "news", "", false, false,
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(headline),
            })
        log.Printf("📰 Published: %s", headline)
        time.Sleep(time.Second)
    }
}

// Subscriber 订阅者
func subscribeNews(url, name string) {
    conn, _ := amqp.Dial(url)
    defer conn.Close()
    ch, _ := conn.Channel()
    defer ch.Close()

    ch.ExchangeDeclare("news", "fanout", true, false, false, false, nil)

    // 每个订阅者创建自己的独占队列
    q, _ := ch.QueueDeclare("", false, true, true, false, nil)

    // 绑定到 news 交换机
    ch.QueueBind(q.Name, "", "news", false, nil)

    msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil)

    log.Printf("[%s] Subscribed to news...", name)
    for msg := range msgs {
        log.Printf("[%s] 📰 Received: %s", name, msg.Body)
    }
}

func main() {
    url := "amqp://guest:guest@localhost:5672/"

    // 启动多个订阅者(在不同终端运行)
    go subscribeNews(url, "Email Service")
    go subscribeNews(url, "Push Notification")
    go subscribeNews(url, "SMS Service")

    time.Sleep(2 * time.Second)

    // 发布新闻
    publishNews(url)

    select {} // 保持运行
}

Topic 路由模式

使用通配符实现灵活的消息路由:

package main

import (
    "context"
    "log"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
    url := "amqp://guest:guest@localhost:5672/"
    conn, _ := amqp.Dial(url)
    defer conn.Close()
    ch, _ := conn.Channel()
    defer ch.Close()

    // Topic 交换机
    ch.ExchangeDeclare("logs", "topic", true, false, false, false, nil)

    // 创建不同日志级别的队列
    bindings := []struct {
        Queue      string
        RoutingKey string
        Desc       string
    }{
        {"all-logs", "#", "接收所有日志"},                     // # 匹配任意
        {"error-logs", "*.error", "只接收错误日志"},           // *.error 匹配 app.error, db.error
        {"app-logs", "app.#", "接收应用相关的所有日志"},       // app.# 匹配 app.error, app.info 等
    }

    for _, b := range bindings {
        q, _ := ch.QueueDeclare(b.Queue, true, false, false, false, nil)
        ch.QueueBind(q.Name, b.RoutingKey, "logs", false, nil)
        log.Printf("📋 Queue %q bound to %q (%s)", b.Queue, b.RoutingKey, b.Desc)
    }

    // 发送不同级别的日志
    logs := []struct {
        RoutingKey string
        Message    string
    }{
        {"app.info", "Application started"},
        {"app.error", "Database connection failed"},
        {"db.warning", "Slow query detected"},
        {"db.error", "Query timeout"},
        {"auth.info", "User logged in"},
        {"auth.error", "Invalid token"},
    }

    for _, l := range logs {
        ch.PublishWithContext(context.Background(), "logs", l.RoutingKey, false, false,
            amqp.Publishing{Body: []byte(l.Message)})
        log.Printf("📨 [%s] %s", l.RoutingKey, l.Message)
    }

    time.Sleep(time.Second)

    // 检查各队列的消息数
    for _, name := range []string{"all-logs", "error-logs", "app-logs"} {
        q, _ := ch.QueueInspect(name)
        log.Printf("📊 Queue %q: %d messages", name, q.Messages)
    }
    // all-logs: 6 messages (所有)
    // error-logs: 2 messages (*.error)
    // app-logs: 3 messages (app.#)
}

错误处理与重试策略

在生产环境中,消息消费失败是家常便饭。网络抖动、数据库超时、第三方 API 限流……你需要一套健壮的错误处理和重试机制。

指数退避重试

package retry

import (
    "context"
    "fmt"
    "log"
    "math"
    "time"
)

type Config struct {
    MaxAttempts  int
    InitialDelay time.Duration
    MaxDelay     time.Duration
    Multiplier   float64
}

var DefaultConfig = Config{
    MaxAttempts:  5,
    InitialDelay: time.Second,
    MaxDelay:     30 * time.Second,
    Multiplier:   2.0,
}

func WithExponentialBackoff(ctx context.Context, cfg Config, fn func() error) error {
    var lastErr error
    delay := cfg.InitialDelay

    for attempt := 1; attempt <= cfg.MaxAttempts; attempt++ {
        lastErr = fn()
        if lastErr == nil {
            if attempt > 1 {
                log.Printf("✅ Succeeded on attempt %d", attempt)
            }
            return nil
        }

        log.Printf("⚠️  Attempt %d/%d failed: %v", attempt, cfg.MaxAttempts, lastErr)

        if attempt == cfg.MaxAttempts {
            break
        }

        // 指数退避:delay * multiplier^(attempt-1)
        nextDelay := time.Duration(float64(delay) * math.Pow(cfg.Multiplier, float64(attempt-1)))
        if nextDelay > cfg.MaxDelay {
            nextDelay = cfg.MaxDelay
        }

        log.Printf("⏳ Retrying in %v...", nextDelay)

        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(nextDelay):
        }
    }

    return fmt.Errorf("all %d attempts failed, last error: %w", cfg.MaxAttempts, lastErr)
}

死信队列(Dead Letter Queue)

当消息多次重试都失败后,不应该无限循环重试,而应该发送到死信队列,等待人工处理。

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

func setupDeadLetterQueue(ch *amqp.Channel) error {
    // 1. 创建死信交换机和队列
    ch.ExchangeDeclare("dlx", "direct", true, false, false, false, nil)

    dlq, err := ch.QueueDeclare("dead-letter-queue", true, false, false, false, nil)
    if err != nil {
        return err
    }
    ch.QueueBind(dlq.Name, "dead", "dlx", false, nil)

    // 2. 创建主队列,配置死信路由
    _, err = ch.QueueDeclare("order-queue", true, false, false, false, amqp.Table{
        "x-dead-letter-exchange":    "dlx",      // 死信交换机
        "x-dead-letter-routing-key": "dead",     // 死信路由键
        "x-message-ttl":             60000,      // 消息过期时间 60秒
        "x-max-length":              10000,      // 队列最大长度
    })
    if err != nil {
        return err
    }

    ch.QueueBind("order-queue", "order", "orders", false, nil)

    log.Println("✅ Dead letter queue setup complete")
    return nil
}

// DLQMessage 死信消息(包含原始消息和错误信息)
type DLQMessage struct {
    OriginalBody  string    `json:"original_body"`
    Error         string    `json:"error"`
    RetryCount    int       `json:"retry_count"`
    FailedAt      time.Time `json:"failed_at"`
    Queue         string    `json:"queue"`
    RoutingKey    string    `json:"routing_key"`
}

func processWithRetry(ch *amqp.Channel) {
    msgs, _ := ch.Consume("order-queue", "", false, false, false, false, nil)

    for msg := range msgs {
        // 获取重试次数(从消息头中)
        retryCount := 0
        if count, ok := msg.Headers["x-retry-count"]; ok {
            retryCount = int(count.(int32))
        }

        err := processOrder(msg.Body)

        if err != nil {
            if retryCount < 3 {
                // 重试:重新发布消息,增加重试计数
                log.Printf("⚠️  Retry %d/3 for message: %v", retryCount+1, err)

                ch.PublishWithContext(context.Background(),
                    "", "order-queue", false, false,
                    amqp.Publishing{
                        Body: msg.Body,
                        Headers: amqp.Table{
                            "x-retry-count": int32(retryCount + 1),
                        },
                        DeliveryMode: amqp.Persistent,
                    })
                msg.Ack(false)
            } else {
                // 超过最大重试次数:拒绝消息,自动进入死信队列
                log.Printf("❌ Max retries exceeded, sending to DLQ: %v", err)
                msg.Nack(false, false) // requeue=false,消息进入 DLQ
            }
            continue
        }

        msg.Ack(false)
    }
}

// DLQConsumer 死信队列消费者(人工处理或告警)
func consumeDLQ(ch *amqp.Channel) {
    msgs, _ := ch.Consume("dead-letter-queue", "", false, false, false, false, nil)

    for msg := range msgs {
        log.Printf("🚨 Dead letter received: %s", msg.Body)

        // 发送告警通知
        // sendAlert(msg.Body)

        // 保存到数据库供人工处理
        // saveFailedMessage(msg.Body)

        msg.Ack(false)
    }
}

func processOrder(body []byte) error {
    // 模拟处理,有一定概率失败
    var order map[string]interface{}
    json.Unmarshal(body, &order)

    // 模拟错误
    if order["amount"] == nil {
        return fmt.Errorf("missing required field: amount")
    }

    log.Printf("✅ Order processed: %v", order["order_id"])
    return nil
}

func main() {
    conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
    defer conn.Close()
    ch, _ := conn.Channel()
    defer ch.Close()

    setupDeadLetterQueue(ch)

    // 启动主消费者和死信消费者
    go processWithRetry(ch)
    go consumeDLQ(ch)

    select {}
}

消息可靠性:确保不丢不重

在生产环境中,消息不能丢,也不能重复处理。这需要从三个层面保证。

1. 发送端确认(Publisher Confirms)

func publishWithConfirm(ch *amqp.Channel, queue string, body []byte) error {
    // 开启确认模式
    if err := ch.Confirm(false); err != nil {
        return err
    }

    // 注册确认回调
    confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))

    err := ch.PublishWithContext(context.Background(), "", queue, false, false,
        amqp.Publishing{
            Body:         body,
            DeliveryMode: amqp.Persistent,
        })
    if err != nil {
        return err
    }

    // 等待 Broker 确认
    select {
    case confirmed := <-confirms:
        if confirmed.Ack {
            log.Printf("✅ Message confirmed by broker")
        } else {
            return fmt.Errorf("message nacked by broker")
        }
    case <-time.After(5 * time.Second):
        return fmt.Errorf("confirmation timeout")
    }

    return nil
}

2. 消费端手动确认

// 手动确认:处理完成后才确认
msgs, _ := ch.Consume("queue", "", false, false, false, false, nil)
for msg := range msgs {
    if err := process(msg.Body); err != nil {
        msg.Nack(false, true) // 处理失败,重新入队
    } else {
        msg.Ack(false) // 处理成功,确认
    }
}

3. 幂等性处理

即使消息被重复投递,处理结果也应该一致:

package main

import (
    "context"
    "database/sql"
    "log"

    _ "github.com/lib/pq"
)

type IdempotentProcessor struct {
    db *sql.DB
}

func NewIdempotentProcessor(db *sql.DB) *IdempotentProcessor {
    // 创建幂等性记录表
    db.Exec(`
        CREATE TABLE IF NOT EXISTS processed_messages (
            message_id VARCHAR(255) PRIMARY KEY,
            processed_at TIMESTAMP DEFAULT NOW()
        )
    `)
    return &IdempotentProcessor{db: db}
}

func (p *IdempotentProcessor) Process(ctx context.Context, messageID string, fn func() error) error {
    // 检查是否已处理
    var exists bool
    err := p.db.QueryRowContext(ctx,
        "SELECT EXISTS(SELECT 1 FROM processed_messages WHERE message_id = $1)",
        messageID).Scan(&exists)
    if err != nil {
        return err
    }

    if exists {
        log.Printf("⏭️  Message %s already processed, skipping", messageID)
        return nil
    }

    // 执行处理
    if err := fn(); err != nil {
        return err
    }

    // 记录已处理
    _, err = p.db.ExecContext(ctx,
        "INSERT INTO processed_messages (message_id) VALUES ($1)",
        messageID)
    if err != nil {
        log.Printf("⚠️  Failed to record message processing: %v", err)
    }

    return nil
}

Kafka vs RabbitMQ:如何选择?

这是一个经常被问到的问题。让我给你一个实用的决策框架:

选择 Kafka 的场景:

  • 需要消息持久化和重放能力(事件溯源、数据分析)
  • 超高吞吐量需求(每秒百万级消息)
  • 消息需要被多个系统独立消费
  • 日志收集和事件流处理
  • 需要保留消息历史(按时间或容量)

选择 RabbitMQ 的场景:

  • 需要灵活的消息路由(Topic、Direct、Fanout)
  • 低延迟需求(微秒级)
  • 任务队列和工作分发
  • 需要消息优先级
  • 需要延迟消息(延迟队列)
  • 复杂的消息确认和死信处理

两者都用的场景:

很多公司同时使用两者。Kafka 作为事件总线处理数据流,RabbitMQ 作为任务队列处理业务逻辑。例如:

用户下单 → Kafka (事件流) → 数据分析平台
                          → 推荐系统
                          → 审计日志

用户下单 → RabbitMQ (任务队列) → 库存扣减 Worker
                               → 支付处理 Worker
                               → 邮件通知 Worker

总结

今天我们深入探讨了 Go 语言中的消息队列实战:

  • 消息队列概念:异步处理、流量削峰、系统解耦
  • Kafka 生产者:批量发送、消息压缩、分区策略
  • Kafka 消费者:消费者组、偏移量管理、手动提交
  • Kafka 事件溯源:通过重放事件重建系统状态
  • RabbitMQ 生产者:AMQP 协议、交换机声明、持久化消息
  • RabbitMQ 消费者:手动确认、预取控制、公平调度
  • 消息模式:工作队列、发布/订阅、Topic 路由
  • 错误处理:指数退避重试、死信队列
  • 消息可靠性:发送确认、消费确认、幂等性处理

消息队列是分布式系统的血管,选对工具、用好模式,你的系统才能在流量洪峰面前从容不迫。

现在就去试试吧!启动一个 Docker 容器,写一个 Producer 和 Consumer,感受消息在系统间流动的美妙。

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页