Skip to content

RocketMQ 详细介绍

RocketMQ 是什么

RocketMQ 是阿里巴巴开源的分布式消息中间件,是一个高性能、高可靠、高实时、分布式的消息中间件,专为大规模分布式系统设计。最初由阿里巴巴开发并在内部大规模使用,后捐献给 Apache 基金会,现已成为 Apache 顶级项目。

主要应用场景包括

  1. 应用解耦:微服务之间的异步通信
  2. 流量削峰:处理突发流量,保护下游系统
  3. 数据分发:一对多的数据广播和分发
  4. 事务消息:分布式事务场景的最终一致性保障
  5. 顺序消息:严格按顺序处理的业务场景
  6. 定时/延时消息:定时任务和延迟处理

设计目标

  1. 高性能:单机支持万级并发,集群支持千万级TPS
  2. 高可用:99.95%的高可用性,支持跨机房部署
  3. 高可靠:消息零丢失,支持事务消息
  4. 低延迟:毫秒级消息延迟
  5. 可扩展:支持水平扩展,万亿级消息堆积能力
  6. 易运维:提供丰富的监控和管理工具

架构设计

                    +-------------------+
                    |   Name Server     |
                    |   (路由注册中心)     |
                    +-------------------+
                            |
                    +-------------------+
                    |   RocketMQ Cluster|
                    +-------------------+
                         |         |
                    +--------+  +--------+
                    |Broker 1|  |Broker 2|  ... Broker N
                    | Master |  | Slave  |
                    +--------+  +--------+
                        |             |
                -----------------------------
                |                           |
         +-------------+           +---------------+
         |   Producer  |           |   Consumer     |
         +-------------+           +---------------+

组件说明:

组件作用
Producer生产者,负责发送消息到 RocketMQ
Consumer消费者,从 RocketMQ 读取消息
Topic主题,消息的分类容器(逻辑划分)
Queue队列,Topic 的物理分片,类似 Kafka 的 Partition
Broker消息存储服务器,负责消息的存储和转发
Name Server路由注册中心,管理 Broker 路由信息,轻量级服务发现
Message消息,包含消息体、属性、标签等信息

组件功能详细介绍

1. Name Server(路由注册中心)

基本定义 Name Server 是 RocketMQ 的路由注册中心,提供轻量级的服务发现和路由管理功能。它是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步。

主要功能

  1. Broker 管理:接受 Broker 集群的注册信息并提供心跳检测机制
  2. 路由信息管理:管理 Topic 的路由信息,告诉客户端去哪个 Broker 上读写消息
  3. 客户端路由:为 Producer 和 Consumer 提供路由信息

与 Kafka ZooKeeper 的区别

  • Name Server 设计更加轻量级,无状态
  • 不需要选举,每个节点都是对等的
  • 部署和运维更简单
  • 不存储消息数据,只存储路由元数据
2. Broker(消息存储服务器)

基本定义 Broker 是 RocketMQ 的核心组件,负责消息的存储、转发、查询以及服务高可用保证。

主要功能

  1. 消息存储:将消息持久化到磁盘
  2. 消息转发:将消息转发给消费者
  3. 消息查询:支持按消息 ID、Key 查询消息
  4. 高可用保证:通过主从复制保证服务高可用

Broker 集群模式

模式描述优缺点
单 Master只有一个 Master 节点简单,但存在单点故障风险
多 Master多个 Master 节点,无 Slave配置简单,性能最高,但机器宕机期间未消费消息不可订阅
多 Master 多 Slave(异步复制)每个 Master 配置一个 Slave主备有短暂消息延迟,Master 宕机时有极少量消息丢失
多 Master 多 Slave(同步双写)每个 Master 配置一个 Slave,同步双写数据与服务都无单点,性能比异步复制略低
3. Topic(主题)

基本定义 Topic 是 RocketMQ 中消息的第一级分类,是消息的逻辑容器。Producer 发送消息时需要指定 Topic,Consumer 订阅消息时也需要指定 Topic。

Topic 的特性

  1. 逻辑隔离:不同 Topic 的消息完全隔离
  2. 多队列支持:一个 Topic 可以包含多个 Queue
  3. 标签过滤:支持通过 Tag 进行消息过滤
  4. 权限控制:可以对 Topic 设置读写权限

Topic 常用命令

bash
# 创建 Topic
sh mqadmin updateTopic -c DefaultCluster -t TestTopic -n 127.0.0.1:9876

# 查看 Topic 列表
sh mqadmin topicList -n 127.0.0.1:9876

# 查看 Topic 详情
sh mqadmin topicRoute -t TestTopic -n 127.0.0.1:9876

# 删除 Topic
sh mqadmin deleteTopic -c DefaultCluster -t TestTopic -n 127.0.0.1:9876

# 更新 Topic 配置
sh mqadmin updateTopic -c DefaultCluster -t TestTopic -r 8 -w 8 -n 127.0.0.1:9876
4. Queue(队列)

