事件驱动架构与领域事件:构建松耦合的响应式系统

深入讲解事件驱动架构的设计原则与实现模式,涵盖领域事件、事件存储、事件溯源、CQRS、事件总线等核心概念,提供Node.js和Java的完整实战代码。

事件驱动架构概述

事件驱动架构(EDA)是一种通过事件的产生、检测和消费来实现系统组件间松耦合通信的架构风格。

事件驱动架构优势:
┌─────────────────────────────────────────┐
│ ✓ 松耦合:生产者不知道消费者是谁        │
│ ✓ 可扩展:新增消费者不影响现有系统      │
│ ✓ 审计追踪:完整的事件历史记录          │
│ ✓ 时间解耦:异步处理,提高响应速度      │
│ ✓ 弹性:失败可重试,不影响主流程        │
└─────────────────────────────────────────┘

领域事件设计

事件定义

// src/domain/events/baseEvent.ts
export interface DomainEvent {
  eventId: string;
  eventType: string;
  aggregateId: string;
  aggregateType: string;
  timestamp: Date;
  version: number;
  metadata?: Record<string, any>;
}

// src/domain/events/orderEvents.ts
export class OrderCreatedEvent implements DomainEvent {
  readonly eventId: string;
  readonly eventType = 'OrderCreated';
  readonly aggregateType = 'Order';
  readonly timestamp: Date;
  readonly version = 1;
  
  constructor(
    public readonly aggregateId: string,
    public readonly userId: string,
    public readonly items: OrderItem[],
    public readonly totalAmount: number,
    public readonly shippingAddress: Address,
    metadata?: Record<string, any>
  ) {
    this.eventId = generateUUID();
    this.timestamp = new Date();
    this.metadata = metadata;
  }
}

export class OrderConfirmedEvent implements DomainEvent {
  readonly eventId: string;
  readonly eventType = 'OrderConfirmed';
  readonly aggregateType = 'Order';
  readonly timestamp: Date;
  readonly version = 1;
  
  constructor(
    public readonly aggregateId: string,
    public readonly confirmedAt: Date,
    public readonly estimatedDelivery: Date,
    metadata?: Record<string, any>
  ) {
    this.eventId = generateUUID();
    this.timestamp = new Date();
    this.metadata = metadata;
  }
}

export class OrderCancelledEvent implements DomainEvent {
  readonly eventId: string;
  readonly eventType = 'OrderCancelled';
  readonly aggregateType = 'Order';
  readonly timestamp: Date;
  readonly version = 1;
  
  constructor(
    public readonly aggregateId: string,
    public readonly reason: string,
    public readonly cancelledBy: string,
    metadata?: Record<string, any>
  ) {
    this.eventId = generateUUID();
    this.timestamp = new Date();
    this.metadata = metadata;
  }
}

export class OrderShippedEvent implements DomainEvent {
  readonly eventId: string;
  readonly eventType = 'OrderShipped';
  readonly aggregateType = 'Order';
  readonly timestamp: Date;
  readonly version = 1;
  
  constructor(
    public readonly aggregateId: string,
    public readonly trackingNumber: string,
    public readonly carrier: string,
    public readonly shippedAt: Date,
    metadata?: Record<string, any>
  ) {
    this.eventId = generateUUID();
    this.timestamp = new Date();
    this.metadata = metadata;
  }
}

聚合根发布事件

// src/domain/entities/Order.ts
import { DomainEvent } from '../events/baseEvent';
import { OrderCreatedEvent, OrderConfirmedEvent, OrderCancelledEvent } from '../events/orderEvents';

export class Order {
  private domainEvents: DomainEvent[] = [];
  
  constructor(
    public readonly id: string,
    public userId: string,
    public items: OrderItem[],
    public totalAmount: number,
    public shippingAddress: Address,
    public status: OrderStatus = 'PENDING',
    public createdAt: Date = new Date()
  ) {}
  
  static create(
    userId: string,
    items: OrderItem[],
    shippingAddress: Address
  ): Order {
    const totalAmount = items.reduce(
      (sum, item) => sum + item.price * item.quantity,
      0
    );
    
    const order = new Order(
      generateUUID(),
      userId,
      items,
      totalAmount,
      shippingAddress
    );
    
    // 发布领域事件
    order.addDomainEvent(
      new OrderCreatedEvent(
        order.id,
        userId,
        items,
        totalAmount,
        shippingAddress,
        { correlationId: generateCorrelationId() }
      )
    );
    
    return order;
  }
  
