Skip to content

数据仓库与数据湖

概述

数据仓库和数据湖是现代企业数据管理的两种重要架构模式。数据仓库提供结构化的数据存储和分析能力,而数据湖则支持多种数据类型的存储和处理。随着大数据技术的发展,两者正在融合形成现代数据平台。

数据仓库(Data Warehouse)

核心概念

数据仓库是一个面向主题的、集成的、相对稳定的、反映历史变化的数据集合,用于支持管理决策。

特点

  • 面向主题:围绕特定业务主题组织数据
  • 集成性:来自多个数据源的数据经过清洗和转换
  • 非易失性:数据一旦进入仓库,一般不会被删除
  • 时变性:记录数据的历史变化

传统数据仓库架构

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   数据源层      │    │   数据集成层    │    │   数据存储层    │
│                 │    │                 │    │                 │
│ ┌─────────────┐ │    │ ┌─────────────┐ │    │ ┌─────────────┐ │
│ │ 业务系统    │ │───▶│ │ ETL工具     │ │───▶│ │ 数据仓库    │ │
│ └─────────────┘ │    │ └─────────────┘ │    │ └─────────────┘ │
│ ┌─────────────┐ │    │ ┌─────────────┐ │    │ ┌─────────────┐ │
│ │ 外部数据    │ │───▶│ │ 数据清洗    │ │───▶│ │ 数据集市    │ │
│ └─────────────┘ │    │ └─────────────┘ │    │ └─────────────┘ │
│ ┌─────────────┐ │    │ ┌─────────────┐ │    │ ┌─────────────┐ │
│ │ 日志文件    │ │───▶│ │ 数据转换    │ │───▶│ │ OLAP立方体  │ │
│ └─────────────┘ │    │ └─────────────┘ │    │ └─────────────┘ │
└─────────────────┘    └─────────────────┘    └─────────────────┘


                                               ┌─────────────────┐
                                               │   数据应用层    │
                                               │                 │
                                               │ ┌─────────────┐ │
                                               │ │ BI报表      │ │
                                               │ └─────────────┘ │
                                               │ ┌─────────────┐ │
                                               │ │ 数据分析    │ │
                                               │ └─────────────┘ │
                                               │ ┌─────────────┐ │
                                               │ │ 决策支持    │ │
                                               │ └─────────────┘ │
                                               └─────────────────┘

维度建模

星型模型(Star Schema)

sql
-- 事实表:销售事实表
CREATE TABLE fact_sales (
    sale_id BIGINT PRIMARY KEY,
    product_key INT,
    customer_key INT,
    date_key INT,
    store_key INT,
    quantity INT,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(12,2),
    discount_amount DECIMAL(10,2),
    created_time TIMESTAMP
);

-- 维度表:产品维度
CREATE TABLE dim_product (
    product_key INT PRIMARY KEY,
    product_id VARCHAR(50),
    product_name VARCHAR(200),
    category VARCHAR(100),
    subcategory VARCHAR(100),
    brand VARCHAR(100),
    supplier VARCHAR(100),
    unit_cost DECIMAL(10,2),
    created_date DATE,
    updated_date DATE
);

-- 维度表:客户维度
CREATE TABLE dim_customer (
    customer_key INT PRIMARY KEY,
    customer_id VARCHAR(50),
    customer_name VARCHAR(200),
    gender VARCHAR(10),
    age_group VARCHAR(20),
    city VARCHAR(100),
    province VARCHAR(100),
    customer_segment VARCHAR(50),
    registration_date DATE
);

-- 维度表:日期维度
CREATE TABLE dim_date (
    date_key INT PRIMARY KEY,
    full_date DATE,
    year INT,
    quarter INT,
    month INT,
    week INT,
    day_of_month INT,
    day_of_week INT,
    day_name VARCHAR(20),
    month_name VARCHAR(20),
    is_weekend BOOLEAN,
    is_holiday BOOLEAN
);

