数据仓库与湖仓一体架构:从OLAP到实时分析的演进

深入讲解数据仓库、数据湖、湖仓一体架构的演进历程,涵盖维度建模、ETL流程、实时数仓、Iceberg/Hudi等核心技术,提供ClickHouse、Spark的实战案例。

数据架构演进

数据架构演进历程:
┌─────────────────────────────────────────┐
│ 第一代:数据仓库(Data Warehouse)       │
│ - 结构化数据                             │
│ - Schema-on-Write                       │
│ - 批处理ETL                              │
│ - 代表:Oracle、Teradata、Redshift       │
│                                         │
│ 第二代:数据湖(Data Lake)              │
│ - 非结构化/半结构化数据                  │
│ - Schema-on-Read                        │
│ - 原始数据存储                           │
│ - 代表:HDFS、S3、Azure Data Lake        │
│                                         │
│ 第三代:湖仓一体(Lakehouse)            │
│ - 结合仓库和湖的优势                     │
│ - ACID事务支持                           │
│ - 实时+离线统一                          │
│ - 代表:Delta Lake、Iceberg、Hudi        │
│                                         │
│ 第四代:实时数仓(Real-time Warehouse)  │
│ - 流批一体                               │
│ - 秒级延迟                               │
│ - 代表:ClickHouse、StarRocks、Doris     │
└─────────────────────────────────────────┘

维度建模

星型模型

-- Fact table: order lines.
-- Grain: one row per (order_id, order_line_no). Because each row carries a
-- single product_key / quantity / unit_price, an order containing several
-- products needs several rows; keying on order_id alone made that impossible.
CREATE TABLE fact_orders (
    order_id BIGINT NOT NULL,
    order_line_no INT NOT NULL,       -- line number within the order
    order_date_key INT,               -- FK -> dim_date.date_key (yyyyMMdd integer)
    user_key BIGINT,                  -- FK -> dim_user.user_key (SCD2 version at order time)
    product_key BIGINT,               -- FK -> dim_product.product_key
    store_key INT,                    -- FK -> store dimension

    -- Measures
    quantity INT,
    unit_price DECIMAL(10,2),
    discount_amount DECIMAL(10,2),
    total_amount DECIMAL(10,2),       -- NOTE(review): presumably quantity * unit_price - discount_amount; confirm

    -- Degenerate dimensions: descriptive attributes kept on the fact row
    order_status VARCHAR(20),
    payment_method VARCHAR(20),

    CONSTRAINT fact_orders_pk PRIMARY KEY (order_id, order_line_no)
);

-- Date dimension: one row per calendar day, referenced by fact_orders.order_date_key.
CREATE TABLE dim_date (
    date_key     INT,       -- surrogate key
    date         DATE,
    year         INT,
    quarter      INT,
    month        INT,
    week         INT,       -- NOTE(review): week-numbering convention (ISO or not) is not specified
    day_of_week  INT,       -- NOTE(review): whether 1 means Monday or Sunday is not specified
    is_holiday   BOOLEAN,
    CONSTRAINT dim_date_pk PRIMARY KEY (date_key)
);

-- User dimension, versioned as a Type 2 slowly changing dimension:
-- a change to a tracked attribute closes the current row and adds a new one,
-- so a single user_id can own several rows, each with its own user_key.
CREATE TABLE dim_user (
    user_key       BIGINT,          -- surrogate key, one per version
    user_id        BIGINT,          -- natural key; repeats across versions
    username       VARCHAR(50),
    email          VARCHAR(100),
    register_date  DATE,
    vip_level      INT,

    -- SCD Type 2 bookkeeping
    effective_date DATE,            -- first day this version is valid
    expiry_date    DATE,            -- NULL while the version is still current
    is_current     BOOLEAN,         -- intended TRUE only for the latest version (not enforced here)
    CONSTRAINT dim_user_pk PRIMARY KEY (user_key)
);

-- Typical query: monthly sales and distinct buyers for 2026.
-- dim_user is an SCD Type 2 table, so one user may appear under several
-- user_key values (one per version). Counting distinct user_key would count
-- such a user more than once; count the natural key user_id instead.
-- dim_user.user_key is the primary key, so the join does not multiply rows.
SELECT
    d.year,
    d.month,
    SUM(f.total_amount)       AS total_sales,
    COUNT(DISTINCT u.user_id) AS unique_users
FROM fact_orders AS f
INNER JOIN dim_date AS d
    ON d.date_key = f.order_date_key
LEFT JOIN dim_user AS u
    ON u.user_key = f.user_key
