Appearance
Apache Flink 详解
概述
Apache Flink 是一个开源的分布式流处理框架,专为低延迟、高吞吐量的实时数据处理而设计。Flink 提供了真正的流处理能力,支持事件时间处理、状态管理和精确一次语义保证。
Flink 架构
核心架构组件
┌─────────────────────────────────────────────────────────┐
│ Flink 架构图 │
├─────────────────────────────────────────────────────────┤
│ │
│ Client │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Flink Program │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ DataStream │ │ Table │ │ │
│ │ │ API │ │ API │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ JobManager (Master) │
│ ┌─────────────────────────────────────────────────┐ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Dispatcher │ │ ResourceMgr │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ │ ┌─────────────────────────────────────────┐ │ │
│ │ │ JobMaster │ │ │
│ │ │ ┌───────────┐ ┌───────────────────┐ │ │ │
│ │ │ │ Scheduler │ │ Checkpoint │ │ │ │
│ │ │ │ │ │ Coordinator │ │ │ │
│ │ │ └───────────┘ └───────────────────┘ │ │ │
│ │ └─────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ TaskManagers (Workers) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ TaskManager │ │ TaskManager │ │ TaskManager │ │
│ │ ┌───────┐ │ │ ┌───────┐ │ │ ┌───────┐ │ │
│ │ │ Task │ │ │ │ Task │ │ │ │ Task │ │ │
│ │ │ Slot │ │ │ │ Slot │ │ │ │ Slot │ │ │
│ │ └───────┘ │ │ └───────┘ │ │ └───────┘ │ │
│ │ ┌───────┐ │ │ ┌───────┐ │ │ ┌───────┐ │ │
│ │ │ State │ │ │ │ State │ │ │ │ State │ │ │
│ │ └───────┘ │ │ └───────┘ │ │ └───────┘ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
组件说明
JobManager
- 协调分布式执行
- 调度任务
- 管理检查点
- 故障恢复
TaskManager
- 执行任务
- 管理内存和网络缓冲区
- 向 JobManager 报告状态
Client
- 提交作业
- 获取作业结果
- 提供用户接口
核心概念
1. DataStream API
基本操作:
java
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 数据源
DataStream<String> text = env.socketTextStream("localhost", 9999);
// 转换操作
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new Tokenizer())
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);
// 输出
wordCounts.print();
// 执行
env.execute("Word Count Example");
转换操作类型:
Map
javaDataStream<Integer> doubled = stream.map(x -> x * 2);
Filter
javaDataStream<Integer> filtered = stream.filter(x -> x > 0);
KeyBy
javaKeyedStream<Tuple2<String, Integer>, String> keyed = stream.keyBy(value -> value.f0);
Window
javaWindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed = keyed.window(TumblingEventTimeWindows.of(Time.seconds(10)));
2. 时间语义
时间类型:
处理时间(Processing Time)
- 机器系统时间
- 低延迟
- 不确定性结果
事件时间(Event Time)
- 事件发生的时间
- 确定性结果
- 需要处理乱序数据
摄取时间(Ingestion Time)
- 数据进入 Flink 的时间
- 介于处理时间和事件时间之间
时间配置:
java
// 设置事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 设置水印策略
DataStream<MyEvent> withTimestamps = stream
.assignTimestampsAndWatermarks(
WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.getEventTime())
);
3. 窗口(Windows)
窗口类型:
滚动窗口(Tumbling Windows)
javastream.keyBy(...) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .sum(1);
滑动窗口(Sliding Windows)
javastream.keyBy(...) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .sum(1);
会话窗口(Session Windows)
javastream.keyBy(...) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .sum(1);
全局窗口(Global Windows)
javastream.keyBy(...) .window(GlobalWindows.create()) .trigger(CountTrigger.of(1000)) .sum(1);
4. 状态管理
状态类型:
Keyed State
javapublic class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { private transient ValueState<Tuple2<Long, Long>> sum; @Override public void open(Configuration config) { ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>( "average", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); sum = getRuntimeContext().getState(descriptor); } @Override public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception { Tuple2<Long, Long> currentSum = sum.value(); if (currentSum == null) { currentSum = Tuple2.of(0L, 0L); } currentSum.f0 += 1; currentSum.f1 += input.f1; sum.update(currentSum); if (currentSum.f0 >= 2) { out.collect(Tuple2.of(input.f0, currentSum.f1 / currentSum.f0)); sum.clear(); } } }
Operator State
javapublic class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction { private final int threshold; private transient ListState<Tuple2<String, Integer>> checkpointedState; private List<Tuple2<String, Integer>> bufferedElements; @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { checkpointedState.clear(); for (Tuple2<String, Integer> element : bufferedElements) { checkpointedState.add(element); } } @Override public void initializeState(FunctionInitializationContext context) throws Exception { ListStateDescriptor<Tuple2<String, Integer>> descriptor = new ListStateDescriptor<>( "buffered-elements", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})); checkpointedState = context.getOperatorStateStore().getListState(descriptor); if (context.isRestored()) { for (Tuple2<String, Integer> element : checkpointedState.get()) { bufferedElements.add(element); } } } }
5. 检查点和保存点
检查点配置:
java
// 启用检查点
env.enableCheckpointing(5000); // 每5秒检查点
// 检查点配置
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setMinPauseBetweenCheckpoints(500);
config.setCheckpointTimeout(60000);
config.setMaxConcurrentCheckpoints(1);
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
保存点操作:
bash
# 触发保存点
flink savepoint <jobId> [targetDirectory]
# 从保存点恢复
flink run -s <savepointPath> <jarFile>
# 取消作业并创建保存点
flink cancel -s [targetDirectory] <jobId>
Table API 和 SQL
1. Table API
java
// 创建表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 从 DataStream 创建表
DataStream<Tuple2<String, Integer>> stream = ...;
Table table = tableEnv.fromDataStream(stream, $"word", $"count");
// Table API 操作
Table result = table
.where($"count".isGreater(5))
.groupBy($"word")
.select($"word", $"count".sum().as("total"));
// 转换回 DataStream
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(result, Row.class);
2. SQL 查询
java
// 注册表
tableEnv.createTemporaryView("WordCount", table);
// SQL 查询
Table sqlResult = tableEnv.sqlQuery(
"SELECT word, SUM(count) as total " +
"FROM WordCount " +
"WHERE count > 5 " +
"GROUP BY word"
);
// 连接器配置
tableEnv.executeSql(
"CREATE TABLE kafka_source (" +
" user_id BIGINT," +
" item_id BIGINT," +
" category_id BIGINT," +
" behavior STRING," +
" ts TIMESTAMP(3)" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'user_behavior'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")"
);
连接器生态
1. 数据源连接器
Kafka 连接器:
java
// Kafka 源
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"my-topic",
new SimpleStringSchema(),
properties
);
consumer.setStartFromEarliest();
DataStream<String> stream = env.addSource(consumer);
// Kafka 汇
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
"my-topic",
new SimpleStringSchema(),
properties
);
stream.addSink(producer);
文件系统连接器:
java
// 文件源
DataStream<String> textStream = env.readTextFile("path/to/file");
// 文件汇
stream.writeAsText("path/to/output");
数据库连接器:
java
// JDBC 汇
JdbcSink.sink(
"INSERT INTO my_table (id, name) VALUES (?, ?)",
(statement, user) -> {
statement.setLong(1, user.getId());
statement.setString(2, user.getName());
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/test")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("username")
.withPassword("password")
.build()
);
2. 格式支持
JSON 格式:
java
// JSON 反序列化
DataStream<ObjectNode> jsonStream = env
.addSource(new FlinkKafkaConsumer<>("topic", new JSONKeyValueDeserializationSchema(false), properties));
Avro 格式:
java
// Avro 反序列化
DataStream<GenericRecord> avroStream = env
.addSource(new FlinkKafkaConsumer<>("topic", AvroDeserializationSchema.forGeneric(schema), properties));
部署模式
1. Standalone 集群
bash
# 启动集群
./bin/start-cluster.sh
# 提交作业
./bin/flink run examples/streaming/WordCount.jar
# 停止集群
./bin/stop-cluster.sh
2. YARN 部署
bash
# Session 模式
./bin/yarn-session.sh -n 2 -jm 1024m -tm 4096m
./bin/flink run examples/streaming/WordCount.jar
# Per-Job 模式
./bin/flink run -m yarn-cluster -yn 2 -yjm 1024m -ytm 4096m examples/streaming/WordCount.jar
# Application 模式
./bin/flink run-application -t yarn-application examples/streaming/WordCount.jar
3. Kubernetes 部署
bash
# Session 模式
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.container.image=flink:latest \
-Djobmanager.memory.process.size=1024m \
-Dkubernetes.jobmanager.cpu=1 \
-Dtaskmanager.memory.process.size=1024m \
-Dkubernetes.taskmanager.cpu=1 \
-Dtaskmanager.numberOfTaskSlots=4
# Application 模式
./bin/flink run-application \
-t kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=custom-image-with-job:latest \
local:///opt/flink/usrlib/my-flink-job.jar
性能优化
1. 并行度调优
java
// 全局并行度
env.setParallelism(4);
// 算子级别并行度
stream.map(new MyMapFunction()).setParallelism(2);
// 禁用算子链
stream.map(new MyMapFunction()).disableChaining();
// 开始新的算子链
stream.map(new MyMapFunction()).startNewChain();
2. 内存配置
bash
# JobManager 内存配置
jobmanager.memory.process.size: 1600m
jobmanager.memory.jvm-heap.size: 1024m
# TaskManager 内存配置
taskmanager.memory.process.size: 1728m
taskmanager.memory.task.heap.size: 512m
taskmanager.memory.managed.size: 512m
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 64mb
3. 检查点优化
java
// 增量检查点
env.getCheckpointConfig().enableUnalignedCheckpoints();
// 检查点压缩
Configuration config = new Configuration();
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs://namenode:port/flink-checkpoints");
config.setBoolean(CheckpointingOptions.COMPRESS_SNAPSHOTS, true);
4. 状态后端配置
java
// RocksDB 状态后端
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// 配置 RocksDB
RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("hdfs://namenode:port/flink-checkpoints", true);
rocksDBStateBackend.setDbStoragePath("/tmp/flink/rocksdb");
env.setStateBackend(rocksDBStateBackend);
监控和运维
1. Web UI
主要功能:
- 作业概览
- 任务详情
- 检查点监控
- 背压监控
- 配置信息
访问地址:
- JobManager UI: http://jobmanager:8081
2. 指标系统
java
// 自定义指标
public class MyMapFunction extends RichMapFunction<String, String> {
private transient Counter counter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
}
@Override
public String map(String value) throws Exception {
this.counter.inc();
return value.toUpperCase();
}
}
指标配置:
yaml
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249
3. 日志配置
xml
<!-- log4j2.xml -->
<Configuration>
<Appenders>
<Console name="console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n"/>
</Console>
<RollingFile name="rolling" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="100 MB"/>
</Policies>
<DefaultRolloverStrategy max="10"/>
</RollingFile>
</Appenders>
<Loggers>
<Logger name="org.apache.flink" level="INFO"/>
<Logger name="org.apache.kafka" level="INFO"/>
<Root level="INFO">
<AppenderRef ref="console"/>
<AppenderRef ref="rolling"/>
</Root>
</Loggers>
</Configuration>
最佳实践
1. 开发建议
选择合适的时间语义
- 对结果准确性要求高:使用事件时间
- 对延迟要求高:使用处理时间
合理设置水印
java// 允许5秒的乱序 WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
状态管理优化
- 及时清理过期状态
- 使用 TTL 配置
javaStateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.seconds(86400)) // 1天TTL .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class); stateDescriptor.enableTimeToLive(ttlConfig);
2. 生产环境配置
yaml
# 推荐配置
jobmanager.execution.failover-strategy: region
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 10s
restart-strategy.exponential-delay.max-backoff: 2min
restart-strategy.exponential-delay.backoff-multiplier: 2.0
restart-strategy.exponential-delay.reset-backoff-threshold: 10min
restart-strategy.exponential-delay.jitter-factor: 0.1
# 网络配置
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 1gb
# 检查点配置
state.checkpoints.dir: hdfs://namenode:port/flink-checkpoints
state.savepoints.dir: hdfs://namenode:port/flink-savepoints
state.backend.incremental: true
3. 故障排查
常见问题:
背压问题
- 检查算子处理能力
- 增加并行度
- 优化算子逻辑
检查点失败
- 检查存储系统
- 调整检查点超时时间
- 优化状态大小
内存溢出
- 调整内存配置
- 优化状态使用
- 使用 RocksDB 状态后端
与其他系统对比
Flink vs Spark Streaming
特性 | Flink | Spark Streaming |
---|---|---|
处理模型 | 真正的流处理 | 微批处理 |
延迟 | 毫秒级 | 秒级 |
吞吐量 | 高 | 非常高 |
状态管理 | 原生支持 | 有限支持 |
容错机制 | 检查点 | RDD 血统 |
事件时间 | 完整支持 | 有限支持 |
Flink vs Storm
特性 | Flink | Storm |
---|---|---|
编程模型 | DataStream API | Topology |
状态管理 | 内置支持 | 需要外部存储 |
容错机制 | 检查点 | 记录级确认 |
性能 | 高吞吐量 | 低延迟 |
易用性 | 较好 | 复杂 |
总结
Apache Flink 作为新一代流处理框架,具有以下核心优势:
- 真正的流处理:原生流处理架构,低延迟高吞吐
- 强大的状态管理:内置状态后端,支持大规模状态
- 精确一次语义:端到端的一致性保证
- 事件时间处理:完整的乱序数据处理能力
- 丰富的连接器:支持多种数据源和目标系统
- 统一的批流处理:同一套 API 处理批量和流式数据
Flink 特别适合以下场景:
- 实时数据分析
- 事件驱动应用
- 复杂事件处理
- 实时机器学习
- 数据管道构建
随着实时计算需求的不断增长,Flink 已经成为流处理领域的首选框架,其生态系统也在不断完善和发展。