  confirm(estimatedDelivery: Date): void {
    if (this.status !== 'PENDING') {
      throw new BusinessRuleError('Only pending orders can be confirmed');
    }
    
    this.status = 'CONFIRMED';
    
    this.addDomainEvent(
      new OrderConfirmedEvent(
        this.id,
        new Date(),
        estimatedDelivery
      )
    );
  }
  
  cancel(reason: string, cancelledBy: string): void {
    if (this.status === 'SHIPPED') {
      throw new BusinessRuleError('Cannot cancel shipped order');
    }
    
    this.status = 'CANCELLED';
    
    this.addDomainEvent(
      new OrderCancelledEvent(
        this.id,
        reason,
        cancelledBy
      )
    );
  }
  
  // 获取并清除领域事件
  pullDomainEvents(): DomainEvent[] {
    const events = [...this.domainEvents];
    this.domainEvents = [];
    return events;
  }
  
  private addDomainEvent(event: DomainEvent): void {
    this.domainEvents.push(event);
  }
}

// src/application/services/OrderApplicationService.ts
export class OrderApplicationService {
  constructor(
    private orderRepository: OrderRepository,
    private eventPublisher: EventPublisher
  ) {}
  
  async createOrder(command: CreateOrderCommand): Promise<string> {
    // 1. 创建聚合根
    const order = Order.create(
      command.userId,
      command.items,
      command.shippingAddress
    );
    
    // 2. 保存到数据库(事务)
    await this.orderRepository.save(order);
    
    // 3. 发布领域事件
    const events = order.pullDomainEvents();
    await this.eventPublisher.publishAll(events);
    
    return order.id;
  }
  
  async confirmOrder(orderId: string, estimatedDelivery: Date): Promise<void> {
    const order = await this.orderRepository.findById(orderId);
    if (!order) {
      throw new NotFoundError('Order', orderId);
    }
    
    order.confirm(estimatedDelivery);
    await this.orderRepository.save(order);
    
    const events = order.pullDomainEvents();
    await this.eventPublisher.publishAll(events);
  }
}

事件总线实现

内存事件总线

// src/infrastructure/events/InMemoryEventBus.ts
import { EventEmitter } from 'events';
import { DomainEvent } from '../../domain/events/baseEvent';

type EventHandler = (event: DomainEvent) => Promise<void>;

export class InMemoryEventBus {
  private emitter: EventEmitter;
  
  constructor() {
    this.emitter = new EventEmitter();
    this.emitter.setMaxListeners(100); // 增加监听器数量限制
  }
  
  subscribe(eventType: string, handler: EventHandler): void {
    this.emitter.on(eventType, async (event: DomainEvent) => {
      try {
        await handler(event);
      } catch (error) {
        console.error(`Error handling event ${eventType}:`, error);
        // 可以发送到死信队列
      }
    });
  }
  
  async publish(event: DomainEvent): Promise<void> {
    this.emitter.emit(event.eventType, event);
  }
  
  async publishAll(events: DomainEvent[]): Promise<void> {
    for (const event of events) {
      await this.publish(event);
    }
  }
}

// 使用示例
const eventBus = new InMemoryEventBus();

// 订阅订单创建事件
eventBus.subscribe('OrderCreated', async (event: OrderCreatedEvent) => {
  await inventoryService.reserveStock(event.aggregateId, event.items);
});

eventBus.subscribe('OrderCreated', async (event: OrderCreatedEvent) => {
  await notificationService.sendOrderConfirmation(event.userId, event.aggregateId);
});

// 订阅订单确认事件
eventBus.subscribe('OrderConfirmed', async (event: OrderConfirmedEvent) => {
  await warehouseService.prepareShipment(event.aggregateId);
});

Kafka事件总线

// src/infrastructure/events/KafkaEventBus.ts
import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs';
import { DomainEvent } from '../../domain/events/baseEvent';

export class KafkaEventBus {
  private producer: Producer;
  private consumers: Map<string, Consumer> = new Map();
  private topic: string;
  
  constructor(
    private brokers: string[],
    private clientId: string,
    topic: string = 'domain-events'
  ) {
    const kafka = new Kafka({
      clientId,
      brokers
    });
    
    this.producer = kafka.producer();
    this.topic = topic;
  }
  
  async connect(): Promise<void> {
    await this.producer.connect();
    console.log('Kafka producer connected');
  }
  
  async publish(event: DomainEvent): Promise<void> {
    await this.producer.send({
      topic: this.topic,
      messages: [
        {
          key: event.aggregateId,
          value: JSON.stringify(event),
          headers: {
            eventType: event.eventType,
            aggregateType: event.aggregateType,
            timestamp: event.timestamp.toISOString()
          }
        }
      ]
    });
    
    console.log(`Published event: ${event.eventType} for ${event.aggregateId}`);
  }
  