WHERE d.year = 2026
GROUP BY
    d.year,
    d.month
ORDER BY d.month;

雪花模型

-- Product dimension in normalized (snowflake) form: brand and category
-- attributes live in their own tables and are reached through keys.
CREATE TABLE dim_product (
    product_key   BIGINT,
    product_id    BIGINT,          -- presumably the source-system product id; confirm
    product_name  VARCHAR(200),
    category_key  INT,             -- -> dim_category.category_key
    brand_key     INT,             -- -> dim_brand.brand_key
    price         DECIMAL(10,2),
    CONSTRAINT dim_product_pk PRIMARY KEY (product_key)
);

-- Category dimension; categories form a hierarchy through parent_key.
CREATE TABLE dim_category (
    category_key   INT,
    category_name  VARCHAR(100),
    parent_key     INT,    -- presumably the parent's category_key; how roots are marked is not specified
    level          INT,    -- depth in the hierarchy; NOTE(review): base (0 or 1) not specified
    CONSTRAINT dim_category_pk PRIMARY KEY (category_key)
);

-- Brand dimension, referenced by dim_product.brand_key.
CREATE TABLE dim_brand (
    brand_key   INT,
    brand_name  VARCHAR(100),
    country     VARCHAR(50),
    CONSTRAINT dim_brand_pk PRIMARY KEY (brand_key)
);

-- Sales by brand and category: walks fact -> product -> (brand, category).
SELECT
    b.brand_name,
    c.category_name,
    SUM(f.total_amount) AS sales
FROM fact_orders AS f
INNER JOIN dim_product AS p
    ON p.product_key = f.product_key
INNER JOIN dim_brand AS b
    ON b.brand_key = p.brand_key
INNER JOIN dim_category AS c
    ON c.category_key = p.category_key
GROUP BY
    b.brand_name,
    c.category_name;

ETL流程设计

批处理ETL

# etl/daily_etl.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import os

# Source database credentials. The password is taken from the environment
# instead of being committed to source control.
SOURCE_JDBC_URL = "jdbc:mysql://source-db:3306/ecommerce"
SOURCE_JDBC_USER = "etl_user"
SOURCE_JDBC_PASSWORD = os.environ.get("ETL_DB_PASSWORD")
if not SOURCE_JDBC_PASSWORD:
    raise RuntimeError("Set ETL_DB_PASSWORD to the source database password")

# Dynamic partition overwrite: mode("overwrite") below replaces only the
# partitions present in the written DataFrame, so re-running the job for the
# same day replaces that day instead of appending duplicates.
spark = SparkSession.builder \
    .appName("DailyETL") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
    .getOrCreate()


def _read_source(dbtable):
    """Read a table (or a parenthesised, aliased subquery) from the source MySQL DB."""
    return spark.read \
        .format("jdbc") \
        .option("url", SOURCE_JDBC_URL) \
        .option("dbtable", dbtable) \
        .option("user", SOURCE_JDBC_USER) \
        .option("password", SOURCE_JDBC_PASSWORD) \
        .load()


# 1. Extract
# Yesterday's orders as the half-open range [CURRENT_DATE - 1 day, CURRENT_DATE).
# In MySQL `CURRENT_DATE - 1` is integer arithmetic (20260501 - 1 = 20260500,
# not a date), so the job silently extracted nothing on the 1st of each month.
# Comparing the bare column also lets MySQL use an index on created_at,
# which DATE(created_at) prevents.
orders_df = _read_source(
    "(SELECT * FROM orders "
    "WHERE created_at >= CURRENT_DATE - INTERVAL 1 DAY "
    "AND created_at < CURRENT_DATE) as orders"
)

users_df = _read_source("users")

# 2. Transform
# Keep only orders in a billable state that have a user and an amount.
orders_clean = orders_df \
    .filter(col("status").isin("PAID", "SHIPPED", "COMPLETED")) \
    .dropna(subset=["user_id", "total_amount"])

# Enrich with user attributes (left join keeps orders whose user is missing).
orders_with_user = orders_clean.join(
    users_df.select("user_id", "vip_level", "register_date"),
    on="user_id",
    how="left"
)

# Derived columns. discount_rate is NULL for zero-amount orders rather than
# raising a divide-by-zero error under spark.sql.ansi.enabled.
orders_transformed = orders_with_user \
    .withColumn("order_date_key", date_format(col("created_at"), "yyyyMMdd").cast("int")) \
    .withColumn("is_new_user", datediff(col("created_at"), col("register_date")) <= 7) \
    .withColumn("discount_rate",
                when(col("total_amount") != 0,
                     col("discount_amount") / col("total_amount")))

