数据库分片与分区:水平扩展大规模数据存储

深入讲解数据库分片(Sharding)与分区(Partitioning)的核心策略,详解Range、Hash、List分区模式,提供ShardingSphere、Vitess等中间件的实战配置与跨分片查询优化方案。

引言

当单表数据量超过千万级、单库存储超过TB级时,数据库的水平扩展成为必然选择。分区(Partitioning)和分片(Sharding)是两种核心的扩展策略,分别解决单库内的数据管理和多库间的数据分布问题。

分区 vs 分片

| 特性 | 分区(Partitioning) | 分片(Sharding) |
| --- | --- | --- |
| 范围 | 单库内 | 多库间 |
| 复杂度 | 低 | 高 |
| 透明性 | 对应用透明 | 需要中间件或代码改造 |
| 扩展性 | 有限 | 理论上无限 |
| 适用场景 | 时序数据、大表优化 | 超大规模数据 |

表分区(Partitioning)

Range分区(按时间范围)

-- Orders table, range-partitioned by month on created_at (PostgreSQL
-- declarative partitioning).
CREATE TABLE orders (
    id BIGSERIAL,
    user_id BIGINT NOT NULL,
    order_number VARCHAR(50) NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    status VARCHAR(20) NOT NULL,
    created_at TIMESTAMP NOT NULL,
    
    -- The partition key (created_at) is included in the primary key:
    -- PostgreSQL requires unique constraints on a partitioned table to
    -- contain every partition key column.
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Monthly partitions. Each range includes its FROM bound and excludes its
-- TO bound, so consecutive months neither overlap nor leave a gap.
CREATE TABLE orders_2026_01 PARTITION OF orders
    FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');

CREATE TABLE orders_2026_02 PARTITION OF orders
    FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');

CREATE TABLE orders_2026_03 PARTITION OF orders
    FOR VALUES FROM ('2026-03-01') TO ('2026-04-01');

-- Default partition: receives rows whose created_at matches none of the
-- ranges above.
CREATE TABLE orders_default PARTITION OF orders DEFAULT;

自动分区管理(pg_partman)

-- Install pg_partman into a dedicated "partman" schema. Every call below is
-- qualified with "partman.", so the extension must live in that schema; a
-- bare CREATE EXTENSION would install it into the default schema instead.
CREATE SCHEMA IF NOT EXISTS partman;
CREATE EXTENSION IF NOT EXISTS pg_partman SCHEMA partman;

-- Register public.orders for automatic monthly range partitioning on
-- created_at.
-- NOTE(review): if partitions or a default partition already exist on the
-- parent table, confirm that create_parent does not conflict with them.
SELECT partman.create_parent(
    p_parent_table := 'public.orders',
    p_control := 'created_at',
    p_type := 'range',
    p_interval := '1 month',
    p_premake := 3,  -- pre-create 3 future partitions
    p_start_partition := '2026-01-01'
);

-- Retention policy for the partition set.
UPDATE partman.part_config 
SET retention = '6 months',  -- keep 6 months of data
    retention_keep_table = false  -- drop expired partitions instead of only detaching them
WHERE parent_table = 'public.orders';

-- Run maintenance by hand (normally scheduled, e.g. via cron). Maintenance
-- is what creates upcoming partitions and applies the retention policy.
SELECT partman.run_maintenance();

Hash分区(按ID散列)

-- User events table, hash-partitioned on user_id into 16 partitions.
CREATE TABLE user_events (
    id BIGSERIAL,
    user_id BIGINT NOT NULL,
    event_type VARCHAR(50) NOT NULL,
    event_data JSONB,
    created_at TIMESTAMP DEFAULT NOW(),

    -- The partition key (user_id) must be part of the primary key.
    PRIMARY KEY (id, user_id)
) PARTITION BY HASH (user_id);

-- Hash partitions written out by hand: a row goes to the partition whose
-- REMAINDER equals hash(user_id) modulo MODULUS.
CREATE TABLE user_events_p0 PARTITION OF user_events FOR VALUES WITH (MODULUS 16, REMAINDER 0);
CREATE TABLE user_events_p1 PARTITION OF user_events FOR VALUES WITH (MODULUS 16, REMAINDER 1);
CREATE TABLE user_events_p2 PARTITION OF user_events FOR VALUES WITH (MODULUS 16, REMAINDER 2);
-- ... repeat up to p15

-- Or create all 16 partitions with a script. IF NOT EXISTS makes the loop
-- safe to run after the hand-written statements above (and safe to re-run);
-- %I quotes the generated table name as an identifier.
DO $$
BEGIN
    FOR i IN 0..15 LOOP
        EXECUTE format(
            'CREATE TABLE IF NOT EXISTS %I PARTITION OF user_events FOR VALUES WITH (MODULUS 16, REMAINDER %s)',
            'user_events_p' || i, i);
    END LOOP;
END $$;

List分区(按枚举值)

-- Sales table, list-partitioned on region.
CREATE TABLE sales (
    id BIGSERIAL,
    region VARCHAR(20) NOT NULL,
    product_id BIGINT NOT NULL,
    amount DECIMAL(10,2) NOT NULL,
    sale_date DATE NOT NULL,
    
    -- The partition key (region) is included in the primary key, as
    -- PostgreSQL requires for unique constraints on partitioned tables.
    PRIMARY KEY (id, region)
) PARTITION BY LIST (region);

-- One partition per group of region values; rows are routed by exact match
-- on the region string.
CREATE TABLE sales_asia PARTITION OF sales FOR VALUES IN ('China', 'Japan', 'Korea', 'India');
CREATE TABLE sales_europe PARTITION OF sales FOR VALUES IN ('Germany', 'France', 'UK', 'Italy');
CREATE TABLE sales_americas PARTITION OF sales FOR VALUES IN ('USA', 'Canada', 'Brazil', 'Mexico');
-- Default partition for any region not listed above.
CREATE TABLE sales_default PARTITION OF sales DEFAULT;

复合分区(Sub-partitioning)

-- Logs table partitioned first by time range, then sub-partitioned by region.
CREATE TABLE logs (
    id BIGSERIAL,
    log_time TIMESTAMP NOT NULL,
    region VARCHAR(20) NOT NULL,
    level VARCHAR(10) NOT NULL,
    message TEXT,
    
    -- Both the top-level partition key (log_time) and the sub-partition key
    -- (region) are part of the primary key.
    PRIMARY KEY (id, log_time, region)
) PARTITION BY RANGE (log_time);

-- Time partition for Q1 2026 (FROM inclusive, TO exclusive). It is itself a
-- partitioned table, split further by region.
CREATE TABLE logs_2026_q1 PARTITION OF logs
    FOR VALUES FROM ('2026-01-01') TO ('2026-04-01')
    PARTITION BY LIST (region);

-- Region sub-partitions inside the Q1 2026 time partition.
CREATE TABLE logs_2026_q1_asia PARTITION OF logs_2026_q1
    FOR VALUES IN ('China', 'Japan', 'Korea');
CREATE TABLE logs_2026_q1_europe PARTITION OF logs_2026_q1
    FOR VALUES IN ('Germany', 'France', 'UK');
-- Default sub-partition for Q1 2026 rows whose region is not listed above.
CREATE TABLE logs_2026_q1_default PARTITION OF logs_2026_q1 DEFAULT;

数据库分片(Sharding)

分片键选择策略

// ShardKeyEvaluator picks a shard key candidate from a sample of query
// patterns.
type ShardKeyEvaluator struct {
	// queryPatterns is the workload sample that Evaluate scores.
	queryPatterns []QueryPattern
}

// QueryPattern describes one class of query in the workload.
type QueryPattern struct {
	Frequency   int      // how often the query runs
	FilterField string   // primary filter field of the query
	JoinFields  []string // fields used in JOINs
}

// Evaluate returns the filter field with the highest total query frequency
// across all query patterns.
//
// Only filter frequency is scored. Data distribution and JoinFields are not
// taken into account, so the result is a starting point for choosing a shard
// key and not a final answer.
//
// When several fields share the highest score, the lexicographically
// smallest one is returned, so repeated calls on the same input always give
// the same answer. It returns "" when there are no patterns or no field has
// a positive total frequency.
func (e *ShardKeyEvaluator) Evaluate() string {
	scores := make(map[string]int, len(e.queryPatterns))
	for _, p := range e.queryPatterns {
		scores[p.FilterField] += p.Frequency
	}

	var bestField string
	maxScore := 0
	for field, score := range scores {
		switch {
		case score > maxScore:
			maxScore, bestField = score, field
		case score == maxScore && score > 0 && field < bestField:
			// Map iteration order is random; break ties by name so the
			// winner does not depend on it.
			bestField = field
		}
	}
	return bestField
}

ShardingSphere配置

# ShardingSphere-JDBC configuration (YAML).
mode:
  type: Standalone
  repository:
    type: JDBC

# One entry per physical database shard. ShardingSphere-JDBC instantiates the
# connection pool named by dataSourceClassName and sets the remaining keys as
# properties on it, so the keys below are HikariCP property names.
# Do not ship real credentials in this file; use a dedicated least-privilege
# account instead of root.
dataSources:
  ds_0:
    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://db-shard-0:3306/order_db?serverTimezone=UTC
    username: root
    password: password
    connectionTimeout: 30000    # milliseconds
    idleTimeout: 60000          # milliseconds
    maxLifetime: 1800000        # milliseconds
    maximumPoolSize: 50

  ds_1:
    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://db-shard-1:3306/order_db?serverTimezone=UTC
    username: root
    password: password
    connectionTimeout: 30000
    idleTimeout: 60000
    maxLifetime: 1800000
    maximumPoolSize: 50

  ds_2:
    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://db-shard-2:3306/order_db?serverTimezone=UTC
    username: root
    password: password
    connectionTimeout: 30000
    idleTimeout: 60000
    maxLifetime: 1800000
    maximumPoolSize: 50

rules:
  - !SHARDING
    tables:
      orders:
        # 3 databases x 8 tables = 24 physical tables.
        actualDataNodes: ds_${0..2}.orders_${0..7}
        # Database is chosen by user_id % 3.
        databaseStrategy:
          standard:
            shardingColumn: user_id
            shardingAlgorithmName: db_mod
        # Table inside the database is chosen by order_id % 8.
        tableStrategy:
          standard:
            shardingColumn: order_id
            shardingAlgorithmName: table_mod
        # Key generation is configured per table, under the table itself.
        # NOTE(review): the generated column is `id` while tables are sharded
        # on `order_id`; confirm both columns exist in the physical schema.
        keyGenerateStrategy:
          column: id
          keyGeneratorName: snowflake

    shardingAlgorithms:
      db_mod:
        type: MOD
        props:
          sharding-count: 3   # must match the number of data sources
      table_mod:
        type: MOD
        props:
          sharding-count: 8   # must match the number of tables per data source

    keyGenerators:
      snowflake:
        type: SNOWFLAKE
        props:
          # NOTE(review): newer 5.x releases may take the worker id from the
          # instance/mode configuration; confirm this property is honored by
          # the version in use.
          worker-id: 1

Vitess部署

# vitess-compose.yaml
# Minimal single-cell (zone1) Vitess layout: one vtctld, one vtgate and one
# vttablet for the unsharded-range shard "0" of keyspace "commerce".
# NOTE(review): no topology server service or --topo_* flags are defined in
# this file; confirm they are supplied elsewhere before using it as-is.
version: '3.8'

services:
  # Control-plane daemon: web UI on 15000, gRPC on 15999.
  vtctld:
    image: vitess/lite:v17.0.0
    command: >
      vtctld
      --cell=zone1
      --service_map='grpc-vtctld'
      --grpc_port=15999
      --port=15000
    ports:
      - "15000:15000"
      - "15999:15999"

  # Query router: applications connect to the MySQL protocol port 15306.
  vtgate:
    image: vitess/lite:v17.0.0
    command: >
      vtgate
      --tablet_types_to_wait=PRIMARY,REPLICA
      --cell=zone1
      --cells_to_watch=zone1
      --port=15001
      --grpc_port=15991
      --mysql_server_port=15306
      --mysql_auth_server_impl=none
    ports:
      - "15001:15001"
      - "15306:15306"
    depends_on:
      - vtctld

  # Tablet for commerce/0. vttablet only accepts replica, rdonly or spare as
  # the initial tablet type; a tablet becomes primary through a reparent
  # operation (for example PlannedReparentShard, or VTOrc) after it starts.
  vttablet:
    image: vitess/lite:v17.0.0
    command: >
      vttablet
      --tablet-path=zone1-100
      --init_keyspace=commerce
      --init_shard=0
      --init_tablet_type=replica
      --port=15100
      --grpc_port=15992
    ports:
      - "15100:15100"

Vitess VSchema定义

{
  "sharded": true,
  "vindexes": {
    "hash": { "type": "hash" },
    "user_id_idx": { "type": "hash" }
  },
  "tables": {
    "orders": {
      "column_vindexes": [
        { "column": "user_id", "name": "user_id_idx" }
      ],
      "auto_increment": { "column": "id", "sequence": "order_seq" }
    },
    "order_items": {
      "column_vindexes": [
        { "column": "user_id", "name": "user_id_idx" }
      ]
    }
  }
}

跨分片查询优化

全局索引表

-- Global index table (stored in a separate database from the shards).
-- Maps an order number to the shard that holds the order, so lookups by a
-- non-shard-key column do not have to query every shard.
CREATE TABLE order_global_index (
    order_number VARCHAR(50) PRIMARY KEY,
    shard_id INT NOT NULL,  -- ID of the shard holding the order
    user_id BIGINT NOT NULL,
    created_at TIMESTAMP NOT NULL,
    
    -- Inline INDEX clauses are MySQL syntax.
    INDEX idx_user_id (user_id),
    INDEX idx_created_at (created_at)
);

-- Lookup by order number (index first, then the shard):
-- 1. SELECT shard_id FROM order_global_index WHERE order_number = 'ORD-123456'
-- 2. Route to the shard identified by shard_id
-- 3. SELECT * FROM orders WHERE order_number = 'ORD-123456'

应用层实现

// ShardedOrderRepository reads orders that are spread across several
// database shards, using a separate global index database to locate them.
type ShardedOrderRepository struct {
	shards      []*sql.DB // one handle per shard, indexed by shard ID
	globalIndex *sql.DB   // database holding the order_global_index table
	shardCount  int       // not referenced by the methods shown here
}

// ShardOutOfRangeError reports that the global index returned a shard ID for
// which the repository has no database handle.
type ShardOutOfRangeError struct {
	ShardID    int // shard ID read from the global index
	ShardCount int // number of shards the repository is configured with
}

// Error implements the error interface.
func (e *ShardOutOfRangeError) Error() string {
	return "global index returned a shard id outside the configured shards"
}

// GetOrderByNumber looks up an order by its order number.
//
// It first reads the owning shard ID from order_global_index, then queries
// that shard for the order itself. Errors from either query are returned
// unchanged (including sql.ErrNoRows from the index lookup when the order
// number is unknown). A *ShardOutOfRangeError is returned when the index
// points at a shard the repository does not have, instead of panicking on
// an out-of-range slice index.
func (r *ShardedOrderRepository) GetOrderByNumber(ctx context.Context, orderNumber string) (*Order, error) {
	// 1. Resolve the shard ID through the global index.
	var shardID int
	err := r.globalIndex.QueryRowContext(ctx,
		"SELECT shard_id FROM order_global_index WHERE order_number = ?",
		orderNumber).Scan(&shardID)
	if err != nil {
		return nil, err
	}

	// 2. Route to that shard. The ID comes from stored data, so validate it
	// before using it as a slice index.
	if shardID < 0 || shardID >= len(r.shards) {
		return nil, &ShardOutOfRangeError{ShardID: shardID, ShardCount: len(r.shards)}
	}
	shard := r.shards[shardID]

	// 3. Fetch the order from the shard.
	order, err := r.queryOrderFromShard(ctx, shard, orderNumber)
	if err != nil {
		return nil, err
	}
	return order, nil
}

// GetUserOrdersAcrossShards queries every shard in parallel for the given
// user's orders and returns them merged and sorted by CreatedAt, newest
// first.
//
// If any shard query fails, the first error observed is returned with a nil
// slice, and the context passed to the remaining shard queries is cancelled
// so they can stop early. Results are merged in shard order and sorted
// stably, so orders with equal CreatedAt come back in a reproducible order.
func (r *ShardedOrderRepository) GetUserOrdersAcrossShards(ctx context.Context, userID int64) ([]*Order, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Each goroutine writes only to its own slot, so no lock is needed.
	perShard := make([][]*Order, len(r.shards))

	var (
		wg       sync.WaitGroup
		errOnce  sync.Once
		firstErr error
	)
	for i, shard := range r.shards {
		wg.Add(1)
		go func(idx int, db *sql.DB) {
			defer wg.Done()

			orders, err := r.queryOrdersByUserID(ctx, db, userID)
			if err != nil {
				// Keep only the first failure: later ones are usually just
				// the cancellation triggered here.
				errOnce.Do(func() {
					firstErr = err
					cancel()
				})
				return
			}
			perShard[idx] = orders
		}(i, shard)
	}
	wg.Wait()

	if firstErr != nil {
		return nil, firstErr
	}

	// Merge the per-shard results in shard order.
	var allOrders []*Order
	for _, orders := range perShard {
		allOrders = append(allOrders, orders...)
	}

	// Newest first.
	sort.SliceStable(allOrders, func(i, j int) bool {
		return allOrders[i].CreatedAt.After(allOrders[j].CreatedAt)
	})

	return allOrders, nil
}

数据迁移与重分片

在线重分片策略

// ReshardingManager moves data from an old shard layout to a new one while
// the system stays online.
type ReshardingManager struct {
	oldShards []*sql.DB // shards of the layout being migrated away from
	newShards []*sql.DB // presumably the shards of the target layout; verify against the routing helpers
}

// DualWrite is used during the dual-write phase: it writes the order to its
// old shard and then to its new shard.
//
// The old shard is treated as authoritative: a failure there is returned to
// the caller. A failure on the new shard is only logged and DualWrite still
// returns nil.
//
// The new-shard write runs synchronously, inside the caller's context. A
// detached goroutine would keep using ctx after DualWrite returns, and
// request contexts are typically cancelled at that point, which would
// silently drop new-shard writes.
func (m *ReshardingManager) DualWrite(ctx context.Context, order *Order) error {
	oldShard := m.routeToOldShard(order.UserID)
	newShard := m.routeToNewShard(order.UserID)

	// Old shard first: it remains the source of truth.
	if err := m.writeToShard(ctx, oldShard, order); err != nil {
		return err
	}

	// Best-effort write to the new shard while ctx is still valid.
	if err := m.writeToShard(ctx, newShard, order); err != nil {
		log.Errorf("Failed to write to new shard: %v", err)
	}

	return nil
}

// MigrateHistoricalData copies existing orders from every old shard to the
// shard chosen by the new routing rule.
//
// Each old shard is read in batches of 1000. The ID of the last order in a
// batch is passed to the next read as the cursor (starting from 0), and a
// shard is finished when a read returns no orders. The first read or write
// error aborts the whole migration and is returned as-is.
func (m *ReshardingManager) MigrateHistoricalData(ctx context.Context) error {
	batchSize := 1000

	for _, source := range m.oldShards {
		cursor := int64(0)

	pages:
		for {
			batch, err := m.readBatch(ctx, source, cursor, batchSize)
			switch {
			case err != nil:
				return err
			case len(batch) == 0:
				// Nothing left on this shard.
				break pages
			}

			// Re-route every order with the new rule and write it out.
			for i := range batch {
				target := m.routeToNewShard(batch[i].UserID)
				if werr := m.writeToShard(ctx, target, batch[i]); werr != nil {
					return werr
				}
			}

			// Continue after the last order of this batch.
			cursor = batch[len(batch)-1].ID
		}
	}

	return nil
}

总结

分区与分片选择指南:

优先使用分区

  • 时序数据(日志、事件)
  • 单表超过1000万行
  • 需要定期清理历史数据
  • 查询通常带分区键

考虑分片

  • 单库存储超过1TB
  • 写入QPS超过单库上限(通常5000-10000)
  • 需要水平扩展读写能力
  • 有成熟的分片中间件支持

关键原则:

  • 分片键选择决定系统成败
  • 避免跨分片事务和JOIN
  • 设计全局索引支持非分片键查询
  • 充分测试分片均衡性

延伸阅读

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页