-- 维度表:门店维度
CREATE TABLE dim_store (
    store_key INT PRIMARY KEY,
    store_id VARCHAR(50),
    store_name VARCHAR(200),
    store_type VARCHAR(50),
    city VARCHAR(100),
    province VARCHAR(100),
    region VARCHAR(100),
    manager VARCHAR(100),
    open_date DATE
);

雪花模型(Snowflake Schema)

sql
-- 产品维度表(规范化)
CREATE TABLE dim_product (
    product_key INT PRIMARY KEY,
    product_id VARCHAR(50),
    product_name VARCHAR(200),
    category_key INT,
    brand_key INT,
    supplier_key INT,
    unit_cost DECIMAL(10,2)
);

-- 产品类别表
CREATE TABLE dim_category (
    category_key INT PRIMARY KEY,
    category_id VARCHAR(50),
    category_name VARCHAR(100),
    parent_category_key INT
);

-- 品牌表
CREATE TABLE dim_brand (
    brand_key INT PRIMARY KEY,
    brand_id VARCHAR(50),
    brand_name VARCHAR(100),
    brand_country VARCHAR(100)
);

-- 供应商表
CREATE TABLE dim_supplier (
    supplier_key INT PRIMARY KEY,
    supplier_id VARCHAR(50),
    supplier_name VARCHAR(200),
    supplier_city VARCHAR(100),
    supplier_country VARCHAR(100)
);

ETL流程实现

python
# ETL流程示例
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime, timedelta
import logging