# 3. Load
# Partitioned write; with dynamic overwrite only the order_date_key partitions
# produced by this run are replaced, which makes the load idempotent.
orders_transformed.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("partitionOverwriteMode", "dynamic") \
    .partitionBy("order_date_key") \
    .save("/data/warehouse/fact_orders")

# Dimension table update (SCD Type 2)
def update_slowly_changing_dimension(new_data, table_name, key_column,
                                     tracked_columns=("vip_level", "email"),
                                     surrogate_key="user_key",
                                     base_path="/data/warehouse"):
    """Merge a snapshot of source rows into a Parquet SCD Type 2 dimension.

    For every natural key in ``new_data``:
      * if its current dimension row differs in any ``tracked_columns``
        (compared null-safely), that row is closed (``is_current = False``,
        ``expiry_date = current_date()``) and a new current version is added;
      * if the key has no current row yet, a new current version is added;
      * otherwise nothing changes for that key.
    Already-expired (historical) rows are carried over untouched.

    Args:
        new_data: source DataFrame. It must contain ``key_column``, every entry
            of ``tracked_columns`` and every non-bookkeeping column of the
            dimension; columns are matched by name.
        table_name: directory name of the dimension below ``base_path``.
        key_column: natural key column, e.g. ``"user_id"``.
        tracked_columns: attributes whose change opens a new version.
        surrogate_key: surrogate key column of the dimension.
        base_path: root directory of the warehouse.

    Raises:
        ValueError: if ``tracked_columns`` is empty.
    """
    if not tracked_columns:
        raise ValueError("tracked_columns must not be empty")

    table_path = f"{base_path}/{table_name}"
    existing = spark.read.parquet(table_path)
    dim_columns = existing.columns

    is_current = col("is_current").eqNullSafe(True)
    current = existing.filter(is_current)
    history = existing.filter(~is_current)

    # Natural keys whose current version differs in at least one tracked
    # column. eqNullSafe makes a change to or from NULL count as a change,
    # which a plain != comparison would miss.
    changed_condition = None
    for name in tracked_columns:
        differs = ~col(f"n.{name}").eqNullSafe(col(f"c.{name}"))
        changed_condition = differs if changed_condition is None else (changed_condition | differs)

    changed_keys = new_data.alias("n") \
        .join(current.alias("c"),
              col(f"n.{key_column}") == col(f"c.{key_column}"),
              "inner") \
        .filter(changed_condition) \
        .select(col(f"n.{key_column}").alias(key_column)) \
        .distinct()

    # Source rows that need a new current version: changed keys plus keys
    # that have no current row at all (new entities).
    to_insert = new_data.join(changed_keys, on=key_column, how="left_semi") \
        .unionByName(new_data.join(current, on=key_column, how="left_anti"))

    if to_insert.limit(1).count() == 0:
        return  # nothing to do; leave the table untouched

    # Close only the *current* version of each changed key; historical rows
    # keep their original expiry_date.
    expired = current.join(changed_keys, on=key_column, how="left_semi") \
        .withColumn("is_current", lit(False)) \
        .withColumn("expiry_date", current_date())
    unchanged = current.join(changed_keys, on=key_column, how="left_anti")

    # monotonically_increasing_id() is unique within this DataFrame only, so
    # offset it past the largest existing key to avoid collisions.
    max_key = existing.agg({surrogate_key: "max"}).first()[0]
    key_offset = 0 if max_key is None else max_key + 1
    new_records = to_insert \
        .withColumn(surrogate_key, monotonically_increasing_id() + lit(key_offset)) \
        .withColumn("effective_date", current_date()) \
        .withColumn("expiry_date", lit(None).cast("date")) \
        .withColumn("is_current", lit(True)) \
        .select(*dim_columns)  # same columns, same order as the dimension

    final = history \
        .unionByName(unchanged) \
        .unionByName(expired) \
        .unionByName(new_records)

    # Spark refuses to overwrite a path that the same plan reads from, so
    # materialise the result in a staging directory first, then copy it over
    # the table. The staging directory is left in place for the next run.
    staging_path = f"{table_path}__staging"
    final.write.mode("overwrite").parquet(staging_path)
    spark.read.parquet(staging_path).write.mode("overwrite").parquet(table_path)

try:
    update_slowly_changing_dimension(users_df, "dim_user", "user_id")
finally:
    # Release the Spark session even when the dimension update fails.
    spark.stop()

实时ETL

