Appearance
Apache Spark 详解
概述
Apache Spark 是一个开源的分布式计算框架,专为大规模数据处理而设计。它提供了比传统 MapReduce 更快的内存计算能力,支持批处理、流处理、机器学习和图计算等多种工作负载。
Spark 架构
核心架构组件
┌─────────────────────────────────────────────────────────┐
│ Spark 架构图 │
├─────────────────────────────────────────────────────────┤
│ │
│ Driver Program │
│ ┌─────────────────────────────────────────────────┐ │
│ │ SparkContext │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ DAG │ │ Task │ │ │
│ │ │ Scheduler │ │ Scheduler │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Cluster Manager │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Standalone / YARN / Mesos / Kubernetes │ │
│ └─────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Worker Nodes │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Executor │ │ Executor │ │ Executor │ │
│ │ ┌───────┐ │ │ ┌───────┐ │ │ ┌───────┐ │ │
│ │ │ Task │ │ │ │ Task │ │ │ │ Task │ │ │
│ │ └───────┘ │ │ └───────┘ │ │ └───────┘ │ │
│ │ ┌───────┐ │ │ ┌───────┐ │ │ ┌───────┐ │ │
│ │ │ Cache │ │ │ │ Cache │ │ │ │ Cache │ │ │
│ │ └───────┘ │ │ └───────┘ │ │ └───────┘ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
组件说明
Driver Program
- 包含应用程序的 main 函数
- 创建 SparkContext
- 定义 RDD 和操作
- 调度任务到集群
SparkContext
- Spark 应用程序的入口点
- 连接到集群管理器
- 获取 Executor
- 发送应用代码到 Executor
Cluster Manager
- 负责资源分配
- 支持多种模式:Standalone、YARN、Mesos、Kubernetes
Executor
- 运行在 Worker 节点上的进程
- 执行任务
- 存储数据在内存或磁盘
核心概念
1. RDD (Resilient Distributed Dataset)
特性:
- 弹性:自动容错和恢复
- 分布式:数据分布在集群中
- 不可变:创建后不能修改
- 惰性求值:只有在 Action 操作时才计算
RDD 操作类型:
Transformation(转换操作)
scala// 示例 val rdd1 = sc.textFile("input.txt") val rdd2 = rdd1.filter(_.contains("error")) val rdd3 = rdd2.map(_.toUpperCase)
Action(行动操作)
scala// 示例 val count = rdd3.count() val results = rdd3.collect() rdd3.saveAsTextFile("output")
2. DataFrame 和 Dataset
DataFrame:
- 结构化数据抽象
- 类似关系型数据库的表
- 支持 SQL 查询
- 优化的执行引擎
scala
// DataFrame 示例
val df = spark.read.json("people.json")
df.select("name", "age").filter($"age" > 21).show()
// SQL 查询
df.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 21").show()
Dataset:
- 类型安全的 DataFrame
- 编译时类型检查
- 结合了 RDD 和 DataFrame 的优点
scala
// Dataset 示例
case class Person(name: String, age: Int)
val ds = spark.read.json("people.json").as[Person]
ds.filter(_.age > 21).map(_.name).show()
3. Spark SQL
特性:
- 支持标准 SQL
- 多数据源支持
- 查询优化器(Catalyst)
- 代码生成
数据源支持:
- JSON、Parquet、ORC
- Hive、JDBC
- Cassandra、HBase
- Kafka、Elasticsearch
scala
// 多数据源示例
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
val parquetDF = spark.read.parquet("path/to/file.parquet")
Spark 生态系统
1. Spark Streaming
特性:
- 微批处理模型
- 容错性
- 与批处理 API 统一
scala
// Spark Streaming 示例
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
2. Structured Streaming
特性:
- 真正的流处理
- 事件时间处理
- 端到端一致性保证
scala
// Structured Streaming 示例
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
val query = df.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
3. MLlib (Machine Learning)
功能模块:
- 分类和回归
- 聚类
- 协同过滤
- 特征提取和转换
- 模型评估
scala
// MLlib 示例
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler
// 特征工程
val assembler = new VectorAssembler()
.setInputCols(Array("feature1", "feature2", "feature3"))
.setOutputCol("features")
val featureDF = assembler.transform(rawDF)
// 训练模型
val lr = new LogisticRegression()
.setLabelCol("label")
.setFeaturesCol("features")
val model = lr.fit(featureDF)
// 预测
val predictions = model.transform(testDF)
4. GraphX (图计算)
特性:
- 图数据结构
- 图算法库
- 图并行计算
scala
// GraphX 示例
import org.apache.spark.graphx._
// 创建图
val vertices = sc.parallelize(Array((1L, "Alice"), (2L, "Bob")))
val edges = sc.parallelize(Array(Edge(1L, 2L, "friend")))
val graph = Graph(vertices, edges)
// PageRank 算法
val ranks = graph.pageRank(0.0001).vertices
性能优化
1. 内存管理
存储级别:
scala
// 不同的存储级别
rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
rdd.persist(StorageLevel.DISK_ONLY)
内存配置:
bash
# 执行内存配置
spark.executor.memory=4g
spark.executor.memoryFraction=0.8
spark.storage.memoryFraction=0.6
2. 并行度调优
scala
// 设置分区数
val rdd = sc.textFile("input.txt", minPartitions = 100)
// 重新分区
val repartitionedRDD = rdd.repartition(200)
val coalescedRDD = rdd.coalesce(50)
3. 数据倾斜处理
解决方案:
加盐技术
scala// 为 key 添加随机前缀 val saltedRDD = rdd.map { case (key, value) => val salt = Random.nextInt(100) (s"${salt}_${key}", value) }
预聚合
scala// 先进行局部聚合 val preAggregated = rdd.mapPartitions { iter => iter.groupBy(_._1).map { case (key, values) => (key, values.map(_._2).sum) }.iterator }
4. 广播变量和累加器
scala
// 广播变量
val broadcastVar = sc.broadcast(Array(1, 2, 3))
val result = rdd.map(x => x * broadcastVar.value.sum)
// 累加器
val accum = sc.longAccumulator("My Accumulator")
rdd.foreach(x => accum.add(x))
println(accum.value)
部署模式
1. Standalone 模式
bash
# 启动 Master
./sbin/start-master.sh
# 启动 Worker
./sbin/start-slave.sh spark://master:7077
# 提交应用
./bin/spark-submit \
--class MyApp \
--master spark://master:7077 \
--executor-memory 2g \
--total-executor-cores 4 \
myapp.jar
2. YARN 模式
bash
# Cluster 模式
./bin/spark-submit \
--class MyApp \
--master yarn \
--deploy-mode cluster \
--executor-memory 2g \
--num-executors 10 \
myapp.jar
# Client 模式
./bin/spark-submit \
--class MyApp \
--master yarn \
--deploy-mode client \
--executor-memory 2g \
--num-executors 10 \
myapp.jar
3. Kubernetes 模式
bash
# K8s 部署
./bin/spark-submit \
--master k8s://https://kubernetes-api-server:443 \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=spark:latest \
local:///opt/spark/examples/jars/spark-examples.jar
监控和调试
1. Spark UI
主要功能:
- Jobs 监控
- Stages 详情
- Storage 使用情况
- Executors 状态
- SQL 查询计划
访问地址:
- Driver UI: http://driver:4040
- History Server: http://history-server:18080
2. 日志配置
properties
# log4j.properties
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# 设置特定包的日志级别
log4j.logger.org.apache.spark.repl.Main=WARN
log4j.logger.org.spark_project.jetty=WARN
3. 性能调优工具
常用工具:
- Spark History Server
- Ganglia
- Grafana + Prometheus
- Dr. Elephant
最佳实践
1. 开发建议
选择合适的数据结构
- 结构化数据使用 DataFrame/Dataset
- 非结构化数据使用 RDD
避免 Shuffle 操作
scala// 避免 rdd.groupByKey().mapValues(_.sum) // 推荐 rdd.reduceByKey(_ + _)
合理使用缓存
scala// 多次使用的 RDD 进行缓存 val cachedRDD = rdd.filter(_.contains("error")).cache() cachedRDD.count() cachedRDD.collect()
2. 生产环境配置
bash
# 推荐配置
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.execution.arrow.pyspark.enabled=true
3. 资源规划
内存规划:
- Executor 内存 = 堆内存 + 堆外内存
- 堆内存 = 存储内存 + 执行内存 + 其他内存
- 建议 Executor 内存不超过 64GB
CPU 规划:
- 每个 Executor 2-5 个 CPU 核心
- 避免单个 Executor 占用过多资源
总结
Apache Spark 作为现代大数据处理的核心框架,具有以下优势:
- 统一的计算引擎:支持批处理、流处理、机器学习和图计算
- 高性能:内存计算和优化的执行引擎
- 易用性:丰富的 API 和 SQL 支持
- 容错性:自动故障恢复机制
- 可扩展性:支持从单机到数千节点的集群
掌握 Spark 的核心概念和最佳实践,对于构建高效的大数据处理系统至关重要。随着技术的不断发展,Spark 也在持续演进,向着更加智能化和自动化的方向发展。