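Database Sharding in Practice: Architecture Design and Implementation for Horizontal Scaling

An in-depth look at the core database sharding strategies (hash, range, and directory-based sharding), covering shard key selection, cross-shard queries, and data migration, with a hands-on ShardingSphere example.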

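Why Shard

When a single database instance can no longer meet performance or capacity requirements, sharding becomes a necessary way to scale.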

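Typical triggers for sharding (rules of thumb, not hard limits):
┌─────────────────────────────────────────┐
│ Capacity bottlenecks                    │
│ - Single table exceeds ~10M rows        │
│ - Database files exceed ~500 GB         │
│ - Backup and restore take too long      │
│                                         │
│ Performance bottlenecks                 │
│ - Write QPS exceeds ~5,000              │
│ - Read latency exceeds ~100 ms          │
│ - CPU/memory persistently high          │
│                                         │
│ Business requirements                   │
│ - Multi-region deployment to cut latency│
│ - Data isolation (multi-tenancy)        │
│ - Cost optimization (hot/cold split)    │
└─────────────────────────────────────────┘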

Sharding Strategies

Hash Sharding

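// sharding/HashShardingStrategy.java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.commons.codec.digest.MurmurHash3; // Apache Commons Codec 1.14+

// ShardingStrategy and Range<Object> are this article's own abstractions (not shown)
public class HashShardingStrategy implements ShardingStrategy {
    private final int shardCount;
    
    public HashShardingStrategy(int shardCount) {
        // Require a power of 2 so that "hash & (shardCount - 1)" behaves like a non-negative modulo
        if (shardCount <= 0 || (shardCount & (shardCount - 1)) != 0) {
            throw new IllegalArgumentException("Shard count must be a power of 2");
        }
        this.shardCount = shardCount;
    }
    
    @Override
    public int calculateShard(Object shardKey) {
        // hash32x86 is the non-deprecated replacement for hash32(String)
        int hash = MurmurHash3.hash32x86(shardKey.toString().getBytes(StandardCharsets.UTF_8));
        // Bitwise AND instead of modulo: cheaper, and always in [0, shardCount) even for negative hashes
        return hash & (shardCount - 1);
    }
    
    @Override
    public List<Integer> calculateShards(Range<Object> range) {
        // Hashing destroys key order, so a range query must visit every shard
        return IntStream.range(0, shardCount).boxed().collect(Collectors.toList());
    }
}

// Usage
HashShardingStrategy strategy = new HashShardingStrategy(16);
int shardId = strategy.calculateShard(userId); // 0-15
String tableName = "users_" + shardId;         // users_0, users_1, ...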

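Pros

  • Even data distribution
  • Avoids range hotspots (a single very hot key still lands on one shard)
  • Simple to implement

Cons

  • Range queries must scan every shard
  • Hard to scale out: changing the shard count remaps keys, so data must be rehashed and moved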
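The cost of rehashing can be measured directly. The sketch below is illustrative and not from the original. It uses the MurmurHash3 32-bit finalizer (`fmix32`) as a stand-in hash and counts how many of 100,000 integer keys change shard when the shard count changes. Under power-of-2 doubling (16 → 32), you should see roughly half the keys move, because only one extra hash bit enters the mask. With an arbitrary new count (16 → 17), almost every key moves.

```java
import java.util.Locale;

// Illustrative only: how many keys change shard when the shard count changes?
public class ReshardDemo {

    // MurmurHash3 32-bit finalizer, used as a cheap stand-in for a full hash function
    static int fmix32(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Non-negative modulo; for a power-of-2 shardCount this equals hash & (shardCount - 1)
    static int shardOf(int key, int shardCount) {
        return Math.floorMod(fmix32(key), shardCount);
    }

    // Fraction of keys 0..keyCount-1 whose shard differs between the two shard counts
    static double movedFraction(int oldCount, int newCount, int keyCount) {
        int moved = 0;
        for (int key = 0; key < keyCount; key++) {
            if (shardOf(key, oldCount) != shardOf(key, newCount)) {
                moved++;
            }
        }
        return (double) moved / keyCount;
    }

    public static void main(String[] args) {
        int keys = 100_000;
        System.out.printf(Locale.ROOT, "16 -> 32 shards: %.1f%% of keys move%n", 100 * movedFraction(16, 32, keys));
        System.out.printf(Locale.ROOT, "16 -> 17 shards: %.1f%% of keys move%n", 100 * movedFraction(16, 17, keys));
    }
}
```

Even the best case (doubling) copies about half the data, which is why hash-sharded systems try to reshard rarely.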

Range Sharding

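// sharding/RangeShardingStrategy.java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class RangeShardingStrategy implements ShardingStrategy {
    // Lower bound (epoch millis, UTC) -> shard ID
    private final TreeMap<Long, Integer> rangeMap;
    
    public RangeShardingStrategy() {
        this.rangeMap = new TreeMap<>();
        // Shard by time range (boundaries are 00:00 UTC on January 1)
        rangeMap.put(0L, 0);                    // before 2020 -> shard 0
        rangeMap.put(1577836800000L, 1);        // 2020 -> shard 1
        rangeMap.put(1609459200000L, 2);        // 2021 -> shard 2
        rangeMap.put(1640995200000L, 3);        // 2022 and later -> shard 3
    }
    
    @Override
    public int calculateShard(Object shardKey) {
        long timestamp = (Long) shardKey;
        Map.Entry<Long, Integer> entry = rangeMap.floorEntry(timestamp);
        if (entry == null) {
            throw new IllegalArgumentException("Timestamp before first range: " + timestamp);
        }
        return entry.getValue();
    }
    
    @Override
    public List<Integer> calculateShards(Range<Object> range) {
        long start = (Long) range.lowerEndpoint();
        long end = (Long) range.upperEndpoint();
        
        // Begin at the range that *contains* start; subMap(start, ...) alone would
        // skip that shard whenever start falls in the middle of a range
        Long from = rangeMap.floorKey(start);
        if (from == null) {
            from = rangeMap.firstKey();
        }
        if (end < from) {
            return Collections.emptyList();
        }
        
        Set<Integer> shards = new TreeSet<>();
        for (Map.Entry<Long, Integer> entry : rangeMap.subMap(from, true, end, true).entrySet()) {
            shards.add(entry.getValue());
        }
        
        return new ArrayList<>(shards);
    }
}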

-- Example: order tables sharded by year and month (a monthly variant of the same idea)
CREATE TABLE orders_202401 (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    order_date DATE,
    amount DECIMAL(10,2)
);

CREATE TABLE orders_202402 LIKE orders_202401;  -- MySQL: copy the schema of orders_202401

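Pros

  • Efficient range queries
  • Easy to scale out (add a new shard for each new range)
  • Convenient data management (e.g. archiving by time)

Cons

  • Prone to hotspots (e.g. the most recent month receives most writes)
  • Uneven data distribution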
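Range queries are efficient because routing is cheap. A date range touches only the tables for the months it overlaps. The following is a minimal sketch that assumes the `orders_yyyyMM` naming from the SQL example. The class and method names are hypothetical.

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical router for monthly tables named orders_yyyyMM
public class MonthlyTableRouter {
    private static final DateTimeFormatter SUFFIX = DateTimeFormatter.ofPattern("yyyyMM");

    // Point lookup: one order date maps to exactly one table
    static String tableFor(LocalDate orderDate) {
        return "orders_" + YearMonth.from(orderDate).format(SUFFIX);
    }

    // Range query: visit only the months that overlap [from, to]
    static List<String> tablesFor(LocalDate from, LocalDate to) {
        if (to.isBefore(from)) {
            throw new IllegalArgumentException("to must not be before from");
        }
        List<String> tables = new ArrayList<>();
        YearMonth last = YearMonth.from(to);
        for (YearMonth m = YearMonth.from(from); !m.isAfter(last); m = m.plusMonths(1)) {
            tables.add("orders_" + m.format(SUFFIX));
        }
        return tables;
    }

    public static void main(String[] args) {
        System.out.println(tableFor(LocalDate.of(2024, 2, 29)));                          // orders_202402
        System.out.println(tablesFor(LocalDate.of(2024, 1, 15), LocalDate.of(2024, 3, 2))); // [orders_202401, orders_202402, orders_202403]
    }
}
```

Under hash sharding, the same query would go to every shard.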

Directory-Based Sharding

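// sharding/DirectoryShardingStrategy.java
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import javax.sql.DataSource;

public class DirectoryShardingStrategy implements ShardingStrategy {
    // Keys are normalized to String, matching how shard_key is stored and read back
    private final Map<String, Integer> directory;
    private final DataSource configDb;
    
    public DirectoryShardingStrategy(DataSource configDb) {
        this.configDb = configDb;
        this.directory = loadDirectory();
    }
    
    private Map<String, Integer> loadDirectory() {
        // ConcurrentHashMap: lookups and updateDirectory() may run on different threads
        Map<String, Integer> dir = new ConcurrentHashMap<>();
        
        try (Connection conn = configDb.getConnection();
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT shard_key, shard_id FROM shard_directory")) {
            
            while (rs.next()) {
                dir.put(rs.getString("shard_key"), rs.getInt("shard_id"));
            }
        } catch (SQLException e) {
            throw new IllegalStateException("Failed to load shard directory", e);
        }
        
        return dir;
    }
    
    @Override
    public int calculateShard(Object shardKey) {
        // Look up by toString() so that e.g. a Long key matches the String loaded from the table
        Integer shardId = directory.get(shardKey.toString());
        if (shardId == null) {
            throw new ShardingException("Shard key not found: " + shardKey); // the article's own exception type
        }
        return shardId;
    }
    
    @Override
    public List<Integer> calculateShards(Range<Object> range) {
        // A directory has no key order, so a range query visits every shard it references
        return new ArrayList<>(new TreeSet<>(directory.values()));
    }
    
    public void updateDirectory(Object shardKey, int newShardId) {
        // Persist to the config DB first (MySQL upsert), then update this instance's cache,
        // so a failed write never leaves the cache ahead of the database.
        // Other application instances keep their old copy until they reload.
        try (Connection conn = configDb.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                 "INSERT INTO shard_directory (shard_key, shard_id) VALUES (?, ?) " +
                 "ON DUPLICATE KEY UPDATE shard_id = ?")) {
            
            stmt.setString(1, shardKey.toString());
            stmt.setInt(2, newShardId);
            stmt.setInt(3, newShardId);
            stmt.executeUpdate();
        } catch (SQLException e) {
            throw new IllegalStateException("Failed to update shard directory", e);
        }
        
        directory.put(shardKey.toString(), newShardId);
    }
}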

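Pros

  • Most flexible
  • Supports arbitrary key-to-shard mappings
  • Simplifies data migration (moving a key only changes its directory entry)

Cons

  • Requires an extra directory table
  • The directory table itself can become a bottleneck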
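A directory eases migration because relocating one key is a single-entry update. Hash sharding is different: changing the shard count remaps most keys. The following is a minimal in-memory sketch. The class and the tenant names are hypothetical. In a real system, the tenant's data must finish copying before its entry is switched.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative in-memory directory; the real one is backed by the shard_directory table
public class InMemoryDirectory {
    private final Map<String, Integer> directory = new ConcurrentHashMap<>();

    void assign(String key, int shardId) {
        directory.put(key, shardId);
    }

    int route(String key) {
        Integer shardId = directory.get(key);
        if (shardId == null) {
            throw new IllegalArgumentException("Shard key not found: " + key);
        }
        return shardId;
    }

    // Moving one key changes one entry; every other key keeps its route
    void move(String key, int newShardId) {
        if (directory.replace(key, newShardId) == null) {
            throw new IllegalArgumentException("Shard key not found: " + key);
        }
    }

    public static void main(String[] args) {
        InMemoryDirectory dir = new InMemoryDirectory();
        dir.assign("tenant-a", 0);
        dir.assign("tenant-b", 1);
        dir.assign("tenant-c", 1);

        // e.g. tenant-b has grown and gets its own shard
        dir.move("tenant-b", 3);
        System.out.println(dir.route("tenant-a") + " " + dir.route("tenant-b") + " " + dir.route("tenant-c")); // prints 0 3 1
    }
}
```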

ShardingSphere in Practice

Configuring Sharding Rules

# sharding-config.yaml
spring:
  shardingsphere:
    datasource:
      names: ds0,ds1,ds2,ds3
      ds0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/order_db_0
        username: root
        password: password
      ds1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/order_db_1
        username: root
        password: password
      # ds2 and ds3 are configured the same way
    
    rules:
      sharding:
        tables:
          # Order table: database chosen by user_id, table chosen by order_id
          t_order:
            actual-data-nodes: ds$->{0..3}.t_order_$->{0..7}
            database-strategy:
              standard:
                sharding-column: user_id
                sharding-algorithm-name: order-db-hash
            table-strategy:
              standard:
                sharding-column: order_id
                sharding-algorithm-name: order-table-hash
            key-generate-strategy:
              column: order_id
              key-generator-name: snowflake
          
          # Order item table (same sharding rules as t_order, bound to it)
          t_order_item:
            actual-data-nodes: ds$->{0..3}.t_order_item_$->{0..7}
            database-strategy:
              standard:
                sharding-column: user_id
                sharding-algorithm-name: order-db-hash
            table-strategy:
              standard:
                sharding-column: order_id
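                sharding-algorithm-name: order-table-hash
        
        # Binding tables: joins between t_order and t_order_item route shard-to-shard
        # instead of producing a cartesian product of physical tables
        binding-tables:
          - t_order,t_order_item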
        
        sharding-algorithms:
          order-db-hash:
            type: HASH_MOD
            props:
              sharding-count: 4
          order-table-hash:
            type: HASH_MOD
            props:
              sharding-count: 8
        
        key-generators:
          snowflake:
            type: SNOWFLAKE
            props:
              worker-id: 123
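It helps to predict the physical node where a row lands under this configuration. The helper below is a hypothetical sketch. It assumes HASH_MOD computes `Math.abs((long) value.hashCode()) % sharding-count`, so check `HashModShardingAlgorithm` in the ShardingSphere version you run. Both tables use the same algorithms on the same columns, so an order and its items always land on the same node.

```java
// Hypothetical helper predicting the physical node for a (user_id, order_id) pair.
// ASSUMPTION: HASH_MOD = Math.abs((long) value.hashCode()) % sharding-count.
public class NodeRouting {

    static long hashMod(Object shardingValue, int shardingCount) {
        return Math.abs((long) shardingValue.hashCode()) % shardingCount;
    }

    // Database ds0..ds3 via order-db-hash on user_id; table suffix 0..7 via order-table-hash on order_id
    static String physicalNode(String logicTable, long userId, long orderId) {
        return "ds" + hashMod(userId, 4) + "." + logicTable + "_" + hashMod(orderId, 8);
    }

    public static void main(String[] args) {
        long userId = 10L;
        long orderId = 13L;
        System.out.println(physicalNode("t_order", userId, orderId));      // ds2.t_order_5
        System.out.println(physicalNode("t_order_item", userId, orderId)); // ds2.t_order_item_5
    }
}
```

Note that `Long.hashCode()` folds in the high 32 bits. Large Snowflake IDs therefore do not map by plain `id % 8`.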

Cross-Shard Queries

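// service/OrderQueryService.java
// PageRequest is org.springframework.data.domain.PageRequest
@Service
public class OrderQueryService {
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private OrderIndexRepository orderIndexRepository;
    
    // Single-shard query (shard keys supplied)
    public Order getOrder(Long orderId, Long userId) {
        // user_id selects the database and order_id the table, so ShardingSphere routes to one node
        return orderRepository.findByOrderIdAndUserId(orderId, userId);
    }
    
    // Cross-shard query (no shard key)
    public List<Order> getOrdersByStatus(String status) {
        // Broadcast to all 4 x 8 = 32 physical tables and merged: expensive
        return orderRepository.findByStatus(status);
    }
    
    // Optimization: look up shard keys in a secondary index table first
    public List<Order> getOrdersByStatusOptimized(String status, int limit) {
        // 1. Fetch order IDs and their shard key (user_id) from the index table
        //    (assumed Spring Data signature: findByStatus(String, Pageable))
        List<OrderIndex> indexes = orderIndexRepository
            .findByStatus(status, PageRequest.of(0, limit));
        
        // 2. Group by database shard
        Map<Integer, List<OrderIndex>> indexesByShard = indexes.stream()
            .collect(Collectors.groupingBy(idx -> calculateShard(idx.getUserId())));
        
        // 3. Query each shard. Passing user_id as well as order_id lets ShardingSphere route to
        //    one database; an order_id-only IN query would be broadcast to every database.
        //    Note: parallelStream() runs these blocking calls on the shared ForkJoinPool.
        return indexesByShard.values().parallelStream()
            .flatMap(group -> {
                List<Long> userIds = group.stream()
                    .map(OrderIndex::getUserId).distinct().collect(Collectors.toList());
                List<Long> orderIds = group.stream()
                    .map(OrderIndex::getOrderId).collect(Collectors.toList());
                return orderRepository.findByUserIdInAndOrderIdIn(userIds, orderIds).stream();
            })
            .collect(Collectors.toList());
    }
    
    // Must mirror order-db-hash (assumed: HASH_MOD with sharding-count = 4)
    private int calculateShard(Long userId) {
        return (int) (Math.abs((long) userId.hashCode()) % 4);
    }
}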
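In practice, "scanning all shards" means scatter-gather. The same query goes to every shard in parallel, and the partial results are merged and re-sorted. The following is a minimal sketch with in-memory lists standing in for shards, and all names are hypothetical. It also shows why cross-shard pagination is costly. For a global top n, each shard must return its own top n rows, because all n could live on a single shard. It uses a dedicated thread pool rather than `parallelStream()`, because the calls are blocking I/O.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Illustrative scatter-gather; each "shard" is an in-memory list standing in for a database
public class ScatterGather {

    record Order(long orderId, long createdAt) {}

    // Stand-in for "SELECT ... ORDER BY created_at DESC LIMIT n" on one shard
    static List<Order> queryShard(List<Order> shard, int n) {
        return shard.stream()
            .sorted(Comparator.comparingLong(Order::createdAt).reversed())
            .limit(n)
            .collect(Collectors.toList());
    }

    // Global newest-n: every shard returns its own top n, then the merge keeps n overall
    static List<Order> newestAcrossShards(List<List<Order>> shards, int n) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, shards.size()));
        try {
            List<CompletableFuture<List<Order>>> futures = shards.stream()
                .map(shard -> CompletableFuture.supplyAsync(() -> queryShard(shard, n), pool))
                .collect(Collectors.toList());

            List<Order> merged = new ArrayList<>();
            for (CompletableFuture<List<Order>> f : futures) {
                merged.addAll(f.join());
            }
            return queryShard(merged, n); // re-sort the partial results and cut to n
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<List<Order>> shards = List.of(
            List.of(new Order(1, 100), new Order(2, 400)),
            List.of(new Order(3, 300), new Order(4, 500), new Order(5, 450)),
            List.of(new Order(6, 200)));
        List<Long> ids = newestAcrossShards(shards, 3).stream()
            .map(Order::orderId).collect(Collectors.toList());
        System.out.println(ids); // [4, 5, 2]
    }
}
```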

Data Migration

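// migration/ShardMigrationService.java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Service
public class ShardMigrationService {
    
    // Two DataSource beans of the same type need qualifiers; these bean names are illustrative
    @Autowired
    @Qualifier("sourceDs")
    private DataSource sourceDs;
    
    @Autowired
    @Qualifier("targetDs")
    private DataSource targetDs;
    
    public void migrateShard(int sourceShard, int targetShard, 
                             Predicate<Long> shouldMigrate) {
        int batchSize = 1000;
        long lastId = 0;
        
        while (true) {
            // 1. Read a batch from the source shard, paging by primary key
            List<Order> batch = readBatch(sourceShard, lastId, batchSize);
            if (batch.isEmpty()) {
                break;
            }
            
            // 2. Keep only the rows that should move
            List<Order> toMigrate = batch.stream()
                .filter(order -> shouldMigrate.test(order.getUserId()))
                .collect(Collectors.toList());
            
            // 3. Write to the target shard. writeBatch should be an upsert (idempotent), so a crash
            //    between steps 3 and 4 is repaired by rerunning the same batch.
            if (!toMigrate.isEmpty()) {
                writeBatch(targetShard, toMigrate);
                
                // 4. Delete from the source shard. Only safe once reads for these keys are already
                //    routed to the target (or served from both); otherwise the rows disappear.
                deleteBatch(sourceShard, toMigrate);
            }
            
            // To resume after a restart, persist lastId durably (not shown)
            lastId = batch.get(batch.size() - 1).getId();
            
            // Throttle to limit load on the source
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    
    private List<Order> readBatch(int shard, long lastId, int limit) {
        // shard is an int, so concatenating it into the table name cannot inject SQL
        String sql = "SELECT * FROM t_order_" + shard + 
                     " WHERE id > ? ORDER BY id LIMIT ?";
        
        try (Connection conn = sourceDs.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            
            stmt.setLong(1, lastId);
            stmt.setInt(2, limit);
            
            List<Order> orders = new ArrayList<>();
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    orders.add(mapOrder(rs));
                }
            }
            return orders;
        } catch (SQLException e) {
            throw new IllegalStateException("Failed to read batch from t_order_" + shard, e);
        }
    }
    
    // writeBatch, deleteBatch and mapOrder are omitted
}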
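The write-then-delete loop is safe only if rerunning it repairs a crash between the two steps. The sketch below simulates this in memory, with `TreeMap`s standing in for the two shard tables. The class, record, and parameter names are hypothetical. Because the copy is an upsert, a crash during the second batch followed by a rerun from the start finishes the migration with no duplicates and no lost rows.

```java
import java.util.List;
import java.util.TreeMap;
import java.util.function.LongPredicate;

// In-memory simulation of migrateShard; TreeMaps keyed by id stand in for the shard tables
public class MigrationSim {

    record Order(long id, long userId) {}

    // Returns the last id processed. failAfterCopyAt simulates a crash after copying,
    // but before deleting, the batch with that index (-1 = no crash).
    static long migrate(TreeMap<Long, Order> source, TreeMap<Long, Order> target,
                        LongPredicate shouldMigrate, long fromId, int batchSize, int failAfterCopyAt) {
        long lastId = fromId;
        int batchNo = 0;
        while (true) {
            // Keyset pagination: the next rows with id > lastId, in id order
            List<Order> batch = source.tailMap(lastId, false).values().stream().limit(batchSize).toList();
            if (batch.isEmpty()) {
                return lastId;
            }
            List<Order> toMigrate = batch.stream().filter(o -> shouldMigrate.test(o.userId())).toList();

            // Upsert: writing the same row twice leaves a single copy
            toMigrate.forEach(o -> target.put(o.id(), o));
            if (batchNo == failAfterCopyAt) {
                throw new IllegalStateException("simulated crash at batch " + batchNo);
            }
            toMigrate.forEach(o -> source.remove(o.id()));

            lastId = batch.get(batch.size() - 1).id();
            batchNo++;
        }
    }

    public static void main(String[] args) {
        TreeMap<Long, Order> source = new TreeMap<>();
        TreeMap<Long, Order> target = new TreeMap<>();
        for (long id = 1; id <= 10; id++) {
            source.put(id, new Order(id, id % 2)); // odd ids belong to userId 1
        }
        LongPredicate moveUser1 = userId -> userId == 1;
        try {
            migrate(source, target, moveUser1, 0, 3, 1); // crash during the second batch
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());           // simulated crash at batch 1
        }
        migrate(source, target, moveUser1, 0, 3, -1);     // rerun from the start
        System.out.println("source=" + source.keySet() + " target=" + target.keySet());
        // source=[2, 4, 6, 8, 10] target=[1, 3, 5, 7, 9]
    }
}
```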

Choosing a Shard Key

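Shard key selection principles:
┌─────────────────────────────────────────┐
│ 1. High cardinality                     │
│    - Avoids data skew                   │
│    - e.g. user_id (good) vs gender (bad)│
│                                         │
│ 2. Common query predicate               │
│    - Most queries filter on it          │
│    - Fewer cross-shard queries          │
│                                         │
│ 3. Business affinity                    │
│    - Keep related data on one shard     │
│    - e.g. orders and order items share  │
│      the same shard key                 │
│                                         │
│ 4. Immutability                         │
│    - Shard key values should not change │
│    - Otherwise rows must move shards    │
└─────────────────────────────────────────┘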

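Common shard keys:
- User systems: user_id
- Order systems: user_id (keeps a user's orders together) or order_id (spreads load more evenly)
- Logging systems: timestamp
- Multi-tenant systems: tenant_id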
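A quick simulation shows why cardinality matters. The sketch hashes 100,000 rows into 16 shards twice. The first run uses a distinct user_id per row. The second uses a two-valued gender column. It is illustrative: the MurmurHash3 finalizer stands in for the real hash, and the class name is hypothetical. With gender as the key, at most 2 of the 16 shards receive any data, however the hash is chosen.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

// Illustrative skew check: how evenly do shard key values spread over 16 hash shards?
public class SkewCheck {

    // MurmurHash3 32-bit finalizer as a stand-in hash
    static int fmix32(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Count rows per shard for the given shard key values
    static int[] rowsPerShard(int[] keys, int shardCount) {
        int[] counts = new int[shardCount];
        for (int key : keys) {
            counts[Math.floorMod(fmix32(key), shardCount)]++;
        }
        return counts;
    }

    static long nonEmptyShards(int[] counts) {
        return Arrays.stream(counts).filter(c -> c > 0).count();
    }

    public static void main(String[] args) {
        int rows = 100_000;
        int[] userIds = IntStream.range(0, rows).toArray();                 // high cardinality
        int[] genders = IntStream.range(0, rows).map(i -> i % 2).toArray(); // two distinct values

        int[] byUser = rowsPerShard(userIds, 16);
        int[] byGender = rowsPerShard(genders, 16);
        System.out.println("user_id: " + nonEmptyShards(byUser) + " shards used, largest has "
            + Arrays.stream(byUser).max().getAsInt() + " rows");
        System.out.println("gender:  " + nonEmptyShards(byGender) + " shards used, largest has "
            + Arrays.stream(byGender).max().getAsInt() + " rows");
    }
}
```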

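Summary

Database sharding is a core technique for horizontal scaling:

  1. Sharding strategy: hash sharding distributes data evenly, range sharding simplifies data management, and directory sharding is the most flexible
  2. Shard key: high cardinality, present in common queries, shared by related data, and immutable
  3. Cross-shard queries: avoid them where possible; when necessary, use a secondary index
  4. Data migration: process in batches, throttle the rate, and ensure consistency (idempotent writes, switch reads before deleting)

Key principles:

  • Sharding is a last resort; exhaust other optimizations first
  • Choosing the right shard key matters more than choosing the sharding algorithm
  • Monitor per-shard load to catch hotspots and skew early
  • Leave headroom for growth (e.g. more logical tables than physical databases, as in the 4 x 8 layout above) to avoid frequent resharding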