基本定义 Queue 是 Topic 的物理分片,类似于 Kafka 的 Partition。一个 Topic 可以包含多个 Queue,Queue 是消息存储和消费的最小单位。

Queue 的特性

  1. 顺序保证:单个 Queue 内的消息严格有序
  2. 负载均衡:多个 Queue 可以分布在不同的 Broker 上
  3. 并发消费:多个 Consumer 可以并发消费不同的 Queue
  4. 读写分离:支持读写队列数量不同的配置

Queue 数量设计建议

场景建议 Queue 数量
小型应用4-8 个
中型应用8-16 个
大型应用16-32 个
超大型应用32+ 个(需要测试验证)
5. Message(消息)

消息结构

java
public class Message {
    private String topic;        // 主题
    private String tags;         // 标签,用于消息过滤
    private String keys;         // 消息索引键,便于查找
    private byte[] body;         // 消息体
    private Map<String, String> properties; // 用户自定义属性
}

消息类型

  1. 普通消息:最基本的消息类型
  2. 顺序消息:保证消息严格按顺序消费
  3. 事务消息:支持分布式事务的消息
  4. 定时/延时消息:支持定时投递的消息
  5. 批量消息:支持批量发送以提高性能

RocketMQ 核心特性

1. 事务消息

RocketMQ 支持分布式事务消息,保证本地事务和消息发送的最终一致性。

事务消息流程

1. Producer 发送半事务消息(Half Message)
2. Broker 存储半事务消息,对 Consumer 不可见
3. Producer 执行本地事务
4. Producer 根据本地事务结果提交或回滚消息
5. 如果 Producer 宕机,Broker 会回查事务状态

使用场景

  • 订单系统:下单成功后发送消息通知库存系统
  • 支付系统:支付成功后发送消息通知订单系统
  • 账户系统:转账成功后发送消息通知风控系统

2. 顺序消息

RocketMQ 支持两种顺序消息:

全局顺序消息

  • 整个 Topic 只有一个 Queue
  • 所有消息严格按照发送顺序消费
  • 性能较低,一般不推荐

分区顺序消息

  • 同一个 MessageQueue 内的消息严格有序
  • 不同 MessageQueue 之间的消息可以并发消费
  • 性能较高,推荐使用

3. 定时/延时消息

RocketMQ 支持定时和延时消息投递:

延时级别

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

使用场景

  • 订单超时取消
  • 定时提醒
  • 延时重试

4. 消息过滤

RocketMQ 支持多种消息过滤方式:

Tag 过滤

java
// 发送消息时设置 Tag
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());

// 消费时过滤 Tag
consumer.subscribe("TopicTest", "TagA || TagB");

SQL92 过滤

java
// 发送消息时设置属性
msg.putUserProperty("age", "18");
msg.putUserProperty("name", "张三");

// 消费时使用 SQL 过滤
consumer.subscribe("TopicTest", MessageSelector.bySql("age > 16 AND name = '张三'"));

RocketMQ 最佳实践

1. 生产者最佳实践

java
// 1. 设置生产者组
producer.setProducerGroup("producer_group_name");

// 2. 设置发送超时时间
producer.setSendMsgTimeout(3000);

// 3. 设置重试次数
producer.setRetryTimesWhenSendFailed(2);

// 4. 批量发送消息
List<Message> messages = new ArrayList<>();
// ... 添加消息
producer.send(messages);

// 5. 异步发送
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        // 发送成功处理
    }
    
    @Override
    public void onException(Throwable e) {
        // 发送失败处理
    }
});

2. 消费者最佳实践

java
// 1. 设置消费者组
consumer.setConsumerGroup("consumer_group_name");

// 2. 设置消费模式
consumer.setMessageModel(MessageModel.CLUSTERING); // 集群模式
// consumer.setMessageModel(MessageModel.BROADCASTING); // 广播模式

// 3. 设置消费位点
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

// 4. 设置并发消费线程数
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);

// 5. 设置批量消费
consumer.setConsumeMessageBatchMaxSize(10);