  async subscribe(
    eventType: string,
    groupId: string,
    handler: (event: DomainEvent) => Promise<void>
  ): Promise<void> {
    const kafka = new Kafka({
      clientId: this.clientId,
      brokers: this.brokers
    });
    
    const consumer = kafka.consumer({ groupId });
    await consumer.connect();
    await consumer.subscribe({ topic: this.topic, fromBeginning: false });
    
    await consumer.run({
      eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
        const eventTypeHeader = message.headers?.eventType?.toString();
        
        if (eventTypeHeader === eventType) {
          const event = JSON.parse(message.value?.toString() || '{}');
          
          try {
            await handler(event);
          } catch (error) {
            console.error(`Error handling event ${eventType}:`, error);
            // 发送到死信队列或重试
            throw error;
          }
        }
      }
    });
    
    this.consumers.set(`${eventType}-${groupId}`, consumer);
    console.log(`Subscribed to ${eventType} with group ${groupId}`);
  }
  
  async disconnect(): Promise<void> {
    await this.producer.disconnect();
    
    for (const consumer of this.consumers.values()) {
      await consumer.disconnect();
    }
    
    console.log('Kafka disconnected');
  }
}

// 使用示例
const eventBus = new KafkaEventBus(
  ['kafka1:9092', 'kafka2:9092'],
  'order-service'
);

await eventBus.connect();

// 发布事件
await eventBus.publish(orderCreatedEvent);

// 订阅事件
await eventBus.subscribe(
  'OrderCreated',
  'inventory-service-group',
  async (event) => {
    await inventoryService.reserveStock(event.aggregateId, event.items);
  }
);

事件溯源(Event Sourcing)

事件存储

// src/infrastructure/persistence/EventStore.ts
import { Pool } from 'pg';
import { DomainEvent } from '../../domain/events/baseEvent';

export class PostgresEventStore {
  constructor(private pool: Pool) {
    this.initializeSchema();
  }
  
  private async initializeSchema(): Promise<void> {
    await this.pool.query(`
      CREATE TABLE IF NOT EXISTS events (
        event_id UUID PRIMARY KEY,
        event_type VARCHAR(255) NOT NULL,
        aggregate_id UUID NOT NULL,
        aggregate_type VARCHAR(255) NOT NULL,
        event_data JSONB NOT NULL,
        metadata JSONB,
        version INTEGER NOT NULL,
        created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
        UNIQUE(aggregate_id, version)
      );
      
      CREATE INDEX IF NOT EXISTS idx_events_aggregate 
        ON events(aggregate_id, version);
      
      CREATE INDEX IF NOT EXISTS idx_events_type 
        ON events(event_type, created_at);
    `);
  }
  
  async saveEvents(
    aggregateId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    const client = await this.pool.connect();
    
    try {
      await client.query('BEGIN');
      
      // 乐观锁检查
      const currentVersion = await this.getAggregateVersion(client, aggregateId);
      
      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but got ${currentVersion}`
        );
      }
      
      // 保存事件
      for (const event of events) {
        await client.query(
          `INSERT INTO events 
           (event_id, event_type, aggregate_id, aggregate_type, 
            event_data, metadata, version, created_at)
           VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
          [
            event.eventId,
            event.eventType,
            event.aggregateId,
            event.aggregateType,
            JSON.stringify(event),
            event.metadata ? JSON.stringify(event.metadata) : null,
            event.version,
            event.timestamp
          ]
        );
      }
      
      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }
  
  async loadEvents(aggregateId: string): Promise<DomainEvent[]> {
    const result = await this.pool.query(
      `SELECT event_data FROM events 
       WHERE aggregate_id = $1 
       ORDER BY version ASC`,
      [aggregateId]
    );
    
    return result.rows.map(row => {
      const eventData = row.event_data;
      return this.deserializeEvent(eventData);
    });
  }
  
  private async getAggregateVersion(
    client: any,
    aggregateId: string
  ): Promise<number> {
    const result = await client.query(
      `SELECT MAX(version) as version FROM events WHERE aggregate_id = $1`,
      [aggregateId]
    );
    
    return result.rows[0]?.version || 0;
  }
  
  private deserializeEvent(eventData: any): DomainEvent {
    // 根据eventType反序列化为具体的事件类
    const eventMap = {
      'OrderCreated': OrderCreatedEvent,
      'OrderConfirmed': OrderConfirmedEvent,
      'OrderCancelled': OrderCancelledEvent,
      // ...
    };
    
    const EventClass = eventMap[eventData.eventType];
    if (!EventClass) {
      throw new Error(`Unknown event type: ${eventData.eventType}`);
    }
    
    return Object.assign(new EventClass(), eventData);
  }
}