# etl/realtime_etl.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import (DecimalType, LongType, StringType, StructField,
                               StructType, TimestampType)

spark = SparkSession.builder \
    .appName("RealtimeETL") \
    .config("spark.sql.streaming.checkpointLocation", "/checkpoint/realtime") \
    .getOrCreate()

# Schema of the JSON payload on the order-events topic.
# NOTE(review): inferred from the fields used below (created_at, store_id,
# total_amount) plus the order columns used elsewhere; confirm against the producer.
# created_at must be a timestamp for withWatermark/window to work.
order_schema = StructType([
    StructField("order_id", LongType()),
    StructField("user_id", LongType()),
    StructField("store_id", LongType()),
    StructField("total_amount", DecimalType(10, 2)),
    StructField("status", StringType()),
    StructField("created_at", TimestampType()),
])

# Read order events from Kafka
orders_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092") \
    .option("subscribe", "order-events") \
    .option("startingOffsets", "latest") \
    .load()

# Parse the JSON message body into columns
orders_parsed = orders_stream.select(
    from_json(col("value").cast("string"), order_schema).alias("data")
).select("data.*")

# Windowed aggregation per store over 5-minute tumbling windows; events more
# than 10 minutes late (relative to the watermark) are dropped.
windowed_agg = orders_parsed \
    .withWatermark("created_at", "10 minutes") \
    .groupBy(
        window(col("created_at"), "5 minutes"),
        col("store_id")
    ) \
    .agg(
        count("*").alias("order_count"),
        sum("total_amount").alias("total_sales"),
        avg("total_amount").alias("avg_order_value")
    )

# The Kafka sink requires a `value` column (and optionally `key`), so each
# aggregated row is serialised to JSON and keyed by store_id.
kafka_records = windowed_agg.select(
    col("store_id").cast("string").alias("key"),
    to_json(struct(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("store_id"),
        col("order_count"),
        col("total_sales"),
        col("avg_order_value"),
    )).alias("value"),
)

# Publish to Kafka for downstream consumers
query = kafka_records.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092") \
    .option("topic", "order-aggregations") \
    .outputMode("update") \
    .start()

query.awaitTermination()

湖仓一体架构

Apache Iceberg

// iceberg/TableOperations.java
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.spark.SparkCatalog;

public class IcebergTableOperations {
    
    private final SparkSession spark;
    private final SparkCatalog catalog;
    
    public IcebergTableOperations(SparkSession spark) {
        this.spark = spark;
        this.catalog = (SparkCatalog) spark.sessionState().catalogManager().catalog("iceberg_catalog");
    }
    
    // 创建Iceberg表
    public void createTable() {
        spark.sql("""
            CREATE TABLE iceberg_catalog.db.orders (
                order_id BIGINT,
                user_id BIGINT,
                order_date DATE,
                total_amount DECIMAL(10,2),
                status STRING
            )
            USING iceberg
            PARTITIONED BY (days(order_date))
            TBLPROPERTIES (
                'write.format.default' = 'parquet',
                'write.parquet.compression-codec' = 'zstd'
            )
        """);
    }
    
    // 写入数据(支持ACID事务)
    public void appendData(Dataset<Row> df) {
        df.writeTo("iceberg_catalog.db.orders")
          .tableProperty("write.wap.enabled", "true")  // Write-Audit-Publish
          .append();
    }
    
    // 更新数据(Merge-on-Read)
    public void updateStatus(long orderId, String newStatus) {
        spark.sql(f"""
            UPDATE iceberg_catalog.db.orders
            SET status = '{newStatus}', updated_at = current_timestamp()
            WHERE order_id = {orderId}
        """);
    }
    
    // 时间旅行查询
    public Dataset<Row> queryAtTimestamp(long timestamp) {
        return spark.sql(f"""
            SELECT * FROM iceberg_catalog.db.orders
            FOR TIMESTAMP AS OF {timestamp}
        """);
    }
    
    // 快照管理
    public void manageSnapshots() {
        Table table = catalog.loadTable(TableIdentifier.of("db", "orders"));
        
        // 列出快照
        for (Snapshot snapshot : table.snapshots()) {
            System.out.printf("Snapshot ID: %d, Timestamp: %s%n",
                snapshot.snapshotId(),
                new Date(snapshot.timestampMillis()));
        }
        
        // 回滚到指定快照
        table.manageSnapshots()
             .rollbackTo(snapshotId)
             .commit();
        
        // 清理旧快照
        table.expireSnapshots()
             .expireOlderThan(System.currentTimeMillis() - 7 * 24 * 60 * 60 * 1000)
             .commit();
    }
}