class DataWarehouseETL:
    def __init__(self, source_conn, target_conn):
        self.source_engine = create_engine(source_conn)
        self.target_engine = create_engine(target_conn)
        self.logger = logging.getLogger(__name__)
    
    def extract_sales_data(self, start_date, end_date):
        """
        从源系统提取销售数据
        """
        query = """
        SELECT 
            s.sale_id,
            s.product_id,
            s.customer_id,
            s.sale_date,
            s.store_id,
            s.quantity,
            s.unit_price,
            s.total_amount,
            s.discount_amount,
            p.product_name,
            p.category,
            p.brand,
            c.customer_name,
            c.city as customer_city,
            st.store_name,
            st.city as store_city
        FROM sales s
        JOIN products p ON s.product_id = p.product_id
        JOIN customers c ON s.customer_id = c.customer_id
        JOIN stores st ON s.store_id = st.store_id
        WHERE s.sale_date BETWEEN %s AND %s
        """
        
        df = pd.read_sql(query, self.source_engine, 
                        params=[start_date, end_date])
        self.logger.info(f"提取了 {len(df)} 条销售记录")
        return df
    
    def transform_sales_data(self, df):
        """
        转换销售数据
        """
        # 数据清洗
        df = df.dropna(subset=['sale_id', 'product_id', 'customer_id'])
        
        # 数据类型转换
        df['sale_date'] = pd.to_datetime(df['sale_date'])
        df['quantity'] = df['quantity'].astype(int)
        df['unit_price'] = df['unit_price'].astype(float)
        df['total_amount'] = df['total_amount'].astype(float)
        
        # 计算派生字段
        df['profit_amount'] = df['total_amount'] - df['discount_amount']
        df['year'] = df['sale_date'].dt.year
        df['month'] = df['sale_date'].dt.month
        df['quarter'] = df['sale_date'].dt.quarter
        
        # 数据验证
        invalid_records = df[df['total_amount'] < 0]
        if not invalid_records.empty:
            self.logger.warning(f"发现 {len(invalid_records)} 条无效记录")
            df = df[df['total_amount'] >= 0]
        
        self.logger.info(f"转换后保留 {len(df)} 条有效记录")
        return df
    
    def load_dimension_tables(self):
        """
        加载维度表
        """
        # 加载产品维度
        product_query = """
        SELECT DISTINCT
            ROW_NUMBER() OVER (ORDER BY product_id) as product_key,
            product_id,
            product_name,
            category,
            subcategory,
            brand,
            supplier,
            unit_cost,
            CURRENT_DATE as created_date
        FROM products
        """
        
        products_df = pd.read_sql(product_query, self.source_engine)
        products_df.to_sql('dim_product', self.target_engine, 
                          if_exists='replace', index=False)
        
        # 加载客户维度
        customer_query = """
        SELECT DISTINCT
            ROW_NUMBER() OVER (ORDER BY customer_id) as customer_key,
            customer_id,
            customer_name,
            gender,
            CASE 
                WHEN age < 25 THEN '18-24'
                WHEN age < 35 THEN '25-34'
                WHEN age < 45 THEN '35-44'
                WHEN age < 55 THEN '45-54'
                ELSE '55+'
            END as age_group,
            city,
            province,
            customer_segment,
            registration_date
        FROM customers
        """
        
        customers_df = pd.read_sql(customer_query, self.source_engine)
        customers_df.to_sql('dim_customer', self.target_engine, 
                           if_exists='replace', index=False)
        
        # 生成日期维度
        self.generate_date_dimension()
        
        self.logger.info("维度表加载完成")
    
    def generate_date_dimension(self):
        """
        生成日期维度表
        """
        start_date = datetime(2020, 1, 1)
        end_date = datetime(2025, 12, 31)
        
        dates = []
        current_date = start_date
        
        while current_date <= end_date:
            dates.append({
                'date_key': int(current_date.strftime('%Y%m%d')),
                'full_date': current_date,
                'year': current_date.year,
                'quarter': (current_date.month - 1) // 3 + 1,
                'month': current_date.month,
                'week': current_date.isocalendar()[1],
                'day_of_month': current_date.day,
                'day_of_week': current_date.weekday() + 1,
                'day_name': current_date.strftime('%A'),
                'month_name': current_date.strftime('%B'),
                'is_weekend': current_date.weekday() >= 5,
                'is_holiday': self.is_holiday(current_date)
            })
            current_date += timedelta(days=1)
        
        dates_df = pd.DataFrame(dates)
        dates_df.to_sql('dim_date', self.target_engine, 
                       if_exists='replace', index=False)
    
    def is_holiday(self, date):
        """
        判断是否为节假日(简化版本)
        """
        holidays = [
            (1, 1),   # 元旦
            (5, 1),   # 劳动节
            (10, 1),  # 国庆节
            (12, 25), # 圣诞节
        ]
        return (date.month, date.day) in holidays
    
    def load_fact_table(self, df):
        """
        加载事实表
        """
        # 获取维度键
        product_keys = pd.read_sql('SELECT product_id, product_key FROM dim_product', 
                                  self.target_engine)
        customer_keys = pd.read_sql('SELECT customer_id, customer_key FROM dim_customer', 
                                   self.target_engine)
        
        # 关联维度键
        df = df.merge(product_keys, on='product_id', how='left')
        df = df.merge(customer_keys, on='customer_id', how='left')
        
        # 生成日期键
        df['date_key'] = df['sale_date'].dt.strftime('%Y%m%d').astype(int)
        
        # 选择事实表字段
        fact_columns = [
            'sale_id', 'product_key', 'customer_key', 'date_key',
            'quantity', 'unit_price', 'total_amount', 'discount_amount'
        ]
        
        fact_df = df[fact_columns]
        fact_df.to_sql('fact_sales', self.target_engine, 
                      if_exists='append', index=False)
        
        self.logger.info(f"加载了 {len(fact_df)} 条事实记录")
    
    def run_etl(self, start_date, end_date):
        """
        运行完整的ETL流程
        """
        try:
            self.logger.info(f"开始ETL流程: {start_date}{end_date}")
            
            # Extract
            raw_data = self.extract_sales_data(start_date, end_date)
            
            # Transform
            clean_data = self.transform_sales_data(raw_data)
            
            # Load dimensions (只在首次运行时执行)
            self.load_dimension_tables()
            
            # Load facts
            self.load_fact_table(clean_data)
            
            self.logger.info("ETL流程完成")
            
        except Exception as e:
            self.logger.error(f"ETL流程失败: {str(e)}")
            raise