// src/domain/aggregates/EventSourcedOrder.ts
export class EventSourcedOrder {
  private version: number = 0;
  
  constructor(
    public readonly id: string,
    public userId: string = '',
    public items: OrderItem[] = [],
    public totalAmount: number = 0,
    public status: OrderStatus = 'PENDING',
    private changes: DomainEvent[] = []
  ) {}
  
  // 从事件流重建状态
  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.apply(event, false);
    }
  }
  
  // 创建新订单
  static create(
    userId: string,
    items: OrderItem[],
    shippingAddress: Address
  ): EventSourcedOrder {
    const order = new EventSourcedOrder(generateUUID());
    
    const totalAmount = items.reduce(
      (sum, item) => sum + item.price * item.quantity,
      0
    );
    
    const event = new OrderCreatedEvent(
      order.id,
      userId,
      items,
      totalAmount,
      shippingAddress
    );
    
    order.apply(event, true);
    
    return order;
  }
  
  confirm(estimatedDelivery: Date): void {
    if (this.status !== 'PENDING') {
      throw new BusinessRuleError('Only pending orders can be confirmed');
    }
    
    const event = new OrderConfirmedEvent(
      this.id,
      new Date(),
      estimatedDelivery
    );
    
    this.apply(event, true);
  }
  
  // 应用事件
  private apply(event: DomainEvent, isNew: boolean): void {
    switch (event.eventType) {
      case 'OrderCreated':
        const createdEvent = event as OrderCreatedEvent;
        this.userId = createdEvent.userId;
        this.items = createdEvent.items;
        this.totalAmount = createdEvent.totalAmount;
        this.status = 'PENDING';
        break;
        
      case 'OrderConfirmed':
        this.status = 'CONFIRMED';
        break;
        
      case 'OrderCancelled':
        this.status = 'CANCELLED';
        break;
    }
    
    this.version++;
    
    if (isNew) {
      this.changes.push(event);
    }
  }
  
  // 获取未保存的变更
  getChanges(): DomainEvent[] {
    return this.changes;
  }
  
  clearChanges(): void {
    this.changes = [];
  }
}

// src/application/services/EventSourcedOrderService.ts
export class EventSourcedOrderService {
  constructor(
    private eventStore: PostgresEventStore,
    private eventPublisher: EventPublisher
  ) {}
  
  async createOrder(command: CreateOrderCommand): Promise<string> {
    const order = EventSourcedOrder.create(
      command.userId,
      command.items,
      command.shippingAddress
    );
    
    const changes = order.getChanges();
    
    await this.eventStore.saveEvents(order.id, changes, 0);
    await this.eventPublisher.publishAll(changes);
    
    order.clearChanges();
    
    return order.id;
  }
  
  async confirmOrder(orderId: string, estimatedDelivery: Date): Promise<void> {
    const events = await this.eventStore.loadEvents(orderId);
    
    if (events.length === 0) {
      throw new NotFoundError('Order', orderId);
    }
    
    const order = new EventSourcedOrder(orderId);
    order.loadFromHistory(events);
    
    const currentVersion = events.length;
    
    order.confirm(estimatedDelivery);
    
    const changes = order.getChanges();
    await this.eventStore.saveEvents(orderId, changes, currentVersion);
    await this.eventPublisher.publishAll(changes);
  }
}

CQRS(命令查询职责分离)

// src/application/commands/OrderCommands.ts
export interface Command {
  commandId: string;
  timestamp: Date;
}

export class CreateOrderCommand implements Command {
  readonly commandId = generateUUID();
  readonly timestamp = new Date();
  
  constructor(
    public readonly userId: string,
    public readonly items: OrderItem[],
    public readonly shippingAddress: Address
  ) {}
}

// src/application/queries/OrderQueries.ts
export interface OrderReadModel {
  id: string;
  userId: string;
  items: OrderItem[];
  totalAmount: number;
  status: string;
  createdAt: Date;
  customerName?: string;
  productNames?: string[];
}

export class GetOrderQuery {
  constructor(public readonly orderId: string) {}
}

export class GetUserOrdersQuery {
  constructor(
    public readonly userId: string,
    public readonly page: number = 1,
    public readonly pageSize: number = 20
  ) {}
}

// src/infrastructure/persistence/OrderReadModelProjector.ts
export class OrderReadModelProjector {
  constructor(private readDb: Pool) {
    this.initializeSchema();
  }
  
