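Introduction
Real-time data synchronization is a core requirement in modern data architectures. Building search indexes, refreshing caches, and running data warehouse ETL all depend on a reliable way to capture data changes. CDC (Change Data Capture) reads the database's change log to provide low-latency, reliable synchronization.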
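How CDC Works
Core Concepts
CDC captures the database's change log (for example MySQL's binlog or PostgreSQL's WAL) and turns INSERT, UPDATE, and DELETE operations into an event stream that downstream systems consume.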
Traditional polling (pull):
┌───────────┐      periodic queries      ┌───────────────┐
│ Source DB │ ─────────────────────────▶ │ Target system │
└───────────┘ (high latency, heavy load) └───────────────┘
CDC (push):
┌───────────┐      binlog/WAL       ┌─────┐  event stream  ┌───────────────┐
│ Source DB │ ────────────────────▶ │ CDC │ ─────────────▶ │ Target system │
└───────────┘ (real time, low lag)  └─────┘                └───────────────┘
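To make the push model concrete, here is a minimal Go sketch of what a downstream system does with such an event stream: it replays insert, update, and delete events, in order, onto its own copy of the data. The `ChangeEvent` type and `Apply` function are illustrative names chosen for this example, not part of Debezium or Kafka.

```go
package main

import "fmt"

// ChangeEvent is a simplified, hypothetical stand-in for a CDC event.
type ChangeEvent struct {
	Op    string            // "c" = insert, "u" = update, "d" = delete
	Key   int               // primary key of the affected row
	After map[string]string // row state after the change; nil for deletes
}

// Apply replays one event onto an in-memory replica keyed by primary key.
func Apply(replica map[int]map[string]string, ev ChangeEvent) {
	switch ev.Op {
	case "c", "u":
		replica[ev.Key] = ev.After // upsert, so replaying an event twice is harmless
	case "d":
		delete(replica, ev.Key)
	}
}

func main() {
	replica := map[int]map[string]string{}
	events := []ChangeEvent{
		{Op: "c", Key: 1001, After: map[string]string{"email": "john@example.com"}},
		{Op: "u", Key: 1001, After: map[string]string{"email": "john.doe@example.com"}},
		{Op: "c", Key: 1002, After: map[string]string{"email": "jane@example.com"}},
		{Op: "d", Key: 1002},
	}
	for _, ev := range events {
		Apply(replica, ev)
	}
	fmt.Println(len(replica), replica[1001]["email"]) // prints: 1 john.doe@example.com
}
```

Treating inserts and updates as the same upsert is a deliberate choice here: it keeps the replay idempotent, which matters whenever events can be delivered more than once.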
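Comparing the Approaches
| Approach | Latency | Impact on source DB | Complexity | Typical use |
|---|---|---|---|---|
| Polling queries | Minutes | High | Low | Offline ETL |
| Triggers | Seconds | Medium | Medium | Simple synchronization |
| CDC | Milliseconds | Low | High | Real-time synchronization |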
Deploying and Configuring Debezium
Deploying with Docker Compose
# docker-compose.yml
version: '3.8'
services:
  # ZooKeeper (required by Kafka)
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  # Kafka message broker
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  # Debezium Connect
  connect:
    image: debezium/connect:2.4
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: debezium-connect
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status
      CONFIG_STORAGE_REPLICATION_FACTOR: 1
      OFFSET_STORAGE_REPLICATION_FACTOR: 1
      STATUS_STORAGE_REPLICATION_FACTOR: 1
  # MySQL source database
  # Note: the connector configured later logs in as debezium/dbz, which this image does not create.
  # Create that user (with replication privileges) and the inventory schema before registering it.
  mysql:
    image: mysql:8.0
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: debezium
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
    volumes:
      - ./mysql.cnf:/etc/mysql/conf.d/mysql.cnf
    command:
      - --server-id=1
      - --log-bin=mysql-bin
      - --binlog-format=ROW
      - --binlog-row-image=FULL
MySQL Configuration
# mysql.cnf
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL
gtid-mode=ON
enforce-gtid-consistency=ON
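Debezium Connector Configuration
MySQL CDC Connector
// mysql-connector.json
// The // lines are annotations only. JSON has no comment syntax, so remove them before POSTing this file.
{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    // Database connection. This user must exist in MySQL with the SELECT, RELOAD,
    // SHOW DATABASES, REPLICATION SLAVE and REPLICATION CLIENT privileges.
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    // Must be unique within the replication topology, so it cannot reuse the MySQL server-id (1)
    "database.server.id": "184054",
    // Debezium 2.x: topic.prefix replaces database.server.name
    "topic.prefix": "dbserver1",
    // Database and table filters
    "database.include.list": "inventory",
    "table.include.list": "inventory.customers,inventory.orders",
    // Schema history in Kafka (Debezium 2.x names, formerly database.history.*)
    "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    // Converters
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    // Temporal and decimal handling ("double" is convenient but can lose precision)
    "time.precision.mode": "connect",
    "decimal.handling.mode": "double",
    // Snapshot behavior
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "minimal"
  }
}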
Deploying the Connector
# Create the connector
curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ \
  -d @mysql-connector.json
# Check the connector status
curl -s http://localhost:8083/connectors/mysql-connector/status | jq
# List the topics that were created
kafka-topics --bootstrap-server localhost:9092 --list
# The list should include: dbserver1.inventory.customers, dbserver1.inventory.orders
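The same registration can also be done from code. The sketch below builds the `POST /connectors` request used by the curl call above; `NewRegisterRequest` and `connectorSpec` are hypothetical names for this example, and the config map holds only a subset of the real settings. Building the body from a Go map also sidesteps the problem that an annotated JSON file with `//` comments is not valid JSON.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

// connectorSpec matches the JSON body shape used by mysql-connector.json.
type connectorSpec struct {
	Name   string            `json:"name"`
	Config map[string]string `json:"config"`
}

// NewRegisterRequest builds, but does not send, the connector registration request.
func NewRegisterRequest(baseURL, name string, config map[string]string) (*http.Request, error) {
	body, err := json.Marshal(connectorSpec{Name: name, Config: config})
	if err != nil {
		return nil, err
	}
	url := strings.TrimRight(baseURL, "/") + "/connectors"
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", "application/json")
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	req, err := NewRegisterRequest("http://localhost:8083", "mysql-connector", map[string]string{
		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
		"tasks.max":       "1",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL)
	// To actually register the connector, send it: resp, err := http.DefaultClient.Do(req)
}
```

Keeping request construction separate from sending makes the helper easy to check without a running Kafka Connect worker.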
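Consuming and Transforming Data
Kafka Consumer Example
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CDCConsumer {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cdc-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("dbserver1.inventory.customers"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    if (record.value() == null) {
                        continue; // tombstone that follows a delete event
                    }
                    try {
                        processEvent(MAPPER.readTree(record.value()));
                    } catch (Exception e) {
                        System.err.println("Failed to process event: " + e);
                    }
                }
            }
        }
    }

    // With schemas.enable=false, before/after/op are top-level fields of the message.
    private static void processEvent(JsonNode event) {
        switch (event.path("op").asText()) {
            case "c": // create
            case "r": // row read during the initial snapshot
                handleCreate(event.get("after"));
                break;
            case "u": // update
                handleUpdate(event.get("before"), event.get("after"));
                break;
            case "d": // delete
                handleDelete(event.get("before"));
                break;
            default:
                break;
        }
    }

    // Placeholder handlers; replace them with the real sink logic.
    private static void handleCreate(JsonNode after) { System.out.println("create " + after); }
    private static void handleUpdate(JsonNode before, JsonNode after) { System.out.println("update " + after); }
    private static void handleDelete(JsonNode before) { System.out.println("delete " + before); }
}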
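CDC Event Structure
The event below shows the full envelope. The outer schema/payload wrapper is present only when the JSON converter runs with schemas.enable=true; with schemas.enable=false, as configured for the connector above, the message consists of the contents of payload alone.
{
  "schema": { ... },
  "payload": {
    "before": null,
    "after": {
      "id": 1001,
      "first_name": "John",
      "last_name": "Doe",
      "email": "john@example.com"
    },
    "source": {
      "version": "2.4.0.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1642680000000,
      "db": "inventory",
      "table": "customers",
      "server_id": 1,
      "file": "mysql-bin.000003",
      "pos": 154
    },
    "op": "c",
    "ts_ms": 1642680000123
  }
}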
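Because the message shape depends on the converter's `schemas.enable` setting, a consumer can be written to accept both forms. The following Go sketch does that for the fields shown in the event above; `Event` and `DecodeEvent` are hypothetical names, and only the fields used in this article are mapped.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event holds the envelope fields used in this article.
type Event struct {
	Before map[string]interface{} `json:"before"`
	After  map[string]interface{} `json:"after"`
	Op     string                 `json:"op"`
	TsMs   int64                  `json:"ts_ms"`
	Source struct {
		Table string `json:"table"`
		TsMs  int64  `json:"ts_ms"`
	} `json:"source"`
}

// DecodeEvent accepts both the wrapped {"schema":..., "payload":...} form
// and the flat form that contains only the payload fields.
func DecodeEvent(raw []byte) (Event, error) {
	var wrapped struct {
		Payload *Event `json:"payload"`
	}
	if err := json.Unmarshal(raw, &wrapped); err != nil {
		return Event{}, err
	}
	if wrapped.Payload != nil {
		return *wrapped.Payload, nil
	}
	var ev Event
	err := json.Unmarshal(raw, &ev)
	return ev, err
}

func main() {
	flat := `{"before":null,"after":{"id":1001},"source":{"table":"customers","ts_ms":1642680000000},"op":"c","ts_ms":1642680000123}`
	wrapped := `{"schema":{},"payload":` + flat + `}`
	for _, raw := range []string{wrapped, flat} {
		ev, err := DecodeEvent([]byte(raw))
		if err != nil {
			panic(err)
		}
		// ts_ms minus source.ts_ms is the gap between the change in the database
		// and the connector processing it (123 ms for the sample event).
		fmt.Println(ev.Op, ev.Source.Table, ev.TsMs-ev.Source.TsMs)
	}
}
```

Both inputs print `c customers 123`, since they carry the same payload.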
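Real-Time Sync to Elasticsearch
Kafka Connect Elasticsearch Sink
// elasticsearch-sink.json
// The // lines are annotations only; remove them before POSTing this file.
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "3",
    "topics": "dbserver1.inventory.customers",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    // Key and schema handling: the record key becomes the document ID,
    // and a tombstone (null value) deletes the document
    "key.ignore": "false",
    "schema.ignore": "true",
    "behavior.on.null.values": "delete",
    // Transforms. ExtractNewRecordState relies on the Debezium envelope schema,
    // so records written with schemas.enable=false pass through it unchanged.
    "transforms": "unwrap,keyTransform",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "drop",
    // Reduce the Debezium key struct to the primitive id, which the sink can use as a document ID
    "transforms.keyTransform.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.keyTransform.field": "id",
    // Retries
    "max.retries": "5",
    "retry.backoff.ms": "1000"
  }
}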
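Go Consumer Writing Directly to Elasticsearch
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"os/signal"
	"syscall"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/segmentio/kafka-go"
)

// CDCEvent is the Debezium envelope as produced with schemas.enable=false,
// where before/after/op are top-level fields of the message.
type CDCEvent struct {
	Before map[string]interface{} `json:"before"`
	After  map[string]interface{} `json:"after"`
	Op     string                 `json:"op"`
	Source struct {
		Table string `json:"table"`
	} `json:"source"`
}

func main() {
	// Elasticsearch client
	es, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{"http://localhost:9200"},
	})
	if err != nil {
		log.Fatalf("Failed to create ES client: %v", err)
	}
	// Kafka consumer
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"localhost:9092"},
		Topic:    "dbserver1.inventory.customers",
		GroupID:  "es-sync-group",
		MinBytes: 10e3,
		MaxBytes: 10e6,
	})
	defer reader.Close()
	// Graceful shutdown: the context is cancelled on SIGINT or SIGTERM
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	for {
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return // shutting down
			}
			log.Printf("Error reading message: %v", err)
			continue
		}
		if len(msg.Value) == 0 {
			continue // tombstone that follows a delete event
		}
		var event CDCEvent
		dec := json.NewDecoder(bytes.NewReader(msg.Value))
		dec.UseNumber() // keep numeric IDs exact instead of converting them to float64
		if err := dec.Decode(&event); err != nil {
			log.Printf("Error unmarshaling: %v", err)
			continue
		}
		// The table name doubles as the index name (an assumption of this example).
		switch event.Op {
		case "c", "u", "r": // create, update, or snapshot read: index the document
			if err := indexDocument(es, event.Source.Table, event.After); err != nil {
				log.Printf("Error indexing: %v", err)
			}
		case "d": // delete the document
			if err := deleteDocument(es, event.Source.Table, event.Before); err != nil {
				log.Printf("Error deleting: %v", err)
			}
		}
	}
}

// docID renders the row's id column as the Elasticsearch document ID.
func docID(row map[string]interface{}) (string, error) {
	id, ok := row["id"]
	if !ok || id == nil {
		return "", errors.New("row has no id column")
	}
	return fmt.Sprint(id), nil
}

func indexDocument(es *elasticsearch.Client, index string, after map[string]interface{}) error {
	id, err := docID(after)
	if err != nil {
		return err
	}
	body, err := json.Marshal(after)
	if err != nil {
		return err
	}
	res, err := es.Index(index, bytes.NewReader(body), es.Index.WithDocumentID(id))
	if err != nil {
		return err
	}
	defer res.Body.Close()
	if res.IsError() {
		return fmt.Errorf("index %s/%s: %s", index, id, res.Status())
	}
	return nil
}

func deleteDocument(es *elasticsearch.Client, index string, before map[string]interface{}) error {
	id, err := docID(before)
	if err != nil {
		return err
	}
	res, err := es.Delete(index, id)
	if err != nil {
		return err
	}
	defer res.Body.Close()
	if res.IsError() && res.StatusCode != 404 { // an already-missing document is fine
		return fmt.Errorf("delete %s/%s: %s", index, id, res.Status())
	}
	return nil
}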
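The consumer above issues one Elasticsearch request per event. Under load it is usually cheaper to group events and send them through the `_bulk` endpoint, whose body is newline-delimited JSON: an action line, followed by a document line for index actions. The sketch below builds such a body; `Change` and `BulkBody` are hypothetical names, and the input is assumed to be already decoded CDC events.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// Change is a decoded CDC event reduced to what the sink needs.
type Change struct {
	Op  string                 // "c", "u", "r" or "d"
	ID  string                 // document ID derived from the primary key
	Doc map[string]interface{} // row after the change; nil for deletes
}

// BulkBody renders the changes as an NDJSON payload for the _bulk endpoint.
func BulkBody(index string, changes []Change) ([]byte, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf) // Encode appends "\n" after each value, as NDJSON requires
	for _, c := range changes {
		meta := map[string]string{"_index": index, "_id": c.ID}
		if c.Op == "d" {
			// Delete actions have no document line.
			if err := enc.Encode(map[string]interface{}{"delete": meta}); err != nil {
				return nil, err
			}
			continue
		}
		if err := enc.Encode(map[string]interface{}{"index": meta}); err != nil {
			return nil, err
		}
		if err := enc.Encode(c.Doc); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

func main() {
	body, err := BulkBody("customers", []Change{
		{Op: "c", ID: "1001", Doc: map[string]interface{}{"email": "john@example.com"}},
		{Op: "d", ID: "1002"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(string(body))
}
```

With the go-elasticsearch client, a body like this can be passed to `es.Bulk(bytes.NewReader(body))`. The response should then be checked per item, because a bulk request can succeed as a whole while individual actions fail.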
Handling Schema Evolution
Schema Registry Integration
# Enable Schema Registry
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
Compatibility Strategy
// Avro schema example. Optional fields are declared as nullable with a null default,
// so such fields can be added later without breaking backward compatibility.
{
  "type": "record",
  "name": "Customer",
  "namespace": "io.debezium.mysql.inventory",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "first_name", "type": "string"},
    {"name": "last_name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null},
    {"name": "phone", "type": ["null", "string"], "default": null}
  ]
}
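The rule behind this schema can be sketched in code: a new schema stays backward compatible (it can read data written with the old one) as long as every field it adds carries a default. The Go sketch below is a deliberately simplified check under that assumption. `Field` and `BackwardIssues` are hypothetical names, and the check is stricter than real Avro schema resolution, which also permits certain type promotions.

```go
package main

import "fmt"

// Field is a simplified view of an Avro record field.
type Field struct {
	Name       string
	Type       string // e.g. "long", "string", or "null|string" for a union
	HasDefault bool
}

// BackwardIssues lists why data written with oldFields might not be readable with newFields.
// Removed fields are not reported, since a reader simply ignores fields it does not declare.
func BackwardIssues(oldFields, newFields []Field) []string {
	old := make(map[string]Field, len(oldFields))
	for _, f := range oldFields {
		old[f.Name] = f
	}
	var issues []string
	for _, f := range newFields {
		prev, existed := old[f.Name]
		switch {
		case !existed && !f.HasDefault:
			issues = append(issues, fmt.Sprintf("new field %q has no default", f.Name))
		case existed && prev.Type != f.Type:
			issues = append(issues, fmt.Sprintf("field %q changed type from %s to %s", f.Name, prev.Type, f.Type))
		}
	}
	return issues
}

func main() {
	v1 := []Field{{Name: "id", Type: "long"}, {Name: "first_name", Type: "string"}}
	// Adds phone as a nullable field with a default: compatible.
	v2 := append(append([]Field{}, v1...), Field{Name: "phone", Type: "null|string", HasDefault: true})
	// Adds a required field without a default: old records cannot supply it.
	v3 := append(append([]Field{}, v1...), Field{Name: "tier", Type: "string"})
	fmt.Println(len(BackwardIssues(v1, v2)), BackwardIssues(v1, v3))
}
```

In practice Schema Registry performs this kind of check itself when a compatibility level is configured for a subject; the sketch only illustrates the reasoning.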
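Monitoring and Alerting
Prometheus Metrics
# prometheus.yml
# Assumes the Connect worker runs a Prometheus JMX exporter on port 9876;
# Debezium publishes its metrics over JMX rather than through a built-in /metrics endpoint.
scrape_configs:
  - job_name: 'debezium-connect'
    static_configs:
      - targets: ['connect:9876']
    metrics_path: '/metrics'
Key Metrics
# The exact metric names depend on the JMX exporter's rename rules.
# Connector lag behind the source
debezium_metrics_MilliSecondsBehindSource
# Total events seen (useful for tracking snapshot progress)
debezium_metrics_TotalNumberOfEventsSeen
# Error count
debezium_metrics_NumberOfErroneousEvents
# Alerting rules
groups:
  - name: debezium
    rules:
      - alert: HighCDCDelay
        expr: debezium_metrics_MilliSecondsBehindSource > 60000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "CDC lag has stayed above 1 minute"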
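Failure Recovery
Restarting Connectors
# Restart a failed connector (a manual call; trigger it from a health check to automate recovery)
curl -X POST http://localhost:8083/connectors/mysql-connector/restart
# Restart a single task
curl -X POST http://localhost:8083/connectors/mysql-connector/tasks/0/restart
Resetting Offsets
# Delete the connector. This alone does not clear the offsets stored in connect-offsets.
curl -X DELETE http://localhost:8083/connectors/mysql-connector
# Re-create the connector. Under the same name it resumes from the saved binlog position;
# to force a new snapshot, change "name" in mysql-connector.json (ideally with a fresh schema history topic).
curl -i -X POST \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ \
  -d @mysql-connector.json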
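To automate these restarts, a small watchdog can read the status endpoint and decide which restart calls to make. The sketch below parses a status response and returns the restart URLs for anything in the `FAILED` state. `RestartURLs` is a hypothetical helper, and the sample JSON is a hand-written illustration of the status format, not captured output.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// connectorStatus maps the parts of GET /connectors/{name}/status used here.
type connectorStatus struct {
	Name      string `json:"name"`
	Connector struct {
		State string `json:"state"`
	} `json:"connector"`
	Tasks []struct {
		ID    int    `json:"id"`
		State string `json:"state"`
	} `json:"tasks"`
}

// RestartURLs returns the endpoints to POST to for every failed connector or task.
func RestartURLs(baseURL string, statusJSON []byte) ([]string, error) {
	var st connectorStatus
	if err := json.Unmarshal(statusJSON, &st); err != nil {
		return nil, err
	}
	var urls []string
	if st.Connector.State == "FAILED" {
		urls = append(urls, fmt.Sprintf("%s/connectors/%s/restart", baseURL, st.Name))
	}
	for _, t := range st.Tasks {
		if t.State == "FAILED" {
			urls = append(urls, fmt.Sprintf("%s/connectors/%s/tasks/%d/restart", baseURL, st.Name, t.ID))
		}
	}
	return urls, nil
}

func main() {
	status := []byte(`{
		"name": "mysql-connector",
		"connector": {"state": "RUNNING"},
		"tasks": [{"id": 0, "state": "FAILED"}]
	}`)
	urls, err := RestartURLs("http://localhost:8083", status)
	if err != nil {
		panic(err)
	}
	for _, u := range urls {
		fmt.Println("POST", u)
	}
}
```

A real watchdog would add a backoff and a retry limit, so that a task failing for a permanent reason (for example a purged binlog) is escalated instead of being restarted forever.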
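Performance Tuning
Batch Processing
{
  "config": {
    // Batch and queue sizing (max.queue.size should stay larger than max.batch.size)
    "snapshot.fetch.size": "10240",
    "max.batch.size": "2048",
    "max.queue.size": "8192",
    // The MySQL connector always runs a single task, so raising tasks.max has no effect
    "tasks.max": "1",
    // Compression is a producer setting, applied through a Kafka Connect client override
    "producer.override.compression.type": "lz4"
  }
}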
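The relationship between the queue and batch settings can be pictured as a bounded queue that is drained in chunks: the queue size caps how many events may wait in memory, and the batch size caps how many are handed over in one pass. The Go sketch below is only an analogy under that reading, not Debezium's implementation; `drainBatch` is a hypothetical helper.

```go
package main

import "fmt"

// drainBatch takes up to maxBatch queued events and returns as soon as the queue is empty.
func drainBatch(queue <-chan int, maxBatch int) []int {
	batch := make([]int, 0, maxBatch)
	for len(batch) < maxBatch {
		select {
		case ev, ok := <-queue:
			if !ok {
				return batch // queue closed
			}
			batch = append(batch, ev)
		default:
			return batch // nothing waiting right now
		}
	}
	return batch
}

func main() {
	queue := make(chan int, 8) // bounded, in the role of max.queue.size
	for i := 1; i <= 5; i++ {
		queue <- i
	}
	for {
		batch := drainBatch(queue, 2) // in the role of max.batch.size
		if len(batch) == 0 {
			break
		}
		fmt.Println(batch) // prints [1 2], then [3 4], then [5]
	}
}
```

A larger batch lowers per-batch overhead, while a larger queue absorbs bursts at the cost of memory, which is why the queue limit is kept above the batch limit.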
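Filtering
{
  "config": {
    // Capture only specific columns (fully qualified as database.table.column)
    "column.include.list": "inventory.customers.id,inventory.customers.first_name,inventory.customers.email",
    // Columns used to build the message key
    "message.key.columns": "inventory.customers:id"
    // To skip a large column instead, use column.exclude.list, for example
    // "inventory.orders.large_blob_field". The include and exclude lists
    // cannot both be set on the same connector.
  }
}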
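Summary
Core strengths of CDC:
- Real time: latency is typically in the millisecond range, which suits real-time workloads
- Low intrusion: log-based capture adds little load to the source database
- Reliability: Debezium delivers events at least once by default, so consumers should be idempotent to keep data consistent
- Flexibility: many sources and sinks are supported
Implementation checklist:
- Choose the snapshot mode deliberately (initial/schema_only/never)
- Monitor lag and error metrics
- Handle schema evolution with a compatibility strategy
- Design failure recovery procedures
- Tune batching and filtering rules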