// 6. 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> messages,
            ConsumeConcurrentlyContext context) {
        for (MessageExt message : messages) {
            // 处理消息
            System.out.println(new String(message.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

监控和运维

1. 重要监控指标

Broker 监控

  • TPS(每秒事务数)
  • QPS(每秒查询数)
  • 消息堆积量
  • 磁盘使用率
  • 内存使用率
  • 网络 IO

Topic 监控

  • 生产 TPS
  • 消费 TPS
  • 消息堆积量
  • 消费延迟

Consumer 监控

  • 消费 TPS
  • 消费延迟
  • 消费失败率
  • 重试次数

2. 常用运维命令

bash
# 查看集群状态
sh mqadmin clusterList -n 127.0.0.1:9876

# 查看 Broker 状态
sh mqadmin brokerStatus -b 127.0.0.1:10911 -n 127.0.0.1:9876

# 查看消费者状态
sh mqadmin consumerStatus -g consumer_group -n 127.0.0.1:9876

# 查看消息堆积情况
sh mqadmin consumerProgress -g consumer_group -n 127.0.0.1:9876

# 重置消费位点
sh mqadmin resetOffset -t TopicTest -g consumer_group -n 127.0.0.1:9876

# 查询消息
sh mqadmin queryMsgById -i msgId -n 127.0.0.1:9876

RocketMQ vs Kafka 场景比较

技术特性对比

特性RocketMQKafka
开发语言JavaScala/Java
协调服务Name Server(轻量级)ZooKeeper/KRaft(重量级)
消息顺序支持全局和分区顺序仅支持分区顺序
事务消息✅ 原生支持❌ 不支持(需要额外实现)
定时/延时消息✅ 原生支持❌ 不支持
消息过滤✅ 支持 Tag 和 SQL 过滤❌ 客户端过滤
消息回溯✅ 支持按时间回溯✅ 支持按 offset 回溯
消息重试✅ 原生支持死信队列❌ 需要自己实现
运维工具✅ 丰富的控制台和命令行工具✅ 社区工具丰富
性能单机 10 万 TPS单机 100 万 TPS
延迟毫秒级毫秒级
可靠性99.95%99.9%

适用场景对比

RocketMQ 更适合的场景

1. 金融支付系统

场景:支付订单处理
原因:
- 需要事务消息保证数据一致性
- 需要严格的消息顺序
- 对消息可靠性要求极高
- 需要消息重试和死信队列

2. 电商订单系统

场景:订单状态流转
原因:
- 需要定时消息(订单超时取消)
- 需要事务消息(下单和扣库存)
- 需要消息过滤(不同状态的订单)
- 需要顺序消息(订单状态变更)

3. 物联网(IoT)系统

场景:设备数据采集和控制
原因:
- 需要定时消息(定时采集)
- 需要消息过滤(按设备类型过滤)
- 需要高可靠性(设备控制指令不能丢失)

4. 微服务架构

场景:服务间异步通信
原因:
- 需要事务消息保证业务一致性
- 需要消息重试机制
- 运维工具丰富,便于管理

Kafka 更适合的场景

1. 大数据实时处理

场景:日志收集和实时分析
原因:
- 超高吞吐量(百万级 TPS)
- 与大数据生态集成好(Spark、Flink、Storm)
- 支持流式处理
- 数据回溯能力强

2. 用户行为分析

场景:用户点击流分析
原因:
- 数据量大,需要高吞吐量
- 对消息丢失容忍度较高
- 需要长期存储历史数据
- 多个系统需要消费同一份数据

3. 监控和指标收集

场景:系统监控数据收集
原因:
- 数据量大,需要高性能
- 对实时性要求高
- 需要数据持久化
- 与监控系统集成好

4. 事件驱动架构

场景:领域事件处理
原因:
- 高性能事件处理
- 支持事件回放
- 社区生态丰富
- 与流处理框架集成好

选型建议

选择 RocketMQ 的情况

业务场景需要

  • 事务消息
  • 定时/延时消息
  • 严格的消息顺序
  • 消息过滤
  • 高可靠性(金融级)

技术要求

  • Java 技术栈
  • 需要丰富的运维工具
  • 对性能要求不是极致(10万级 TPS 足够)
  • 需要简单的部署和运维

团队情况

  • Java 开发团队
  • 对消息中间件运维经验有限
  • 需要快速上手

选择 Kafka 的情况

业务场景需要

  • 超高吞吐量(百万级 TPS)
  • 大数据实时处理
  • 日志收集和分析
  • 流式数据处理
  • 长期数据存储

技术要求

  • 与大数据生态集成
  • 需要流处理能力
  • 对延迟要求不是极致
  • 可以接受复杂的运维

团队情况

  • 有大数据处理经验
  • 有 Kafka 运维经验
  • 技术实力较强

混合使用场景

在大型系统中,RocketMQ 和 Kafka 可以同时使用,各自发挥优势:

系统架构示例:

业务系统 → RocketMQ → 业务处理

数据同步

Kafka → 大数据分析平台

说明:
- RocketMQ 处理业务消息(事务、顺序、定时)
- Kafka 处理分析数据(日志、指标、事件)

总结

RocketMQ 和 Kafka 都是优秀的消息中间件,选择哪个主要取决于具体的业务场景和技术要求:

  • RocketMQ:更适合业务系统,特别是对消息可靠性、事务性、顺序性有严格要求的场景
  • Kafka:更适合大数据场景,特别是需要超高吞吐量和与大数据生态集成的场景

在实际项目中,建议根据具体需求进行技术选型,也可以考虑在不同场景下混合使用两种消息中间件。