# 使用示例
if __name__ == "__main__":
    source_conn = "mysql://user:password@localhost/source_db"
    target_conn = "postgresql://user:password@localhost/warehouse_db"
    
    etl = DataWarehouseETL(source_conn, target_conn)
    etl.run_etl('2024-01-01', '2024-01-31')

数据湖(Data Lake)

核心概念

数据湖是一个存储大量原始数据的存储库,数据以其原生格式保存,直到需要时才进行处理。

特点

  • 模式灵活:支持结构化、半结构化和非结构化数据
  • 存储成本低:使用廉价的存储介质
  • 处理灵活:支持多种数据处理引擎
  • 扩展性强:可以水平扩展到PB级别

数据湖架构

┌─────────────────────────────────────────────────────────────────────────────┐
│                              数据湖架构                                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                            数据消费层                                        │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐ │
│ │ BI工具      │ │ 机器学习    │ │ 实时分析    │ │ 数据科学    │ │ API服务 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘ │
├─────────────────────────────────────────────────────────────────────────────┤
│                            数据处理层                                        │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐ │
│ │ Spark       │ │ Flink       │ │ Presto      │ │ Hive        │ │ Kafka   │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘ │
├─────────────────────────────────────────────────────────────────────────────┤
│                            数据管理层                                        │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐ │
│ │ 元数据管理  │ │ 数据目录    │ │ 数据血缘    │ │ 数据质量    │ │ 安全管理│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘ │
├─────────────────────────────────────────────────────────────────────────────┤
│                            数据存储层                                        │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │                          分布式文件系统                                  │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐       │ │
│ │ │ 原始数据区  │ │ 清洗数据区  │ │ 建模数据区  │ │ 应用数据区  │       │ │
│ │ │ (Raw Zone)  │ │(Clean Zone) │ │(Model Zone) │ │ (App Zone)  │       │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘       │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────────────────────┤
│                            数据接入层                                        │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐ │
│ │ 批量接入    │ │ 流式接入    │ │ API接入     │ │ 文件上传    │ │ 爬虫采集│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘

数据湖实现示例

python
# 基于Spark的数据湖处理示例
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import boto3
from datetime import datetime

