# RocketMQ4.x
# RocketMQ4.x介绍
Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。RocketMQ遵从(参考了)JMS规范(Java Message Service)实现(ActiveMQ也基于JMS规范实现)。
# RocketMQ的基本概念
# 生产者
生产消息,无需过多介绍。
# 消费者
消费消息,无需过多介绍。
# Broker
Broker的概念与RabbitMQ一样,Broker就是消息服务器。
# Topic
主题,类似RabbitMQ的交换机Exchange,生产者需要通过Topic发送消息到MessageQueue。
# MessageQueue
消息队列,存储消息,供消费者消费。
# NameServer
生产者和消费者都要通过NameServer的服务发现功能找到Broker,再通过Broker中的MessageQueue收发消息,可以看到NamerServer基本等价于Nacos,充当着服务发现者的角色。RocketMQ中NameServer集群中NameServer与NameServer之间相互独立,没有任何通信行为。
# RocketMQ的工作模型
# RocketMQ的消息模型

# RocketMQ的部署模型

# 单机模式下RocketMQ部署
# NameServer部署
1.在环境变量里设置rocketmq的安装配置并保存
# 修改环境变量文件
vim /etc/profile
# 增加如下行后:wq保存
export ROCKETMQ_HOME=/path/to/rocketmq-all-*
export PATH=$PATH:$ROCKETMQ_HOME/bin
# 使环境变量立刻生效
source /etc/profile
2.进入rocketmq的bin目录,执行如下指令启动namesrv服务
# 静默启动
nohup ./mqnamesrv &
# Broker部署
1.在rocketmq的安装目录下,修改配置文件
# 修改环境变量文件,修改后使用:wq保存并退出
vim broker.conf
# 设置允许自动创建topic
autoCreateTopicEnable=true
# 设置namesrv的ip以及端口
namesrvAddr=127.0.0.1:9876
2.进入rocketmq的bin目录,执行如下指令启动broker服务
# 静默启动,使用-c参数指定配置文件
nohup ./mqbroker -c ../conf/broker.conf &
# RocketMQ Dashboard编译安装部署
由于项目中使用的rocketmq dashboard存在未授权访问问题,因此需要重新编译打包并部署rocketmq dashboard服务,故记录如下:
1.在官网的git仓库rocketmq dashboard源码地址 (opens new window)中拉取源码。
2.修改项目的application.properties文件,将其中的rocketmq.config.loginRequire配置修改为true。

3.修改项目的users.properties文件,在其中添加属性键值对key=value,key代表用户,value的值代表密码及权限,如这里的admin=asssAAA,1即代表管理员权限,xuhaodi=xxxxaawwa则为普通用户权限

4.打包并在测试环境使用如下指令启动服务即可
# 这里dashboard启动时指定了nameserver的ip地址以及端口
nohup java -Xms512m -Xmx2048m -jar -Dfile.encoding=UTF-8 -Drocketmq.namesrv.addr=127.0.0.1:9876 rockert-dashboard-1.0.0.jar >> /dev/null 2>&1 &
5.登录dashboard验证,弹出登录框提示鉴权,输入用户名密码后成功登录。


6.集群以及分片查看。

7.主题以及该主题消费者查看。


8.消费者组查看。

9.生产者组查看。

10.消息及消息详情查看。



