Appearance
RocketMQ 详细介绍
RocketMQ 是什么
RocketMQ 是阿里巴巴开源的分布式消息中间件,是一个高性能、高可靠、高实时、分布式的消息中间件,专为大规模分布式系统设计。最初由阿里巴巴开发并在内部大规模使用,后捐献给 Apache 基金会,现已成为 Apache 顶级项目。
主要应用场景包括
- 应用解耦:微服务之间的异步通信
- 流量削峰:处理突发流量,保护下游系统
- 数据分发:一对多的数据广播和分发
- 事务消息:分布式事务场景的最终一致性保障
- 顺序消息:严格按顺序处理的业务场景
- 定时/延时消息:定时任务和延迟处理
设计目标
- 高性能:单机支持万级并发,集群支持千万级TPS
- 高可用:99.95%的高可用性,支持跨机房部署
- 高可靠:消息零丢失,支持事务消息
- 低延迟:毫秒级消息延迟
- 可扩展:支持水平扩展,万亿级消息堆积能力
- 易运维:提供丰富的监控和管理工具
架构设计
+-------------------+
| Name Server |
| (路由注册中心) |
+-------------------+
|
+-------------------+
| RocketMQ Cluster|
+-------------------+
| |
+--------+ +--------+
|Broker 1| |Broker 2| ... Broker N
| Master | | Slave |
+--------+ +--------+
| |
-----------------------------
| |
+-------------+ +---------------+
| Producer | | Consumer |
+-------------+ +---------------+
组件说明:
组件 | 作用 |
---|---|
Producer | 生产者,负责发送消息到 RocketMQ |
Consumer | 消费者,从 RocketMQ 读取消息 |
Topic | 主题,消息的分类容器(逻辑划分) |
Queue | 队列,Topic 的物理分片,类似 Kafka 的 Partition |
Broker | 消息存储服务器,负责消息的存储和转发 |
Name Server | 路由注册中心,管理 Broker 路由信息,轻量级服务发现 |
Message | 消息,包含消息体、属性、标签等信息 |
组件功能详细介绍
1. Name Server(路由注册中心)
基本定义 Name Server 是 RocketMQ 的路由注册中心,提供轻量级的服务发现和路由管理功能。它是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步。
主要功能
- Broker 管理:接受 Broker 集群的注册信息并提供心跳检测机制
- 路由信息管理:管理 Topic 的路由信息,告诉客户端去哪个 Broker 上读写消息
- 客户端路由:为 Producer 和 Consumer 提供路由信息
与 Kafka ZooKeeper 的区别
- Name Server 设计更加轻量级,无状态
- 不需要选举,每个节点都是对等的
- 部署和运维更简单
- 不存储消息数据,只存储路由元数据
2. Broker(消息存储服务器)
基本定义 Broker 是 RocketMQ 的核心组件,负责消息的存储、转发、查询以及服务高可用保证。
主要功能
- 消息存储:将消息持久化到磁盘
- 消息转发:将消息转发给消费者
- 消息查询:支持按消息 ID、Key 查询消息
- 高可用保证:通过主从复制保证服务高可用
Broker 集群模式
模式 | 描述 | 优缺点 |
---|---|---|
单 Master | 只有一个 Master 节点 | 简单,但存在单点故障风险 |
多 Master | 多个 Master 节点,无 Slave | 配置简单,性能最高,但机器宕机期间未消费消息不可订阅 |
多 Master 多 Slave(异步复制) | 每个 Master 配置一个 Slave | 主备有短暂消息延迟,Master 宕机时有极少量消息丢失 |
多 Master 多 Slave(同步双写) | 每个 Master 配置一个 Slave,同步双写 | 数据与服务都无单点,性能比异步复制略低 |
3. Topic(主题)
基本定义 Topic 是 RocketMQ 中消息的第一级分类,是消息的逻辑容器。Producer 发送消息时需要指定 Topic,Consumer 订阅消息时也需要指定 Topic。
Topic 的特性
- 逻辑隔离:不同 Topic 的消息完全隔离
- 多队列支持:一个 Topic 可以包含多个 Queue
- 标签过滤:支持通过 Tag 进行消息过滤
- 权限控制:可以对 Topic 设置读写权限
Topic 常用命令
bash
# 创建 Topic
sh mqadmin updateTopic -c DefaultCluster -t TestTopic -n 127.0.0.1:9876
# 查看 Topic 列表
sh mqadmin topicList -n 127.0.0.1:9876
# 查看 Topic 详情
sh mqadmin topicRoute -t TestTopic -n 127.0.0.1:9876
# 删除 Topic
sh mqadmin deleteTopic -c DefaultCluster -t TestTopic -n 127.0.0.1:9876
# 更新 Topic 配置
sh mqadmin updateTopic -c DefaultCluster -t TestTopic -r 8 -w 8 -n 127.0.0.1:9876
4. Queue(队列)
基本定义 Queue 是 Topic 的物理分片,类似于 Kafka 的 Partition。一个 Topic 可以包含多个 Queue,Queue 是消息存储和消费的最小单位。
Queue 的特性
- 顺序保证:单个 Queue 内的消息严格有序
- 负载均衡:多个 Queue 可以分布在不同的 Broker 上
- 并发消费:多个 Consumer 可以并发消费不同的 Queue
- 读写分离:支持读写队列数量不同的配置
Queue 数量设计建议
场景 | 建议 Queue 数量 |
---|---|
小型应用 | 4-8 个 |
中型应用 | 8-16 个 |
大型应用 | 16-32 个 |
超大型应用 | 32+ 个(需要测试验证) |
5. Message(消息)
消息结构
java
public class Message {
private String topic; // 主题
private String tags; // 标签,用于消息过滤
private String keys; // 消息索引键,便于查找
private byte[] body; // 消息体
private Map<String, String> properties; // 用户自定义属性
}
消息类型
- 普通消息:最基本的消息类型
- 顺序消息:保证消息严格按顺序消费
- 事务消息:支持分布式事务的消息
- 定时/延时消息:支持定时投递的消息
- 批量消息:支持批量发送以提高性能
RocketMQ 核心特性
1. 事务消息
RocketMQ 支持分布式事务消息,保证本地事务和消息发送的最终一致性。
事务消息流程
1. Producer 发送半事务消息(Half Message)
2. Broker 存储半事务消息,对 Consumer 不可见
3. Producer 执行本地事务
4. Producer 根据本地事务结果提交或回滚消息
5. 如果 Producer 宕机,Broker 会回查事务状态
使用场景
- 订单系统:下单成功后发送消息通知库存系统
- 支付系统:支付成功后发送消息通知订单系统
- 账户系统:转账成功后发送消息通知风控系统
2. 顺序消息
RocketMQ 支持两种顺序消息:
全局顺序消息
- 整个 Topic 只有一个 Queue
- 所有消息严格按照发送顺序消费
- 性能较低,一般不推荐
分区顺序消息
- 同一个 MessageQueue 内的消息严格有序
- 不同 MessageQueue 之间的消息可以并发消费
- 性能较高,推荐使用
3. 定时/延时消息
RocketMQ 支持定时和延时消息投递:
延时级别
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
使用场景
- 订单超时取消
- 定时提醒
- 延时重试
4. 消息过滤
RocketMQ 支持多种消息过滤方式:
Tag 过滤
java
// 发送消息时设置 Tag
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
// 消费时过滤 Tag
consumer.subscribe("TopicTest", "TagA || TagB");
SQL92 过滤
java
// 发送消息时设置属性
msg.putUserProperty("age", "18");
msg.putUserProperty("name", "张三");
// 消费时使用 SQL 过滤
consumer.subscribe("TopicTest", MessageSelector.bySql("age > 16 AND name = '张三'"));
RocketMQ 最佳实践
1. 生产者最佳实践
java
// 1. 设置生产者组
producer.setProducerGroup("producer_group_name");
// 2. 设置发送超时时间
producer.setSendMsgTimeout(3000);
// 3. 设置重试次数
producer.setRetryTimesWhenSendFailed(2);
// 4. 批量发送消息
List<Message> messages = new ArrayList<>();
// ... 添加消息
producer.send(messages);
// 5. 异步发送
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 发送成功处理
}
@Override
public void onException(Throwable e) {
// 发送失败处理
}
});
2. 消费者最佳实践
java
// 1. 设置消费者组
consumer.setConsumerGroup("consumer_group_name");
// 2. 设置消费模式
consumer.setMessageModel(MessageModel.CLUSTERING); // 集群模式
// consumer.setMessageModel(MessageModel.BROADCASTING); // 广播模式
// 3. 设置消费位点
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 4. 设置并发消费线程数
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
// 5. 设置批量消费
consumer.setConsumeMessageBatchMaxSize(10);
// 6. 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// 处理消息
System.out.println(new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
监控和运维
1. 重要监控指标
Broker 监控
- TPS(每秒事务数)
- QPS(每秒查询数)
- 消息堆积量
- 磁盘使用率
- 内存使用率
- 网络 IO
Topic 监控
- 生产 TPS
- 消费 TPS
- 消息堆积量
- 消费延迟
Consumer 监控
- 消费 TPS
- 消费延迟
- 消费失败率
- 重试次数
2. 常用运维命令
bash
# 查看集群状态
sh mqadmin clusterList -n 127.0.0.1:9876
# 查看 Broker 状态
sh mqadmin brokerStatus -b 127.0.0.1:10911 -n 127.0.0.1:9876
# 查看消费者状态
sh mqadmin consumerStatus -g consumer_group -n 127.0.0.1:9876
# 查看消息堆积情况
sh mqadmin consumerProgress -g consumer_group -n 127.0.0.1:9876
# 重置消费位点
sh mqadmin resetOffset -t TopicTest -g consumer_group -n 127.0.0.1:9876
# 查询消息
sh mqadmin queryMsgById -i msgId -n 127.0.0.1:9876
RocketMQ vs Kafka 场景比较
技术特性对比
特性 | RocketMQ | Kafka |
---|---|---|
开发语言 | Java | Scala/Java |
协调服务 | Name Server(轻量级) | ZooKeeper/KRaft(重量级) |
消息顺序 | 支持全局和分区顺序 | 仅支持分区顺序 |
事务消息 | ✅ 原生支持 | ❌ 不支持(需要额外实现) |
定时/延时消息 | ✅ 原生支持 | ❌ 不支持 |
消息过滤 | ✅ 支持 Tag 和 SQL 过滤 | ❌ 客户端过滤 |
消息回溯 | ✅ 支持按时间回溯 | ✅ 支持按 offset 回溯 |
消息重试 | ✅ 原生支持死信队列 | ❌ 需要自己实现 |
运维工具 | ✅ 丰富的控制台和命令行工具 | ✅ 社区工具丰富 |
性能 | 单机 10 万 TPS | 单机 100 万 TPS |
延迟 | 毫秒级 | 毫秒级 |
可靠性 | 99.95% | 99.9% |
适用场景对比
RocketMQ 更适合的场景
1. 金融支付系统
场景:支付订单处理
原因:
- 需要事务消息保证数据一致性
- 需要严格的消息顺序
- 对消息可靠性要求极高
- 需要消息重试和死信队列
2. 电商订单系统
场景:订单状态流转
原因:
- 需要定时消息(订单超时取消)
- 需要事务消息(下单和扣库存)
- 需要消息过滤(不同状态的订单)
- 需要顺序消息(订单状态变更)
3. 物联网(IoT)系统
场景:设备数据采集和控制
原因:
- 需要定时消息(定时采集)
- 需要消息过滤(按设备类型过滤)
- 需要高可靠性(设备控制指令不能丢失)
4. 微服务架构
场景:服务间异步通信
原因:
- 需要事务消息保证业务一致性
- 需要消息重试机制
- 运维工具丰富,便于管理
Kafka 更适合的场景
1. 大数据实时处理
场景:日志收集和实时分析
原因:
- 超高吞吐量(百万级 TPS)
- 与大数据生态集成好(Spark、Flink、Storm)
- 支持流式处理
- 数据回溯能力强
2. 用户行为分析
场景:用户点击流分析
原因:
- 数据量大,需要高吞吐量
- 对消息丢失容忍度较高
- 需要长期存储历史数据
- 多个系统需要消费同一份数据
3. 监控和指标收集
场景:系统监控数据收集
原因:
- 数据量大,需要高性能
- 对实时性要求高
- 需要数据持久化
- 与监控系统集成好
4. 事件驱动架构
场景:领域事件处理
原因:
- 高性能事件处理
- 支持事件回放
- 社区生态丰富
- 与流处理框架集成好
选型建议
选择 RocketMQ 的情况
✅ 业务场景需要
- 事务消息
- 定时/延时消息
- 严格的消息顺序
- 消息过滤
- 高可靠性(金融级)
✅ 技术要求
- Java 技术栈
- 需要丰富的运维工具
- 对性能要求不是极致(10万级 TPS 足够)
- 需要简单的部署和运维
✅ 团队情况
- Java 开发团队
- 对消息中间件运维经验有限
- 需要快速上手
选择 Kafka 的情况
✅ 业务场景需要
- 超高吞吐量(百万级 TPS)
- 大数据实时处理
- 日志收集和分析
- 流式数据处理
- 长期数据存储
✅ 技术要求
- 与大数据生态集成
- 需要流处理能力
- 对延迟要求不是极致
- 可以接受复杂的运维
✅ 团队情况
- 有大数据处理经验
- 有 Kafka 运维经验
- 技术实力较强
混合使用场景
在大型系统中,RocketMQ 和 Kafka 可以同时使用,各自发挥优势:
系统架构示例:
业务系统 → RocketMQ → 业务处理
↓
数据同步
↓
Kafka → 大数据分析平台
说明:
- RocketMQ 处理业务消息(事务、顺序、定时)
- Kafka 处理分析数据(日志、指标、事件)
总结
RocketMQ 和 Kafka 都是优秀的消息中间件,选择哪个主要取决于具体的业务场景和技术要求:
- RocketMQ:更适合业务系统,特别是对消息可靠性、事务性、顺序性有严格要求的场景
- Kafka:更适合大数据场景,特别是需要超高吞吐量和与大数据生态集成的场景
在实际项目中,建议根据具体需求进行技术选型,也可以考虑在不同场景下混合使用两种消息中间件。