class DataLakeProcessor:
    def __init__(self, app_name="DataLakeProcessor"):
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
            .getOrCreate()
        
        self.s3_client = boto3.client('s3')
        self.bucket_name = "my-data-lake"
    
    def ingest_raw_data(self, source_path, target_path, data_format="json"):
        """
        原始数据接入
        """
        try:
            # 读取原始数据
            if data_format == "json":
                df = self.spark.read.json(source_path)
            elif data_format == "csv":
                df = self.spark.read.option("header", "true").csv(source_path)
            elif data_format == "parquet":
                df = self.spark.read.parquet(source_path)
            else:
                raise ValueError(f"不支持的数据格式: {data_format}")
            
            # 添加元数据
            df = df.withColumn("ingestion_timestamp", current_timestamp()) \
                   .withColumn("ingestion_date", current_date()) \
                   .withColumn("source_file", lit(source_path))
            
            # 按日期分区存储
            df.write \
                .mode("append") \
                .partitionBy("ingestion_date") \
                .parquet(target_path)
            
            print(f"成功接入 {df.count()} 条记录到 {target_path}")
            return df
            
        except Exception as e:
            print(f"数据接入失败: {str(e)}")
            raise
    
    def clean_and_validate_data(self, raw_path, clean_path):
        """
        数据清洗和验证
        """
        # 读取原始数据
        df = self.spark.read.parquet(raw_path)
        
        # 数据清洗
        # 1. 去除重复记录
        df = df.dropDuplicates()
        
        # 2. 处理空值
        df = df.na.drop(subset=["id", "timestamp"])  # 关键字段不能为空
        df = df.na.fill({"category": "unknown", "amount": 0})  # 填充默认值
        
        # 3. 数据类型转换
        df = df.withColumn("amount", col("amount").cast("double")) \
               .withColumn("timestamp", to_timestamp(col("timestamp")))
        
        # 4. 数据验证
        # 验证金额范围
        df = df.filter(col("amount") >= 0)
        
        # 验证时间范围
        current_time = datetime.now()
        df = df.filter(col("timestamp") <= current_time)
        
        # 5. 添加数据质量标记
        df = df.withColumn("data_quality_score", 
                          when(col("amount").isNull() | col("category").isNull(), 0.5)
                          .otherwise(1.0))
        
        # 6. 添加清洗时间戳
        df = df.withColumn("cleaned_timestamp", current_timestamp())
        
        # 保存清洗后的数据
        df.write \
            .mode("overwrite") \
            .partitionBy("ingestion_date") \
            .parquet(clean_path)
        
        print(f"数据清洗完成,保留 {df.count()} 条有效记录")
        return df
    
    def create_data_model(self, clean_path, model_path):
        """
        创建数据模型
        """
        # 读取清洗后的数据
        df = self.spark.read.parquet(clean_path)
        
        # 创建聚合模型
        daily_summary = df.groupBy(
            date_format(col("timestamp"), "yyyy-MM-dd").alias("date"),
            col("category")
        ).agg(
            count("*").alias("transaction_count"),
            sum("amount").alias("total_amount"),
            avg("amount").alias("avg_amount"),
            max("amount").alias("max_amount"),
            min("amount").alias("min_amount")
        )
        
        # 添加计算字段
        daily_summary = daily_summary.withColumn(
            "amount_category",
            when(col("avg_amount") < 100, "low")
            .when(col("avg_amount") < 1000, "medium")
            .otherwise("high")
        )
        
        # 保存模型数据
        daily_summary.write \
            .mode("overwrite") \
            .partitionBy("date") \
            .parquet(f"{model_path}/daily_summary")
        
        # 创建用户行为模型
        user_behavior = df.groupBy("user_id").agg(
            count("*").alias("total_transactions"),
            sum("amount").alias("total_spent"),
            avg("amount").alias("avg_transaction_amount"),
            countDistinct("category").alias("category_diversity"),
            datediff(max("timestamp"), min("timestamp")).alias("active_days")
        )
        
        # 用户分群
        user_behavior = user_behavior.withColumn(
            "user_segment",
            when((col("total_spent") > 10000) & (col("total_transactions") > 100), "high_value")
            .when((col("total_spent") > 1000) & (col("total_transactions") > 20), "medium_value")
            .otherwise("low_value")
        )
        
        user_behavior.write \
            .mode("overwrite") \
            .parquet(f"{model_path}/user_behavior")
        
        print("数据模型创建完成")
        return daily_summary, user_behavior
    
    def create_application_views(self, model_path, app_path):
        """
        创建应用层视图
        """
        # 读取模型数据
        daily_summary = self.spark.read.parquet(f"{model_path}/daily_summary")
        user_behavior = self.spark.read.parquet(f"{model_path}/user_behavior")
        
        # 创建BI报表数据
        # 月度趋势报表
        monthly_trend = daily_summary.withColumn(
            "month", date_format(col("date"), "yyyy-MM")
        ).groupBy("month", "category").agg(
            sum("total_amount").alias("monthly_amount"),
            sum("transaction_count").alias("monthly_transactions")
        )
        
        monthly_trend.write \
            .mode("overwrite") \
            .parquet(f"{app_path}/monthly_trend")
        
        # 用户价值分析
        user_value_analysis = user_behavior.groupBy("user_segment").agg(
            count("*").alias("user_count"),
            avg("total_spent").alias("avg_spent_per_user"),
            sum("total_spent").alias("segment_total_value")
        )
        
        user_value_analysis.write \
            .mode("overwrite") \
            .parquet(f"{app_path}/user_value_analysis")
        
        print("应用层视图创建完成")
    
    def setup_data_catalog(self):
        """
        设置数据目录
        """
        # 注册表到Hive Metastore
        self.spark.sql("""
            CREATE DATABASE IF NOT EXISTS data_lake
            COMMENT '数据湖数据库'
            LOCATION 's3a://my-data-lake/warehouse/'
        """)
        
        # 注册原始数据表
        self.spark.sql("""
            CREATE TABLE IF NOT EXISTS data_lake.raw_transactions (
                id STRING,
                user_id STRING,
                amount DOUBLE,
                category STRING,
                timestamp TIMESTAMP,
                ingestion_timestamp TIMESTAMP,
                source_file STRING
            )
            USING PARQUET
            PARTITIONED BY (ingestion_date DATE)
            LOCATION 's3a://my-data-lake/raw/transactions/'
        """)
        
        # 注册清洗数据表
        self.spark.sql("""
            CREATE TABLE IF NOT EXISTS data_lake.clean_transactions (
                id STRING,
                user_id STRING,
                amount DOUBLE,
                category STRING,
                timestamp TIMESTAMP,
                data_quality_score DOUBLE,
                cleaned_timestamp TIMESTAMP
            )
            USING PARQUET
            PARTITIONED BY (ingestion_date DATE)
            LOCATION 's3a://my-data-lake/clean/transactions/'
        """)
        
        # 注册聚合模型表
        self.spark.sql("""
            CREATE TABLE IF NOT EXISTS data_lake.daily_summary (
                category STRING,
                transaction_count BIGINT,
                total_amount DOUBLE,
                avg_amount DOUBLE,
                max_amount DOUBLE,
                min_amount DOUBLE,
                amount_category STRING
            )
            USING PARQUET
            PARTITIONED BY (date STRING)
            LOCATION 's3a://my-data-lake/model/daily_summary/'
        """)
        
        print("数据目录设置完成")
    
    def run_data_pipeline(self, source_path, data_format="json"):
        """
        运行完整的数据管道
        """
        try:
            print("开始数据湖处理管道...")
            
            # 定义路径
            raw_path = f"s3a://{self.bucket_name}/raw/transactions/"
            clean_path = f"s3a://{self.bucket_name}/clean/transactions/"
            model_path = f"s3a://{self.bucket_name}/model/"
            app_path = f"s3a://{self.bucket_name}/app/"
            
            # 1. 数据接入
            print("步骤1: 数据接入")
            self.ingest_raw_data(source_path, raw_path, data_format)
            
            # 2. 数据清洗
            print("步骤2: 数据清洗")
            self.clean_and_validate_data(raw_path, clean_path)
            
            # 3. 数据建模
            print("步骤3: 数据建模")
            self.create_data_model(clean_path, model_path)
            
            # 4. 应用层处理
            print("步骤4: 应用层处理")
            self.create_application_views(model_path, app_path)
            
            # 5. 设置数据目录
            print("步骤5: 设置数据目录")
            self.setup_data_catalog()
            
            print("数据湖处理管道完成")
            
        except Exception as e:
            print(f"数据管道执行失败: {str(e)}")
            raise
    
    def close(self):
        """
        关闭Spark会话
        """
        self.spark.stop()