  private async initializeSchema(): Promise<void> {
    await this.readDb.query(`
      CREATE TABLE IF NOT EXISTS order_read_models (
        id UUID PRIMARY KEY,
        user_id UUID NOT NULL,
        items JSONB NOT NULL,
        total_amount DECIMAL NOT NULL,
        status VARCHAR(50) NOT NULL,
        customer_name VARCHAR(255),
        product_names TEXT[],
        created_at TIMESTAMP WITH TIME ZONE NOT NULL,
        updated_at TIMESTAMP WITH TIME ZONE NOT NULL
      );
      
      CREATE INDEX IF NOT EXISTS idx_orders_user 
        ON order_read_models(user_id, created_at DESC);
      
      CREATE INDEX IF NOT EXISTS idx_orders_status 
        ON order_read_models(status);
    `);
  }
  
  async project(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderCreated':
        await this.handleOrderCreated(event as OrderCreatedEvent);
        break;
      case 'OrderConfirmed':
        await this.updateOrderStatus(event.aggregateId, 'CONFIRMED');
        break;
      case 'OrderCancelled':
        await this.updateOrderStatus(event.aggregateId, 'CANCELLED');
        break;
    }
  }
  
  private async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
    // 查询客户信息
    const customer = await this.readDb.query(
      'SELECT name FROM users WHERE id = $1',
      [event.userId]
    );
    
    // 查询产品名称
    const productIds = event.items.map(item => item.productId);
    const products = await this.readDb.query(
      'SELECT id, name FROM products WHERE id = ANY($1)',
      [productIds]
    );
    
    const productNames = products.rows.map(p => p.name);
    
    await this.readDb.query(
      `INSERT INTO order_read_models 
       (id, user_id, items, total_amount, status, customer_name, product_names, created_at, updated_at)
       VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $8)`,
      [
        event.aggregateId,
        event.userId,
        JSON.stringify(event.items),
        event.totalAmount,
        'PENDING',
        customer.rows[0]?.name || 'Unknown',
        productNames,
        event.timestamp
      ]
    );
  }
  
  private async updateOrderStatus(orderId: string, status: string): Promise<void> {
    await this.readDb.query(
      `UPDATE order_read_models 
       SET status = $1, updated_at = NOW() 
       WHERE id = $2`,
      [status, orderId]
    );
  }
}

// src/application/queries/OrderQueryService.ts
export class OrderQueryService {
  constructor(private readDb: Pool) {}
  
  async getOrder(query: GetOrderQuery): Promise<OrderReadModel | null> {
    const result = await this.readDb.query(
      'SELECT * FROM order_read_models WHERE id = $1',
      [query.orderId]
    );
    
    return result.rows[0] || null;
  }
  
  async getUserOrders(query: GetUserOrdersQuery): Promise<{
    orders: OrderReadModel[];
    total: number;
  }> {
    const offset = (query.page - 1) * query.pageSize;
    
    const [ordersResult, countResult] = await Promise.all([
      this.readDb.query(
        `SELECT * FROM order_read_models 
         WHERE user_id = $1 
         ORDER BY created_at DESC 
         LIMIT $2 OFFSET $3`,
        [query.userId, query.pageSize, offset]
      ),
      this.readDb.query(
        'SELECT COUNT(*) FROM order_read_models WHERE user_id = $1',
        [query.userId]
      )
    ]);
    
    return {
      orders: ordersResult.rows,
      total: parseInt(countResult.rows[0].count)
    };
  }
}

// 订阅事件并投影
const projector = new OrderReadModelProjector(readDb);

eventBus.subscribe('OrderCreated', 'read-model-projector', async (event) => {
  await projector.project(event);
});

eventBus.subscribe('OrderConfirmed', 'read-model-projector', async (event) => {
  await projector.project(event);
});

eventBus.subscribe('OrderCancelled', 'read-model-projector', async (event) => {
  await projector.project(event);
});

总结

事件驱动架构的核心价值:

  1. 领域事件:捕获业务中发生的重要事实,作为系统通信的基础
  2. 事件总线:实现组件间的松耦合通信,支持多个消费者
  3. 事件溯源:通过事件流重建状态,提供完整的审计追踪
  4. CQRS:分离命令和查询,优化读写性能

关键原则:

  • 事件是不可变的事实记录
  • 使用事件实现服务间的松耦合
  • 事件溯源提供强大的审计和调试能力
  • CQRS允许针对读写场景分别优化
  • 实现幂等性处理重复事件

延伸阅读

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页