# RocketMQ集群搭建的几种方式
RocketMQ集群搭建有三种方式,分别是同步主从,异步主从,多主模式。
# 同步主从模式
生产者发送消息,主节点收到消息后就会同步到从节点,至少有1个从节点完成该消息的同步后,主节点才会返回消息发送成功的状态码ACK给生产者。
# 异步主从模式
生产者发送消息,主节点收到消息后就会返回ACK给生产者。同时主节点异步将消息同步给从节点。
# 多主模式
多主模式中,每个主节点独立收发消息。
# 集群模式下RocketMQ部署
这里以192.168.0.1,192.168.0.2,192.168.0.3,192.168.0.4,192.168.0.5,192.168.0.6为例介绍如何部署RocketMQ集群。
# NameServer集群部署
在192.168.0.1,192.168.0.2,192.168.0.3上部署NameServer集群,RocketMQ中NameServer与NamerServer之间相互独立,互相之间不会进行通信,因此只需要在这3台机器上照单机模式下的NameServer部署即可。
1.在环境变量里设置rocketmq的安装配置并保存
# 修改环境变量文件
vim /etc/profile
# 增加如下行后:wq保存
export ROCKETMQ_HOME=/path/to/rocketmq-all-*
export PATH=$PATH:$ROCKETMQ_HOME/bin
# 使环境变量立刻生效
source /etc/profile
2.进入rocketmq的bin目录,执行如下指令启动namesrv服务
# 静默启动
nohup ./mqnamesrv &
# Broker集群配置
这里以192.168.0.4,192.168.0.5为例介绍Broker集群的配置。Broker集群可选的方式为2m-2s-sync,2m-2s-async,2m-noslave。我们这里使用2m-2s-sync模式部署。其中我们会在192.168.0.4部署第一个主从的主机以及第二个主从的从机,然后在192.168.0.5部署第一个主从的从机以及第二个主从的主机,这样一个互为主从的集群就部署好了。
# 修改配置文件
1.192.168.0.4机器上的Master Broker配置文件
# 这里指定了nameServer的地址
namesrvAddr=192.168.0.1:9876;192.168.0.2:9876;192.168.0.3:9876
# 这里指定了brokerCluster,brokerCluster的概念相当于rabbitmq的虚拟机Virtual Host,用于隔离环境
brokerClusterName=DefaultCluster
# 这类指定了broker分片的名称
brokerName=broker-a
# 集群中的主节点
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=/home/rocketmq/store-a
2.12.192.168.0.5机器上的Master Broker配置文件
# 这里指定了nameServer的地址
namesrvAddr=192.168.0.1:9876;192.168.0.2:9876;192.168.0.3:9876
# 这里指定了brokerCluster,brokerCluster的概念相当于rabbitmq的虚拟机Virtual Host,用于隔离环境
brokerClusterName=DefaultCluster
# 这类指定了broker集群的名称
brokerName=broker-b
# 集群中的主节点
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=/home/rocketmq/store-b
3.12.192.168.0.4机器上的Slave Broker配置文件
# 这里指定了nameServer的地址
namesrvAddr=192.168.0.1:9876;192.168.0.2:9876;192.168.0.3:9876
# 这里指定了brokerCluster,brokerCluster的概念相当于rabbitmq的虚拟机Virtual Host,用于隔离环境
brokerClusterName=DefaultCluster
# 这类指定了broker集群的名称
brokerName=broker-b
# 集群中的从节点
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=/home/rocketmq/store-b
4.192.168.0.5机器上的Slave Broker配置文件
# 这里指定了nameServer的地址
namesrvAddr=192.168.0.1:9876;192.168.0.2:9876;192.168.0.3:9876
# 这里指定了brokerCluster,brokerCluster的概念相当于rabbitmq的虚拟机Virtual Host,用于隔离环境
brokerClusterName=DefaultCluster
# 这类指定了broker集群的名称
brokerName=broker-a
# 集群中的从节点
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=/home/rocketmq/store-a
# 启动服务
# 静默启动,使用-c参数指定配置文件
nohup ./bin/mqbroker -c ../conf/broker.conf &
# 启动DashBoard
在RocketMq集群环境下,我们应该也是只需要配置一个NameServer的地址即可访问Dashboard即可。
# 这里dashboard启动时指定了nameserver的ip地址以及端口,百度查阅的资料说是需要指定所有的nameServer,以;分隔
# 考虑到NameServer是无状态的,这里应该只配置一个即可。
# RocketMQ Dashboard 连接 NameServer 集群的目的是为了获取集群的状态信息,并且可以在集群中进行消息的生产和消费。如果你连接多个 NameServer 实例,主要有以下几个好处:
# 高可用性:如果其中一个 NameServer 宕机,RocketMQ Dashboard 仍然可以通过其他 NameServer 实例来保持与 RocketMQ 集群的连接。
# 负载均衡:RocketMQ Dashboard 会将请求均衡地分配到不同的 NameServer 实例上,以分散负载。
# 数据同步:多个 NameServer 实例之间会同步数据,确保它们的状态信息是一致的。
# 连接多个 NameServer 的配置通常在 RocketMQ Dashboard 的配置文件中设置,例如在 conf/broker.conf 文件中,你可以指定多个 NameServer 地址,它们之间使用分号(;)分隔。
nohup java -Xms512m -Xmx2048m -jar -Dfile.encoding=UTF-8 -Drocketmq.namesrv.addr=127.0.0.1:9876 rockert-dashboard-1.0.0.jar >> /dev/null 2>&1
# RocketMQ 5.X版本Proxy模块介绍
RocketMQ 5.0 时代,6 张图带你理解 Proxy (opens new window)
# RocketMQ中生产者发送消息的三种方式?
生产者发送消息有三种方式,分别是同步发送,异步发送,单向发送。需要特别强调的是,生产者发送消息的方式与消息类型无关,所有的消息类型(普通消息,顺序消息,延迟消息,事务消息)均支持同步发送,异步发送,单向发送。
# 同步发送
获取发送结果后再执行后续流程。
# 异步发送
发送完成后异步回调。
# 单向发送
仅发送,不接收broker的消息发送结果。
# RocketMQ中消费者接收消息的两种方式?
# Pull消费
在RocketMQ中有两种Pull方式,一种是比较原始Pull Consumer,它不提供相关的订阅方法,需要调用pull方法时指定队列进行拉取,并需要自己更新位点。另一种是Lite Pull Consumer,它提供了Subscribe和Assign两种方式,使用起来更加方便。
# Push消费
消息推送模式,在推送模式中,默认为集群模式,即消息队列会将消息随机发到消费者集群中的任意一个服务。
# Broker推送消息给消费者的两种方式
# 集群推送模式
集群模式,即消息队列会将消息随机发到消费者集群中的任意一个服务。
# 广播推送模式
广播模式,即消息队列会将消息发送到所有订阅的消费者中。
# Broker推送消息到MessageQueue的方式
默认情况下,Broker会把消息推送给Topic下任意一个MessageQueue,如果我们觉得这种场景不满足我们的要求,我们可以指定消息要推送的MessageQueue(顺序消息生产),同样的,Broker也会随机推Topic下任何MessageQueue的消息给消费者(顺序消息消费),如果我们觉得这种场景不满足我们的使用,我们也可以指定顺序消费。
# RocketMQ中的功能特性
# 普通消息(并发消息)
# 普通消息生产
RocketMQ中的普通消息指的是无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。普通消息会经由Broker随机的给到Topic下的queue。对于消息发送,仅在普通消息发送代码演示同步发送,异步发送,单向发送特性。
- 同步发送
MQ中的默认生产者,生产者请求Broker,生产数据。可以使用同步发送或异步发送去发送消息。
public class SyncProducer {
public static void main(String[] args) throws Exception {
/**
* 定义生产者的同时设置唯一的组名
* 在 Apache RocketMQ 中,DefaultMQProducer 类是用于发送消息的默认生产者类。这个类需要一个唯一的 groupname,这个 groupname 是指定生产者的分组名称,用于标识同一分组内的生产者。通过为 DefaultMQProducer 设置一个 groupname,可以确保在 RocketMQ 中的唯一性,这对于管理和识别生产者实例非常重要。
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); //(1)
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876"); //(2)
// 启动producer
producer.start();
for (int i = 0; i < 100; i++) {
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
/**
* 根据Topic及Tag创建消息
* 消息设置的标志,用于同一Topic下区分不同类型的消息,可以根据Topic+Tag实现消息的精细化生产和消费。
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
); //(3)
// 利用producer进行发送,并同步等待发送结果
SendResult sendResult = producer.send(msg); //(4)
System.out.printf("%s%n", sendResult);
}
// 一旦producer不再使用,关闭producer
producer.shutdown();
}
}
- 异步发送
public class AsyncProducer {
public static void main(String[] args) throws Exception {
/**
* 定义生产者的同时设置唯一的组名
* 在 Apache RocketMQ 中,DefaultMQProducer 类是用于发送消息的默认生产者类。这个类需要一个唯一的 groupname,这个 groupname 是指定生产者的分组名称,用于标识同一分组内的生产者。通过为 DefaultMQProducer 设置一个 groupname,可以确保在 RocketMQ 中的唯一性,这对于管理和识别生产者实例非常重要。
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动producer
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
/**
* 根据Topic及Tag创建消息
* 消息设置的标志,用于同一Topic下区分不同类型的消息,可以根据Topic+Tag实现消息的精细化生产和消费。
*/
Message msg = new Message("TopicTest",
"TagA",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送消息, 发送结果通过callback返回给客户端
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
countDownLatch.countDown();
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
countDownLatch.countDown();
}
});
} catch (Exception e) {
e.printStackTrace();
countDownLatch.countDown();
}
}
//异步发送,如果要求可靠传输,必须要等回调接口返回明确结果后才能结束逻辑,否则立即关闭Producer可能导致部分消息尚未传输成功
countDownLatch.await(5, TimeUnit.SECONDS);
// 一旦producer不再使用,关闭producer
producer.shutdown();
}
}
- 单向发送
/**
* 发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
*/
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动producer
producer.start();
for (int i = 0; i < 100; i++) {
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。
producer.sendOneway(msg);
}
// 一旦producer不再使用,关闭producer
producer.shutdown();
}
}
# 普通消息消费-拉模式
在RocketMQ中有两种Pull方式,一种是比较原始Pull Consumer,它不提供相关的订阅方法,需要调用pull方法时指定队列进行拉取,并需要自己更新位点。另一种是Lite Pull Consumer,它提供了Subscribe和Assign两种方式,使用起来更加方便。
- Pull Consumer
/**
* 拉模式,默认是集群(区别于广播模式)+并发(区别于顺序)消费
*/
public class PullConsumerTest {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
try {
MessageQueue mq = new MessageQueue();
mq.setQueueId(0);
mq.setTopic("TopicTest");
mq.setBrokerName("jinrongtong-MacBook-Pro.local");
long offset = 26;
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {
System.out.printf("%s%n", pullResult.getMsgFoundList());
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
}
} catch (Exception e) {
e.printStackTrace();
}
consumer.shutdown();
}
}
- Lite Pull Consumer-Subscribe模式
/**
* 随机获取queue消息
*/
public class LitePullConsumerSubscribe {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.setPullBatchSize(20);
litePullConsumer.start();
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}
}
}
- Lite Pull Consumer-Assign模式
/**
* 获取指定queue的消息
*/
public class LitePullConsumerAssign {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
litePullConsumer.setAutoCommit(false);
litePullConsumer.start();
Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
List<MessageQueue> list = new ArrayList<>(mqSet);
List<MessageQueue> assignList = new ArrayList<>();
for (int i = 0; i < list.size() / 2; i++) {
assignList.add(list.get(i));
}
litePullConsumer.assign(assignList);
litePullConsumer.seek(assignList.get(0), 10);
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s %n", messageExts);
litePullConsumer.commitSync();
}
} finally {
litePullConsumer.shutdown();
}
}
}
# 普通消息消费-推模式
消息推模式,采用这种模式的消费方式,消息消费由Broker控制,当Broker收到消息后,就会请求Consumer,Consumer会消费消息。同时在Consumer侧可以配置流量控制参数(防止推送消息过多,超过Consumer的消费能力)。普通消息的消费默认是采用集群消费。
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 初始化consumer,并设置consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注册回调接口来处理从Broker中收到的消息
// 这里有两个选项可选,分别是并发以及顺序消费,顺序消费需要配合顺序生产者,并发消费需要配合普通生产者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动Consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
# 顺序消息
顺序消息是一种对消息发送和消费顺序有严格要求的消息。这里的顺序消息指的是局部的顺序消息,使用这种类型的消息,消息写入某个Topic下指定的queue是有序的。多个queue之间消息是无序的(类似jvm中的线程栈的概念)。
# 顺序消息生产
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
}
# 顺序消息消费
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
# 延迟消息
延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。生产者可以在发送消息的时候设置延时时间,Broker收到消息后,会在延时时间结束后投递消息。延时消息可以是顺序消息或普通消息,但是不能是事务消息。
# 延迟消息生产
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
}
# 延迟消息消费
一条消息是否是延迟消息,对消费者是无感的。因此,延迟消息的消费与普通消息/顺序消息完全一致。
# 批量消息
# 批量消息生产
public class SimpleBatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.start();
//If you just send messages of no more than 1MiB at a time, it is easy to use batch
//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
}
}
# 批量消息消费
一批消息是否是批量发送消息,对消费者是无感的。因此,批量消息的消费与普通消息/顺序消息完全一致。在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。
# 事务消息
在一些对数据一致性有强需求的场景,可以用 Apache RocketMQ 事务消息来解决,从而保证上下游数据的一致性。事务消息是一种更强的同步发送消息,这种发送方式要求Broker及生产者对消息和事务进行确认及回滚等操作。
事务消息发送 (opens new window)
# 事务消息生产
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
static class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
}
# 事务消息消费
事务消息与消费端无关。
# 集群消费与广播消费
我们可以通过以下代码来设置采用集群模式,RocketMQ Push Consumer默认为集群模式,同一个消费组内的消费者分担消费。
consumer.setMessageModel(MessageModel.CLUSTERING);
通过以下代码来设置采用广播模式,广播模式下,消费组内的每一个消费者都会消费全量消息。
consumer.setMessageModel(MessageModel.BROADCASTING);
# 并发消费与顺序消费
上面已经介绍设置Push Consumer并发消费的方法,通过在注册消费回调接口时传入MessageListenerConcurrently接口的实现来完成。在并发消费中,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费。
因此RocketMQ提供了顺序消费的方式, 顺序消费设置与并发消费API层面只有一处不同,在注册消费回调接口时传入MessageListenerOrderly接口的实现。
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
# 消息过滤
# 消息重试和死信队列
流量控制在消息推模式下都会有,rabbitmq里是消费者主动限流,rocketmq的机制好像是broker根据特定规则进行限流,满足限流条件的情况下就不会再推消息给消费者了。下述参考资料是在5.x版本官网资料找到的,不清楚4.x版本是否也可应用。
消息重试和死信队列 (opens new window)