ClickHouse实时数仓

表引擎选择

-- MergeTree: the base engine. Data is split into monthly partitions and
-- stored sorted by the ORDER BY key, which also serves as the primary index.
CREATE TABLE events_merge_tree
(
    event_date  Date,
    event_time  DateTime,
    user_id     UInt64,
    event_type  String,
    properties  String    -- NOTE(review): presumably serialized JSON; confirm
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_type, user_id, event_time);

-- ReplacingMergeTree: rows sharing the same ORDER BY key (user_id) are
-- collapsed during background merges, keeping the one with the highest
-- `version`. Merges are asynchronous, so use FINAL (or argMax) when a query
-- must see exactly one row per user.
CREATE TABLE users_replacing
(
    user_id     UInt64,
    username    String,
    email       String,
    updated_at  DateTime,
    version     UInt32
)
ENGINE = ReplacingMergeTree(version)
ORDER BY user_id;

-- AggregatingMergeTree: stores partial aggregate states per (date, store_id)
-- and combines them during merges. Read with sumMerge(sales) /
-- countMerge(order_count) plus GROUP BY date, store_id.
CREATE TABLE daily_sales_agg
(
    date         Date,
    store_id     UInt32,
    sales        AggregateFunction(sum, Decimal(10,2)),
    order_count  AggregateFunction(count)
)
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, store_id);

-- Materialized view: each INSERT into `orders` is aggregated and the partial
-- states are written into daily_sales_agg. Only rows inserted after the view
-- exists are processed; earlier rows in `orders` are not backfilled.
CREATE MATERIALIZED VIEW daily_sales_mv
TO daily_sales_agg
AS
SELECT
    toDate(created_at)      AS date,
    store_id,
    sumState(total_amount)  AS sales,
    countState()            AS order_count
FROM orders
GROUP BY
    date,
    store_id;

查询优化

-- PREWHERE: ClickHouse evaluates this condition first, reading only its
-- columns, and fetches the remaining columns just for rows that pass.
SELECT
    user_id,
    count() AS event_count
FROM events
PREWHERE event_date >= '2026-01-01'
WHERE event_type = 'purchase'
GROUP BY user_id
HAVING count() > 10;

-- Sampling for approximate queries.
-- Requirements:
--   * SAMPLE only works on a MergeTree table that declares a sampling key,
--     e.g. `SAMPLE BY intHash32(user_id)`; that expression must also be part
--     of the primary key / ORDER BY.
--   * Scaling a distinct count is only valid when the sampling key is derived
--     from user_id, so each user is either wholly in or wholly out of the sample.
-- _sample_factor is the inverse of the sampled fraction (10 for SAMPLE 0.1),
-- so the scale factor cannot drift from the SAMPLE ratio.
SELECT
    uniq(user_id) * any(_sample_factor) AS estimated_users
FROM events
SAMPLE 0.1
WHERE event_date = '2026-04-08';

-- In-memory dictionary for fast user lookups via dictGet.
-- dim_user is an SCD Type 2 table with several rows per user_id (one per
-- version), so only the current version is loaded; without the WHERE filter,
-- which version a user_id resolves to would be arbitrary.
-- NOTE(review): assumes is_current is stored as Bool/UInt8 in ClickHouse.
CREATE DICTIONARY dim_user_dict
(
    user_id    UInt64,
    username   String,
    vip_level  UInt8
)
PRIMARY KEY user_id
SOURCE(CLICKHOUSE(
    HOST 'localhost' PORT 9000 DB 'warehouse' TABLE 'dim_user'
    WHERE 'is_current = 1'
))
LAYOUT(HASHED())
LIFETIME(MIN 60 MAX 120);  -- reloaded at a random point between 60 and 120 seconds

-- Per-user event counts, with the username resolved through the dictionary
-- instead of a JOIN against dim_user.
SELECT
    dictGet('dim_user_dict', 'username', user_id) AS username,
    count(*)                                      AS event_count
FROM events
GROUP BY user_id;

总结

数据架构的选择应基于业务需求:

  1. 数据仓库:适合结构化数据的复杂分析
  2. 数据湖:适合海量原始数据的灵活探索
  3. 湖仓一体:兼顾两者优势,支持ACID事务
  4. 实时数仓:满足秒级延迟的分析需求

关键原则:

  • 选择合适的建模方法(星型/雪花)
  • 设计健壮的ETL流程
  • 利用分区和索引优化查询
  • 实现数据质量监控
  • 考虑数据治理和血缘追踪

延伸阅读

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页