Apache Kafka in Depth
Overview
Apache Kafka is a distributed streaming platform, originally developed at LinkedIn and now a top-level project of the Apache Software Foundation. Kafka is designed as a high-throughput, low-latency distributed messaging system capable of handling large-scale real-time data streams.
Core Features
- High throughput: a single cluster can sustain millions of messages per second on commodity hardware
- Low latency: end-to-end message latency can be as low as a few milliseconds
- Durability: messages are persisted to disk and can be replayed (see the sketch after this list)
- Distributed: built as a distributed system with high availability
- Fault tolerance: replication protects against data loss
- Scalability: scales horizontally by adding brokers to the cluster
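Because messages stay on disk for the configured retention period, a consumer can rewind and re-read them. A minimal replay sketch, assuming a broker on localhost:9092 and an illustrative `user-events` topic and `replay-group` group id; the initial `poll` is only there to trigger partition assignment before seeking:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReplayExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "replay-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("user-events"));
            consumer.poll(Duration.ofMillis(200));           // join the group, get partitions assigned
            consumer.seekToBeginning(consumer.assignment()); // rewind: retained messages are re-read
            consumer.poll(Duration.ofSeconds(1))
                    .forEach(r -> System.out.println(r.offset() + ": " + r.value()));
        }
    }
}
```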
Core Concepts
1. Topic
```java
// A topic is the logical grouping of messages
public class TopicExample {

    /**
     * Topic characteristics:
     * - logical grouping of messages
     * - can have multiple partitions
     * - supports multiple producers and consumers
     */
    public void createTopic() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Create a topic with 3 partitions and a replication factor of 2
            NewTopic newTopic = new NewTopic("user-events", 3, (short) 2);
            // Topic-level configuration
            Map<String, String> configs = new HashMap<>();
            configs.put("retention.ms", "604800000");   // retain messages for 7 days
            configs.put("compression.type", "lz4");     // compression codec
            configs.put("cleanup.policy", "delete");    // cleanup policy
            newTopic.configs(configs);
            CreateTopicsResult result = adminClient.createTopics(Arrays.asList(newTopic));
            result.all().get(); // block until creation completes
            System.out.println("Topic created successfully");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Describe an existing topic
     */
    public void describeTopic() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("user-events"));
            result.all().get().forEach((topicName, description) -> {
                System.out.println("Topic: " + topicName);
                System.out.println("Partition count: " + description.partitions().size());
                description.partitions().forEach(partition ->
                        System.out.println("Partition " + partition.partition() +
                                ", leader: " + partition.leader().id() +
                                ", replicas: " + partition.replicas().stream()
                                        .map(node -> String.valueOf(node.id()))
                                        .collect(Collectors.joining(","))));
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
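Partition counts can also be raised after a topic exists. A hedged sketch, reusing the same AdminClient setup and the `user-events` topic from above; note that Kafka only allows increasing, never decreasing, the partition count:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class IncreasePartitionsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Grow "user-events" from 3 to 6 partitions; existing data is not re-shuffled,
            // so key-to-partition mapping changes only for newly produced messages.
            adminClient.createPartitions(
                    Collections.singletonMap("user-events", NewPartitions.increaseTo(6)))
                .all().get();
            System.out.println("Partition count increased to 6");
        }
    }
}
```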
2. Partition
```java
// A partition is the physical division of a topic
public class PartitionExample {

    /**
     * Partition characteristics:
     * - each partition is an ordered sequence of messages
     * - ordering is guaranteed within a partition, not across partitions
     * - each partition can have multiple replicas
     * - within a consumer group, each partition is consumed by at most one consumer,
     *   so consumers beyond the partition count sit idle
     */

    /**
     * Custom partitioner
     */
    public static class CustomPartitioner implements Partitioner {

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if (key == null) {
                // No key: pick a random partition
                return ThreadLocalRandom.current().nextInt(numPartitions);
            }
            // Hash-based partitioning on the key
            if (key instanceof String) {
                String stringKey = (String) key;
                // Business-specific rule: route VIP user messages to a dedicated partition
                if (stringKey.startsWith("VIP_")) {
                    return 0; // all VIP messages go to partition 0
                }
                // Regular users: hash partitioning (mask the sign bit so the result
                // cannot be negative, even for Integer.MIN_VALUE hash codes)
                return (stringKey.hashCode() & 0x7fffffff) % numPartitions;
            }
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }

        @Override
        public void close() {
            // release resources
        }

        @Override
        public void configure(Map<String, ?> configs) {
            // initialize from configuration
        }
    }

    /**
     * Rebalance listener for partition reassignment
     */
    public static class RebalanceListener implements ConsumerRebalanceListener {

        private final Consumer<String, String> consumer;
        private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

        public RebalanceListener(Consumer<String, String> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("Partitions revoked: " + partitions);
            // commit the offsets processed so far
            consumer.commitSync(currentOffsets);
            currentOffsets.clear();
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("Partitions assigned: " + partitions);
            // A specific starting offset can be chosen here. Seeking to the end skips any
            // unconsumed backlog, so only do this when that is the intended behavior.
            consumer.seekToEnd(partitions);
        }

        public void addOffset(TopicPartition partition, OffsetAndMetadata offset) {
            currentOffsets.put(partition, offset);
        }
    }
}
```
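For comparison with the custom partitioner above, here is a hedged sketch of how the partition for a keyed record can be computed the way Kafka's built-in partitioner does for keyed records: a murmur2 hash over the serialized key, mapped onto the partition count. `Utils.murmur2` and `Utils.toPositive` live in `org.apache.kafka.common.utils`; the key and partition count below are illustrative:

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class DefaultPartitioningSketch {
    // Mirrors the keyed-record behavior of the built-in partitioner:
    // murmur2 hash of the key bytes, mapped onto the number of partitions.
    static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always lands on the same partition as long as the
        // partition count does not change.
        System.out.println(partitionForKey("user-42", 3));
    }
}
```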
3. Producer
```java
// The producer publishes messages to Kafka
public class ProducerExample {

    private KafkaProducer<String, String> producer;

    public ProducerExample() {
        Properties props = new Properties();
        // Basic configuration
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Throughput tuning
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);         // batch size in bytes
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);             // time to wait for more records
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);   // total buffer memory
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");   // compression codec
        // Reliability
        props.put(ProducerConfig.ACKS_CONFIG, "all");               // wait for all in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, 3);                // retry count
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);    // backoff between retries
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // request timeout
        // Idempotence (prevents duplicates caused by retries)
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Custom partitioner
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                PartitionExample.CustomPartitioner.class.getName());
        this.producer = new KafkaProducer<>(props);
    }

    /**
     * Send a message synchronously
     */
    public void sendSync(String topic, String key, String value) {
        try {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            // Attach message headers
            record.headers().add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
            record.headers().add("source", "user-service".getBytes());
            RecordMetadata metadata = producer.send(record).get();
            System.out.printf("Message sent: topic=%s, partition=%d, offset=%d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
        } catch (Exception e) {
            System.err.println("Send failed: " + e.getMessage());
        }
    }

    /**
     * Send a message asynchronously
     */
    public void sendAsync(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.err.println("Send failed: " + exception.getMessage());
                    // retry, or record the failed message for later handling
                    handleSendFailure(record, exception);
                } else {
                    System.out.printf("Message sent: topic=%s, partition=%d, offset=%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            }
        });
    }

    /**
     * Send a batch of messages
     */
    public void sendBatch(String topic, List<String> messages) {
        List<Future<RecordMetadata>> futures = new ArrayList<>();
        for (int i = 0; i < messages.size(); i++) {
            String key = "key-" + i;
            String value = messages.get(i);
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            futures.add(producer.send(record));
        }
        // Wait for every send to complete
        for (Future<RecordMetadata> future : futures) {
            try {
                RecordMetadata metadata = future.get(10, TimeUnit.SECONDS);
                System.out.printf("Batch message sent: partition=%d, offset=%d%n",
                        metadata.partition(), metadata.offset());
            } catch (Exception e) {
                System.err.println("Batch send failed: " + e.getMessage());
            }
        }
    }

    /**
     * Transactional send
     */
    public void sendTransactional(String topic, List<String> messages) {
        // A transactional producer needs its own configuration including a transactional.id;
        // KafkaProducer does not expose its configuration, so the properties are rebuilt here
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
        KafkaProducer<String, String> transactionalProducer = new KafkaProducer<>(props);
        try {
            // Initialize and begin the transaction
            transactionalProducer.initTransactions();
            transactionalProducer.beginTransaction();
            // Send the messages inside the transaction
            for (int i = 0; i < messages.size(); i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "key-" + i, messages.get(i));
                transactionalProducer.send(record);
            }
            // Commit the transaction
            transactionalProducer.commitTransaction();
            System.out.println("Transaction committed");
        } catch (Exception e) {
            // Abort on any failure
            transactionalProducer.abortTransaction();
            System.err.println("Transaction aborted: " + e.getMessage());
        } finally {
            transactionalProducer.close();
        }
    }

    private void handleSendFailure(ProducerRecord<String, String> record, Exception exception) {
        // Failure handling: dead-letter topic, retry, alerting, etc.
        System.err.println("Handling failed message: " + record.value());
    }

    public void close() {
        producer.close();
    }
}
```
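A short usage sketch for the class above (topic name and keys are illustrative; a broker is assumed to be reachable on localhost:9092):

```java
public class ProducerUsage {
    public static void main(String[] args) {
        ProducerExample producerExample = new ProducerExample();
        try {
            // Synchronous send blocks until the broker acknowledges the record
            producerExample.sendSync("user-events", "VIP_1001", "login");
            // Asynchronous send returns immediately; the callback reports the result
            producerExample.sendAsync("user-events", "user-42", "page-view");
        } finally {
            // close() flushes buffered records before releasing resources
            producerExample.close();
        }
    }
}
```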
4. Consumer
```java
// The consumer reads messages from Kafka
public class ConsumerExample {

    private KafkaConsumer<String, String> consumer;
    private final AtomicBoolean running = new AtomicBoolean(true);

    public ConsumerExample(String groupId) {
        Properties props = new Properties();
        // Basic configuration
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumption strategy
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from the earliest offset when no committed offset exists
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);     // commit offsets manually
        // Fetch tuning
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);         // minimum bytes per fetch
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);        // maximum wait per fetch
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);         // maximum records per poll
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);     // session timeout
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);  // heartbeat interval
        this.consumer = new KafkaConsumer<>(props);
    }

    /**
     * Basic consume loop
     */
    public void basicConsume(String topic) {
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                            record.topic(), record.partition(), record.offset(),
                            record.key(), record.value());
                    // Process the message
                    processMessage(record);
                }
                // Commit offsets manually after processing
                consumer.commitSync();
            }
        } catch (Exception e) {
            System.err.println("Consume error: " + e.getMessage());
        } finally {
            consumer.close();
        }
    }

    /**
     * Batch consumption
     */
    public void batchConsume(String topic, int batchSize) {
        consumer.subscribe(Arrays.asList(topic));
        List<ConsumerRecord<String, String>> batch = new ArrayList<>();
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    batch.add(record);
                    if (batch.size() >= batchSize) {
                        processBatch(batch);
                        // Commit the offset right after the last record in the batch
                        ConsumerRecord<String, String> lastRecord = batch.get(batch.size() - 1);
                        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                        offsets.put(
                                new TopicPartition(lastRecord.topic(), lastRecord.partition()),
                                new OffsetAndMetadata(lastRecord.offset() + 1));
                        consumer.commitSync(offsets);
                        batch.clear();
                    }
                }
            }
            // Process whatever is left when the loop stops
            if (!batch.isEmpty()) {
                processBatch(batch);
            }
        } catch (Exception e) {
            System.err.println("Batch consume error: " + e.getMessage());
        } finally {
            consumer.close();
        }
    }

    /**
     * Consume specific partitions
     */
    public void consumeSpecificPartitions(String topic, List<Integer> partitions) {
        List<TopicPartition> topicPartitions = partitions.stream()
                .map(partition -> new TopicPartition(topic, partition))
                .collect(Collectors.toList());
        consumer.assign(topicPartitions);
        // Start from a specific offset
        for (TopicPartition partition : topicPartitions) {
            consumer.seek(partition, 100); // start at offset 100
        }
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Assigned-partition consume: partition=%d, offset=%d, value=%s%n",
                            record.partition(), record.offset(), record.value());
                    processMessage(record);
                }
                consumer.commitSync();
            }
        } catch (Exception e) {
            System.err.println("Assigned-partition consume error: " + e.getMessage());
        } finally {
            consumer.close();
        }
    }

    /**
     * Consume with a consumer group rebalance listener
     */
    public void consumeWithRebalance(String topic) {
        PartitionExample.RebalanceListener rebalanceListener =
                new PartitionExample.RebalanceListener(consumer);
        consumer.subscribe(Arrays.asList(topic), rebalanceListener);
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    processMessage(record);
                    // Track offsets so they can be committed if a rebalance happens
                    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                    OffsetAndMetadata offset = new OffsetAndMetadata(record.offset() + 1);
                    rebalanceListener.addOffset(partition, offset);
                }
                consumer.commitAsync();
            }
        } catch (Exception e) {
            System.err.println("Rebalance consume error: " + e.getMessage());
        } finally {
            consumer.close();
        }
    }

    private void processMessage(ConsumerRecord<String, String> record) {
        try {
            // Simulate processing time
            Thread.sleep(10);
            // Inspect message headers
            record.headers().forEach(header ->
                    System.out.printf("Header: %s = %s%n", header.key(), new String(header.value())));
            // Business logic
            handleBusinessLogic(record.value());
        } catch (Exception e) {
            System.err.println("Message processing failed: " + e.getMessage());
            // retry, or route the record to a dead-letter topic
        }
    }

    private void processBatch(List<ConsumerRecord<String, String>> batch) {
        System.out.println("Processing a batch of " + batch.size() + " messages");
        // Batch business logic
        List<String> values = batch.stream()
                .map(ConsumerRecord::value)
                .collect(Collectors.toList());
        handleBatchBusinessLogic(values);
    }

    private void handleBusinessLogic(String message) {
        // Concrete business logic goes here
        System.out.println("Handling message: " + message);
    }

    private void handleBatchBusinessLogic(List<String> messages) {
        // Concrete batch business logic goes here
        System.out.println("Handling a batch of " + messages.size() + " messages");
    }

    public void shutdown() {
        running.set(false);
    }
}
```
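A hedged usage sketch for the consumer above: run the poll loop on its own thread and stop it cleanly via a shutdown hook (group id and topic are illustrative):

```java
public class ConsumerUsage {
    public static void main(String[] args) throws InterruptedException {
        ConsumerExample consumerExample = new ConsumerExample("user-events-group");
        Thread pollThread = new Thread(() -> consumerExample.basicConsume("user-events"));
        pollThread.start();
        // Flip the running flag on JVM shutdown so the poll loop exits and the consumer closes
        Runtime.getRuntime().addShutdownHook(new Thread(consumerExample::shutdown));
        pollThread.join();
    }
}
```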
Kafka Architecture
1. Cluster Architecture
```yaml
# Example Kafka cluster layout
kafka-cluster:
  brokers:
    - id: 1
      host: kafka-1.example.com
      port: 9092
      log.dirs: /var/kafka-logs-1
    - id: 2
      host: kafka-2.example.com
      port: 9092
      log.dirs: /var/kafka-logs-2
    - id: 3
      host: kafka-3.example.com
      port: 9092
      log.dirs: /var/kafka-logs-3
  zookeeper:
    ensemble:
      - zk-1.example.com:2181
      - zk-2.example.com:2181
      - zk-3.example.com:2181
  replication:
    default.replication.factor: 3
    min.insync.replicas: 2
  performance:
    num.network.threads: 8
    num.io.threads: 16
    socket.send.buffer.bytes: 102400
    socket.receive.buffer.bytes: 102400
    socket.request.max.bytes: 104857600
```
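The same topology can be verified programmatically. A minimal sketch, assuming an AdminClient can reach any broker in the cluster, that lists the broker nodes and the current controller via `describeCluster`:

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterResult;

public class ClusterInfoExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1.example.com:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            DescribeClusterResult cluster = adminClient.describeCluster();
            System.out.println("Cluster id: " + cluster.clusterId().get());
            System.out.println("Controller broker: " + cluster.controller().get().id());
            // Each node corresponds to one broker entry in the layout above
            cluster.nodes().get().forEach(node ->
                    System.out.printf("Broker %d at %s:%d%n", node.id(), node.host(), node.port()));
        }
    }
}
```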
2. Storage Mechanism
```java
// How Kafka stores data on disk
public class KafkaStorageExample {

    /**
     * On-disk layout:
     *
     * /var/kafka-logs/
     * ├── topic-partition-0/
     * │   ├── 00000000000000000000.log        # log segment file
     * │   ├── 00000000000000000000.index      # offset index
     * │   ├── 00000000000000000000.timeindex  # timestamp index
     * │   ├── 00000000000000001000.log
     * │   ├── 00000000000000001000.index
     * │   └── leader-epoch-checkpoint         # leader epoch checkpoint
     * └── topic-partition-1/
     *     ├── 00000000000000000000.log
     *     └── ...
     */

    /**
     * Log segment management (broker-level settings)
     */
    public void logSegmentManagement() {
        Properties props = new Properties();
        // Segment rolling
        props.put("log.segment.bytes", "1073741824"); // roll a new segment at 1GB
        props.put("log.roll.ms", "604800000");        // or after 7 days
        // Retention
        props.put("log.retention.hours", "168");        // keep data for 7 days
        props.put("log.retention.bytes", "1073741824"); // or up to 1GB per partition
        // Cleanup
        props.put("log.cleanup.policy", "delete"); // delete policy (vs. compact)
        props.put("log.cleaner.enable", "true");   // enable the log cleaner (required for compaction)
        // Compression
        props.put("compression.type", "lz4");
        System.out.println("Log segment settings prepared");
    }

    /**
     * Index mechanism
     */
    public void indexMechanism() {
        /**
         * Kafka maintains two index files per segment:
         *
         * 1. Offset index (.index):
         *    - sparse: not every message gets an entry
         *    - entry format: (relative offset, physical position)
         *    - used to locate a message inside the log file quickly
         *
         * 2. Timestamp index (.timeindex):
         *    - indexed by timestamp
         *    - entry format: (timestamp, relative offset)
         *    - used for time-based lookups
         */
        Properties indexProps = new Properties();
        indexProps.put("log.index.interval.bytes", "4096");     // bytes between index entries
        indexProps.put("log.index.size.max.bytes", "10485760"); // maximum index file size
        System.out.println("Index settings prepared");
    }
}
```
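The timestamp index is what powers time-based lookups on the consumer side. A hedged sketch, reusing the `user-events` topic from earlier, that asks the broker via `offsetsForTimes` for the first offset written in the last hour and seeks to it:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TimeBasedSeekExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "time-seek-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("user-events", 0);
            consumer.assign(Collections.singletonList(partition));
            // Find the earliest offset whose timestamp is >= one hour ago
            Map<TopicPartition, Long> query = new HashMap<>();
            query.put(partition, System.currentTimeMillis() - Duration.ofHours(1).toMillis());
            Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
            OffsetAndTimestamp hit = result.get(partition);
            if (hit != null) {
                consumer.seek(partition, hit.offset()); // replay from that point in time
                System.out.println("Seeking to offset " + hit.offset());
            } else {
                System.out.println("No messages newer than the requested timestamp");
            }
        }
    }
}
```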
Performance Optimization
1. Producer Optimization
```java
// Producer performance tuning
public class ProducerOptimization {

    /**
     * High-throughput producer configuration
     */
    public KafkaProducer<String, String> createHighPerformanceProducer() {
        Properties props = new Properties();
        // Basic configuration
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Batching
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);       // 64KB batches
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);           // wait up to 20ms to fill a batch
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer
        // Compression
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        // Network buffers
        props.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072);     // 128KB send buffer
        props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 65536);   // 64KB receive buffer
        // Pipelining
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        // Trade some reliability for throughput: wait only for the leader's acknowledgment
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
        return new KafkaProducer<>(props);
    }

    /**
     * Asynchronous batched sending
     */
    public void optimizedAsyncSend() throws InterruptedException {
        KafkaProducer<String, String> producer = createHighPerformanceProducer();
        // Use a thread pool to submit records concurrently
        ExecutorService executor = Executors.newFixedThreadPool(10);
        List<String> messages = generateMessages(10000);
        // Submit in slices
        int batchSize = 100;
        for (int i = 0; i < messages.size(); i += batchSize) {
            int endIndex = Math.min(i + batchSize, messages.size());
            List<String> batch = messages.subList(i, endIndex);
            executor.submit(() -> {
                for (String message : batch) {
                    ProducerRecord<String, String> record =
                            new ProducerRecord<>("high-throughput-topic", message);
                    producer.send(record, (metadata, exception) -> {
                        if (exception != null) {
                            System.err.println("Send failed: " + exception.getMessage());
                        }
                    });
                }
            });
        }
        // Wait for the submit tasks to finish before closing the producer,
        // otherwise records may never be handed to it
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        producer.close();
    }

    private List<String> generateMessages(int count) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            messages.add("Message-" + i + "-" + System.currentTimeMillis());
        }
        return messages;
    }
}
```
2. Consumer Optimization
```java
// Consumer performance tuning
public class ConsumerOptimization {

    /**
     * High-throughput consumer configuration
     */
    public KafkaConsumer<String, String> createHighPerformanceConsumer(String groupId) {
        Properties props = new Properties();
        // Basic configuration
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Fetch tuning
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50000);    // at least 50KB per fetch
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);    // wait at most 500ms
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);    // up to 1000 records per poll
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800); // 50MB maximum per fetch
        // Network buffers
        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 262144);    // 256KB receive buffer
        props.put(ConsumerConfig.SEND_BUFFER_CONFIG, 131072);       // 128KB send buffer
        // Session management
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);    // 30s session timeout
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // 10s heartbeat interval
        // Offset management
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // manual commits
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return new KafkaConsumer<>(props);
    }

    /**
     * Multi-threaded consumption: one consumer per thread, all in the same group,
     * so the topic's partitions are split among the threads
     */
    public void multiThreadConsume(String topic, int threadCount) {
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            executor.submit(() -> {
                // All threads must share one group id; with a group per thread,
                // every thread would receive a full copy of the topic
                KafkaConsumer<String, String> consumer =
                        createHighPerformanceConsumer("high-perf-group");
                consumer.subscribe(Arrays.asList(topic));
                try {
                    while (true) {
                        ConsumerRecords<String, String> records =
                                consumer.poll(Duration.ofMillis(1000));
                        if (!records.isEmpty()) {
                            // Hand records off for parallel processing
                            records.forEach(this::processMessageAsync);
                            // Commit after dispatch; this is at-most-once for records
                            // that are still being processed asynchronously
                            consumer.commitAsync();
                        }
                    }
                } catch (Exception e) {
                    System.err.println("Consumer thread " + threadId + " failed: " + e.getMessage());
                } finally {
                    consumer.close();
                }
            });
        }
    }

    /**
     * Pipeline consumption: one polling thread feeds a pool of processing threads
     */
    public void pipelineConsume(String topic) {
        KafkaConsumer<String, String> consumer = createHighPerformanceConsumer("pipeline-group");
        consumer.subscribe(Arrays.asList(topic));
        // Bounded hand-off queue between the poll loop and the workers
        BlockingQueue<ConsumerRecord<String, String>> processingQueue =
                new LinkedBlockingQueue<>(10000);
        // Worker pool
        ExecutorService processingPool = Executors.newFixedThreadPool(20);
        for (int i = 0; i < 20; i++) {
            processingPool.submit(() -> {
                while (true) {
                    try {
                        ConsumerRecord<String, String> record = processingQueue.take();
                        processMessage(record);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
        }
        // Polling loop
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    // Non-blocking enqueue; records are dropped when the queue is full
                    if (!processingQueue.offer(record)) {
                        System.err.println("Processing queue full, dropping record");
                    }
                }
                // Periodic commit; queued records are committed before they are processed,
                // which trades delivery guarantees for throughput
                consumer.commitAsync();
            }
        } finally {
            consumer.close();
            processingPool.shutdown();
        }
    }

    private void processMessageAsync(ConsumerRecord<String, String> record) {
        // Process off the polling thread
        CompletableFuture.runAsync(() -> processMessage(record));
    }

    private void processMessage(ConsumerRecord<String, String> record) {
        // Simulate processing time
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```
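One weakness of the pipeline pattern above is that records are dropped once the hand-off queue fills up. A hedged alternative sketch (watermark values and names are illustrative) uses `pause` and `resume` so the consumer simply stops fetching while the workers catch up, without leaving the consumer group:

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BackpressureConsumeExample {
    private static final int HIGH_WATERMARK = 8000;
    private static final int LOW_WATERMARK = 2000;

    // Assumes a consumer built like createHighPerformanceConsumer above and a queue
    // that worker threads drain elsewhere
    public void consumeWithBackpressure(KafkaConsumer<String, String> consumer, String topic,
                                        BlockingQueue<ConsumerRecord<String, String>> queue) {
        consumer.subscribe(Arrays.asList(topic));
        boolean paused = false;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                queue.offer(record);
            }
            if (!paused && queue.size() > HIGH_WATERMARK) {
                // Stop fetching but keep polling so the consumer stays in the group
                consumer.pause(consumer.assignment());
                paused = true;
            } else if (paused && queue.size() < LOW_WATERMARK) {
                consumer.resume(consumer.assignment());
                paused = false;
            }
            consumer.commitAsync();
        }
    }
}
```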
Monitoring and Operations
1. Monitoring Metrics
```java
// Collecting Kafka monitoring metrics (exported through Micrometer's MeterRegistry)
public class KafkaMonitoring {

    private final MeterRegistry meterRegistry;

    public KafkaMonitoring(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    /**
     * Producer monitoring
     */
    public void monitorProducer(KafkaProducer<String, String> producer) {
        // Client-side metrics exposed by the producer
        Map<MetricName, ? extends Metric> metrics = producer.metrics();
        // Export the key ones
        metrics.forEach((metricName, metric) -> {
            String name = metricName.name();
            switch (name) {
                case "record-send-rate":        // records sent per second
                    meterRegistry.gauge("kafka.producer.record.send.rate", (Number) metric.metricValue());
                    break;
                case "record-error-rate":       // failed records per second
                    meterRegistry.gauge("kafka.producer.record.error.rate", (Number) metric.metricValue());
                    break;
                case "request-latency-avg":     // average request latency
                    meterRegistry.gauge("kafka.producer.request.latency.avg", (Number) metric.metricValue());
                    break;
                case "buffer-available-bytes":  // free buffer memory
                    meterRegistry.gauge("kafka.producer.buffer.available.bytes", (Number) metric.metricValue());
                    break;
                case "batch-size-avg":          // average batch size
                    meterRegistry.gauge("kafka.producer.batch.size.avg", (Number) metric.metricValue());
                    break;
            }
        });
    }

    /**
     * Consumer monitoring
     */
    public void monitorConsumer(KafkaConsumer<String, String> consumer) {
        Map<MetricName, ? extends Metric> metrics = consumer.metrics();
        metrics.forEach((metricName, metric) -> {
            String name = metricName.name();
            switch (name) {
                case "records-consumed-rate":   // records consumed per second
                    meterRegistry.gauge("kafka.consumer.records.consumed.rate", (Number) metric.metricValue());
                    break;
                case "fetch-latency-avg":       // average fetch latency
                    meterRegistry.gauge("kafka.consumer.fetch.latency.avg", (Number) metric.metricValue());
                    break;
                case "records-lag-max":         // maximum consumer lag
                    meterRegistry.gauge("kafka.consumer.records.lag.max", (Number) metric.metricValue());
                    break;
                case "fetch-size-avg":          // average fetch size
                    meterRegistry.gauge("kafka.consumer.fetch.size.avg", (Number) metric.metricValue());
                    break;
            }
        });
    }

    /**
     * Cluster monitoring
     */
    public void monitorCluster() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Topic count
            ListTopicsResult topicsResult = adminClient.listTopics();
            Set<String> topics = topicsResult.names().get();
            meterRegistry.gauge("kafka.cluster.topics.count", topics.size());
            // Per-topic partition information
            for (String topic : topics) {
                DescribeTopicsResult describeResult =
                        adminClient.describeTopics(Arrays.asList(topic));
                TopicDescription description = describeResult.all().get().get(topic);
                int partitionCount = description.partitions().size();
                meterRegistry.gauge("kafka.topic.partitions.count",
                        Tags.of("topic", topic), partitionCount);
                // Replica and in-sync replica counts per partition
                for (TopicPartitionInfo partition : description.partitions()) {
                    int replicaCount = partition.replicas().size();
                    int isrCount = partition.isr().size();
                    meterRegistry.gauge("kafka.partition.replicas.count",
                            Tags.of("topic", topic, "partition", String.valueOf(partition.partition())),
                            replicaCount);
                    meterRegistry.gauge("kafka.partition.isr.count",
                            Tags.of("topic", topic, "partition", String.valueOf(partition.partition())),
                            isrCount);
                }
            }
        } catch (Exception e) {
            System.err.println("Cluster monitoring failed: " + e.getMessage());
        }
    }

    /**
     * Consumer group monitoring
     */
    public void monitorConsumerGroups() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            // List all consumer groups
            ListConsumerGroupsResult groupsResult = adminClient.listConsumerGroups();
            Collection<ConsumerGroupListing> groups = groupsResult.all().get();
            for (ConsumerGroupListing group : groups) {
                String groupId = group.groupId();
                // Group details
                DescribeConsumerGroupsResult describeResult =
                        adminClient.describeConsumerGroups(Arrays.asList(groupId));
                ConsumerGroupDescription description =
                        describeResult.all().get().get(groupId);
                // Member count
                int memberCount = description.members().size();
                meterRegistry.gauge("kafka.consumer.group.members.count",
                        Tags.of("group", groupId), memberCount);
                // Group state
                String state = description.state().toString();
                meterRegistry.gauge("kafka.consumer.group.state",
                        Tags.of("group", groupId, "state", state), 1);
                // Committed offsets
                ListConsumerGroupOffsetsResult offsetsResult =
                        adminClient.listConsumerGroupOffsets(groupId);
                Map<TopicPartition, OffsetAndMetadata> offsets =
                        offsetsResult.partitionsToOffsetAndMetadata().get();
                for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                    TopicPartition partition = entry.getKey();
                    long consumerOffset = entry.getValue().offset();
                    // Computing lag additionally requires the partition's log-end offset;
                    // a sketch of that follows after this class
                    meterRegistry.gauge("kafka.consumer.group.offset",
                            Tags.of("group", groupId,
                                    "topic", partition.topic(),
                                    "partition", String.valueOf(partition.partition())),
                            consumerOffset);
                }
            }
        } catch (Exception e) {
            System.err.println("Consumer group monitoring failed: " + e.getMessage());
        }
    }
}
```
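As noted in the comment above, lag requires the partitions' log-end offsets as well. A hedged sketch (the group id is illustrative) that combines `listConsumerGroupOffsets` with `AdminClient.listOffsets` to print per-partition lag:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerLagExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        String groupId = "user-events-group";
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Committed offsets of the group
            Map<TopicPartition, OffsetAndMetadata> committed = adminClient
                    .listConsumerGroupOffsets(groupId)
                    .partitionsToOffsetAndMetadata().get();
            // Log-end offsets of the same partitions
            Map<TopicPartition, OffsetSpec> request = new HashMap<>();
            committed.keySet().forEach(tp -> request.put(tp, OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                    adminClient.listOffsets(request).all().get();
            // Lag = log-end offset - committed offset
            committed.forEach((tp, offsetAndMetadata) -> {
                long lag = endOffsets.get(tp).offset() - offsetAndMetadata.offset();
                System.out.printf("%s-%d lag=%d%n", tp.topic(), tp.partition(), lag);
            });
        }
    }
}
```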
2. Operations Scripts
```bash
#!/bin/bash
# Kafka operations helper script

KAFKA_HOME="/opt/kafka"
KAFKA_CONFIG="$KAFKA_HOME/config/server.properties"
ZOOKEEPER_CONNECT="localhost:2181"
BOOTSTRAP_SERVERS="localhost:9092"

# 1. Cluster health check
check_cluster_health() {
    echo "Checking Kafka cluster health..."
    # Check the Kafka process
    if pgrep -f "kafka.Kafka" > /dev/null; then
        echo "✓ Kafka process is running"
    else
        echo "✗ Kafka process is not running"
        return 1
    fi
    # Check broker connectivity
    if $KAFKA_HOME/bin/kafka-broker-api-versions.sh --bootstrap-server $BOOTSTRAP_SERVERS > /dev/null 2>&1; then
        echo "✓ Kafka cluster is reachable"
    else
        echo "✗ Kafka cluster is not reachable"
        return 1
    fi
    # Count topics
    topic_count=$($KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --list | wc -l)
    echo "✓ Current topic count: $topic_count"
    return 0
}

# 2. Topic management
manage_topics() {
    echo "Topic management operations..."

    # Create a topic
    create_topic() {
        local topic_name=$1
        local partitions=${2:-3}
        local replication_factor=${3:-2}
        echo "Creating topic: $topic_name (partitions: $partitions, replication factor: $replication_factor)"
        $KAFKA_HOME/bin/kafka-topics.sh \
            --bootstrap-server $BOOTSTRAP_SERVERS \
            --create \
            --topic $topic_name \
            --partitions $partitions \
            --replication-factor $replication_factor \
            --config retention.ms=604800000 \
            --config compression.type=lz4
    }

    # Delete a topic
    delete_topic() {
        local topic_name=$1
        echo "Deleting topic: $topic_name"
        $KAFKA_HOME/bin/kafka-topics.sh \
            --bootstrap-server $BOOTSTRAP_SERVERS \
            --delete \
            --topic $topic_name
    }

    # Modify a topic configuration
    modify_topic() {
        local topic_name=$1
        local config_key=$2
        local config_value=$3
        echo "Updating topic config: $topic_name ($config_key=$config_value)"
        $KAFKA_HOME/bin/kafka-configs.sh \
            --bootstrap-server $BOOTSTRAP_SERVERS \
            --entity-type topics \
            --entity-name $topic_name \
            --alter \
            --add-config $config_key=$config_value
    }

    # Increase the partition count
    increase_partitions() {
        local topic_name=$1
        local new_partition_count=$2
        echo "Increasing partitions for topic: $topic_name (new count: $new_partition_count)"
        $KAFKA_HOME/bin/kafka-topics.sh \
            --bootstrap-server $BOOTSTRAP_SERVERS \
            --alter \
            --topic $topic_name \
            --partitions $new_partition_count
    }
}

# 3. Consumer group management
manage_consumer_groups() {
    echo "Consumer group management..."

    # List all consumer groups
    list_consumer_groups() {
        echo "Current consumer groups:"
        $KAFKA_HOME/bin/kafka-consumer-groups.sh \
            --bootstrap-server $BOOTSTRAP_SERVERS \
            --list
    }

    # Describe a consumer group
    describe_consumer_group() {
        local group_id=$1
        echo "Consumer group details: $group_id"
        $KAFKA_HOME/bin/kafka-consumer-groups.sh \
            --bootstrap-server $BOOTSTRAP_SERVERS \
            --group $group_id \
            --describe
    }

    # Reset consumer group offsets
    reset_consumer_group_offset() {
        local group_id=$1
        local topic=$2
        local reset_type=${3:-"earliest"}
        echo "Resetting offsets for group: $group_id (topic: $topic, reset to: $reset_type)"
        $KAFKA_HOME/bin/kafka-consumer-groups.sh \
            --bootstrap-server $BOOTSTRAP_SERVERS \
            --group $group_id \
            --topic $topic \
            --reset-offsets \
            --to-$reset_type \
            --execute
    }

    # Delete a consumer group
    delete_consumer_group() {
        local group_id=$1
        echo "Deleting consumer group: $group_id"
        $KAFKA_HOME/bin/kafka-consumer-groups.sh \
            --bootstrap-server $BOOTSTRAP_SERVERS \
            --group $group_id \
            --delete
    }
}

# 4. Performance testing
performance_test() {
    echo "Kafka performance testing..."

    # Producer performance test
    producer_perf_test() {
        local topic=$1
        local num_records=${2:-100000}
        local record_size=${3:-1024}
        local throughput=${4:-10000}
        echo "Producer perf test: topic=$topic, records=$num_records, record size=$record_size, target throughput=$throughput"
        $KAFKA_HOME/bin/kafka-producer-perf-test.sh \
            --topic $topic \
            --num-records $num_records \
            --record-size $record_size \
            --throughput $throughput \
            --producer-props bootstrap.servers=$BOOTSTRAP_SERVERS \
                acks=1 \
                compression.type=lz4 \
                batch.size=65536 \
                linger.ms=10
    }

    # Consumer performance test
    consumer_perf_test() {
        local topic=$1
        local messages=${2:-100000}
        echo "Consumer perf test: topic=$topic, messages=$messages"
        $KAFKA_HOME/bin/kafka-consumer-perf-test.sh \
            --topic $topic \
            --messages $messages \
            --bootstrap-server $BOOTSTRAP_SERVERS \
            --group perf-test-group
        # Additional consumer settings (fetch.min.bytes, fetch.max.wait.ms, ...) can be
        # supplied through a properties file passed with --consumer.config
    }
}

# 5. Backup and restore
backup_and_restore() {
    echo "Data backup and restore..."

    # Back up topic data
    backup_topic() {
        local topic=$1
        local backup_dir=$2
        local from_beginning=${3:-true}
        echo "Backing up topic data: $topic -> $backup_dir"
        mkdir -p $backup_dir
        if [ "$from_beginning" = "true" ]; then
            offset_reset="--from-beginning"
        else
            offset_reset=""
        fi
        $KAFKA_HOME/bin/kafka-console-consumer.sh \
            --bootstrap-server $BOOTSTRAP_SERVERS \
            --topic $topic \
            $offset_reset \
            --property print.key=true \
            --property key.separator=: \
            --timeout-ms 30000 > $backup_dir/${topic}_backup_$(date +%Y%m%d_%H%M%S).txt
    }

    # Restore topic data
    restore_topic() {
        local topic=$1
        local backup_file=$2
        echo "Restoring topic data: $backup_file -> $topic"
        $KAFKA_HOME/bin/kafka-console-producer.sh \
            --bootstrap-server $BOOTSTRAP_SERVERS \
            --topic $topic \
            --property parse.key=true \
            --property key.separator=: < $backup_file
    }
}

# 6. Log cleanup
clean_logs() {
    echo "Cleaning Kafka logs..."

    # Delete expired log segments
    # WARNING: deleting segment files behind a running broker can corrupt partitions;
    # prefer the broker's log.retention.* settings and only run this against unused data dirs
    clean_expired_logs() {
        local log_dir="/var/kafka-logs"
        local retention_days=${1:-7}
        echo "Deleting log files older than $retention_days days"
        find $log_dir -name "*.log" -mtime +$retention_days -delete
        find $log_dir -name "*.index" -mtime +$retention_days -delete
        find $log_dir -name "*.timeindex" -mtime +$retention_days -delete
        echo "Log cleanup finished"
    }

    # Remove orphaned index files
    clean_orphaned_indexes() {
        local log_dir="/var/kafka-logs"
        echo "Removing orphaned index files"
        # Find .index files that have no matching .log file
        find $log_dir -name "*.index" | while read index_file; do
            log_file="${index_file%.index}.log"
            if [ ! -f "$log_file" ]; then
                echo "Deleting orphaned index file: $index_file"
                rm -f "$index_file"
            fi
        done
    }
}

# 7. Monitoring and alerting
monitoring_alerts() {
    echo "Running monitoring checks..."

    # Check disk usage
    check_disk_usage() {
        local log_dir="/var/kafka-logs"
        local threshold=${1:-85}
        disk_usage=$(df $log_dir | awk 'NR==2 {print $5}' | sed 's/%//')
        if [ "$disk_usage" -gt "$threshold" ]; then
            echo "⚠ Disk usage too high: ${disk_usage}% (threshold: ${threshold}%)"
            send_alert "Kafka disk usage alert" "Current usage: ${disk_usage}%"
        else
            echo "✓ Disk usage normal: ${disk_usage}%"
        fi
    }

    # Check consumer lag
    check_consumer_lag() {
        local group_id=$1
        local lag_threshold=${2:-10000}
        # LAG is the 6th column in recent kafka-consumer-groups.sh output;
        # adjust the column index if your Kafka version formats it differently
        lag_info=$($KAFKA_HOME/bin/kafka-consumer-groups.sh \
            --bootstrap-server $BOOTSTRAP_SERVERS \
            --group $group_id \
            --describe | awk 'NR>1 {sum+=$6} END {print sum}')
        if [ "$lag_info" != "" ] && [ "$lag_info" -gt "$lag_threshold" ]; then
            echo "⚠ Consumer lag too high: $lag_info (threshold: $lag_threshold)"
            send_alert "Kafka consumer lag alert" "Group: $group_id, lag: $lag_info"
        else
            echo "✓ Consumer lag normal: $lag_info"
        fi
    }

    # Send an alert
    send_alert() {
        local title=$1
        local message=$2
        # Hook in email, DingTalk, WeCom, or other alert channels here
        echo "[ALERT] $title: $message"
        # Example: append to a log file
        echo "$(date): $title - $message" >> /var/log/kafka-alerts.log
    }
}

# Main dispatch
main() {
    case $1 in
        "health")
            check_cluster_health
            ;;
        "topics")
            manage_topics
            ;;
        "groups")
            manage_consumer_groups
            ;;
        "perf")
            performance_test
            ;;
        "backup")
            backup_and_restore
            ;;
        "clean")
            clean_logs
            ;;
        "monitor")
            monitoring_alerts
            ;;
        *)
            echo "Usage: $0 {health|topics|groups|perf|backup|clean|monitor}"
            exit 1
            ;;
    esac
}

# Run
main "$@"
```
Best Practices
1. Architecture Design
- Partition strategy: size the partition count to the workload and the number of consumers
- Replication: use at least 3 replicas in production to ensure high availability
- Topic naming: use meaningful, consistent names to simplify management and monitoring
- Message format: use a schema-based serialization format such as Avro or Protobuf (see the sketch after this list)
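As a hedged illustration of the message-format point, any serialization format plugs into Kafka's `Serializer` interface. This minimal sketch (the `UserEvent` type is hypothetical, and Jackson is assumed to be on the classpath) serializes events to JSON; Avro or Protobuf serializers are wired up the same way through `value.serializer`:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical event type used only for this example
class UserEvent {
    public String userId;
    public String action;
    public long timestamp;
}

public class UserEventSerializer implements Serializer<UserEvent> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, UserEvent event) {
        if (event == null) {
            return null;
        }
        try {
            // Serialize the event to JSON bytes for the record value
            return objectMapper.writeValueAsBytes(event);
        } catch (Exception e) {
            throw new SerializationException("Failed to serialize UserEvent", e);
        }
    }
}
```

It would be registered on the producer with `props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserEventSerializer.class.getName())`.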
2. Performance Tuning
- Batching: tune batch.size and linger.ms together
- Compression: choose the codec based on available network bandwidth and CPU headroom
- Memory: size the JVM heap sensibly and leave room for the OS page cache
- Network: adjust socket buffer sizes where needed
3. Operations
- Monitoring: build out a complete metrics and alerting pipeline
- Capacity planning: project capacity needs from expected business growth
- Backups: back up critical data and configuration regularly
- Upgrades: plan and rehearse rolling upgrades
4. Security
- Authentication and authorization: configure SASL/SSL (see the client sketch after this list)
- Network isolation: use firewalls and VPC isolation
- Encryption: enable encryption in transit and at rest
- Audit logging: record critical operations
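For the authentication point, a hedged client-side sketch (broker address, credentials, and truststore path are placeholders; the brokers must expose a SASL_SSL listener, which is configured separately on the server side):

```java
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

public class SecureClientExample {
    public static KafkaProducer<String, String> createSecureProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1.example.com:9093");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Encrypt traffic and authenticate with SASL over SSL
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"svc-user\" password=\"changeit\";");
        // Trust the broker's certificate
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/kafka/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
        return new KafkaProducer<>(props);
    }
}
```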
Summary
As a core component of modern data architectures, Apache Kafka plays a central role in big data processing, real-time stream computing, and inter-service communication. With sound architecture, careful performance tuning, and disciplined operations, it supports highly available, high-performance messaging that gives the business a reliable data transport layer.
Mastering Kafka's core concepts, client APIs, performance tuning, and operational practices is an essential skill for data engineers and architects. As cloud-native technology evolves, Kafka continues to evolve with it, supporting more deployment models and integration options.