# 使用示例
if __name__ == "__main__":
    processor = DataLakeProcessor()
    
    try:
        # 运行数据管道
        processor.run_data_pipeline(
            source_path="s3a://source-bucket/transactions/2024/01/",
            data_format="json"
        )
        
        # 查询示例
        result = processor.spark.sql("""
            SELECT category, sum(total_amount) as total
            FROM data_lake.daily_summary
            WHERE date >= '2024-01-01'
            GROUP BY category
            ORDER BY total DESC
        """)
        
        result.show()
        
    finally:
        processor.close()

现代数据湖架构(Lakehouse)

Delta Lake架构

python
# Delta Lake示例
from delta.tables import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

class DeltaLakeManager:
    def __init__(self):
        self.spark = SparkSession.builder \
            .appName("DeltaLakeManager") \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
            .getOrCreate()
    
    def create_delta_table(self, data_path, table_path):
        """
        创建Delta表
        """
        # 读取数据
        df = self.spark.read.parquet(data_path)
        
        # 写入Delta表
        df.write \
            .format("delta") \
            .mode("overwrite") \
            .option("mergeSchema", "true") \
            .save(table_path)
        
        print(f"Delta表创建完成: {table_path}")
    
    def upsert_data(self, table_path, new_data_path):
        """
        数据更新插入(Upsert)
        """
        # 加载Delta表
        delta_table = DeltaTable.forPath(self.spark, table_path)
        
        # 读取新数据
        new_data = self.spark.read.parquet(new_data_path)
        
        # 执行Merge操作
        delta_table.alias("target").merge(
            new_data.alias("source"),
            "target.id = source.id"
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()
        
        print("数据更新插入完成")
    
    def time_travel_query(self, table_path, version=None, timestamp=None):
        """
        时间旅行查询
        """
        if version is not None:
            df = self.spark.read.format("delta").option("versionAsOf", version).load(table_path)
            print(f"查询版本 {version} 的数据")
        elif timestamp is not None:
            df = self.spark.read.format("delta").option("timestampAsOf", timestamp).load(table_path)
            print(f"查询时间点 {timestamp} 的数据")
        else:
            df = self.spark.read.format("delta").load(table_path)
            print("查询最新版本的数据")
        
        return df
    
    def optimize_table(self, table_path):
        """
        表优化
        """
        delta_table = DeltaTable.forPath(self.spark, table_path)
        
        # 压缩小文件
        delta_table.optimize().executeCompaction()
        
        # Z-Order优化(按指定列排序)
        delta_table.optimize().executeZOrderBy("user_id", "timestamp")
        
        print("表优化完成")
    
    def vacuum_table(self, table_path, retention_hours=168):
        """
        清理旧版本文件
        """
        delta_table = DeltaTable.forPath(self.spark, table_path)
        delta_table.vacuum(retention_hours)
        
        print(f"清理 {retention_hours} 小时前的文件")

数据仓库 vs 数据湖对比

特性数据仓库数据湖
数据结构结构化数据结构化、半结构化、非结构化
模式写时模式(Schema on Write)读时模式(Schema on Read)
存储成本
查询性能高(预处理)中等(按需处理)
数据质量高(ETL保证)需要额外处理
灵活性低(固定模式)高(灵活模式)
处理延迟高(批处理)低(实时处理)
技术复杂度中等
适用场景BI报表、历史分析机器学习、实时分析

最佳实践

1. 数据仓库最佳实践

  • 维度建模:合理设计星型或雪花模型
  • ETL优化:增量更新、并行处理、错误处理
  • 性能调优:分区、索引、物化视图
  • 数据质量:数据验证、清洗规则、监控告警

2. 数据湖最佳实践

  • 分层架构:原始层、清洗层、建模层、应用层
  • 元数据管理:数据目录、血缘关系、质量评分
  • 安全控制:访问控制、数据加密、审计日志
  • 成本优化:生命周期管理、存储分层、计算优化

3. 混合架构(Lakehouse)

  • 统一存储:使用对象存储作为统一底座
  • ACID事务:支持数据一致性和并发控制
  • 实时处理:流批一体化处理能力
  • 多引擎支持:支持多种计算引擎访问

总结

数据仓库和数据湖各有优势,在实际应用中需要根据业务需求选择合适的架构。现代企业越来越倾向于采用Lakehouse架构,结合两者的优势,构建统一的数据平台。关键是要做好数据治理,确保数据质量和安全性,同时提供灵活的数据处理和分析能力。