# RocketMQ4.x

# RocketMQ4.x介绍

Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。RocketMQ遵从(参考了)JMS规范(Java Message Service)实现(ActiveMQ也基于JMS规范实现)。

# RocketMQ的基本概念

# 生产者

生产消息,无需过多介绍。

# 消费者

消费消息,无需过多介绍。

# Broker

Broker的概念与RabbitMQ一样,Broker就是消息服务器。

# Topic

主题,类似RabbitMQ的交换机Exchange,生产者需要通过Topic发送消息到MessageQueue。

# MessageQueue

消息队列,存储消息,供消费者消费。

# NameServer

生产者和消费者都要通过NameServer的服务发现功能找到Broker,再通过Broker中的MessageQueue收发消息,可以看到NamerServer基本等价于Nacos,充当着服务发现者的角色。RocketMQ中NameServer集群中NameServer与NameServer之间相互独立,没有任何通信行为

# RocketMQ的工作模型

# RocketMQ的消息模型


# RocketMQ的部署模型


# 单机模式下RocketMQ部署

# NameServer部署

    1.在环境变量里设置rocketmq的安装配置并保存

# 修改环境变量文件
vim /etc/profile
# 增加如下行后:wq保存
export ROCKETMQ_HOME=/path/to/rocketmq-all-*
export PATH=$PATH:$ROCKETMQ_HOME/bin
# 使环境变量立刻生效
source /etc/profile

    2.进入rocketmq的bin目录,执行如下指令启动namesrv服务

# 静默启动
nohup ./mqnamesrv &

# Broker部署

    1.在rocketmq的安装目录下,修改配置文件

# 修改环境变量文件,修改后使用:wq保存并退出
vim broker.conf
# 设置允许自动创建topic
autoCreateTopicEnable=true
# 设置namesrv的ip以及端口
namesrvAddr=127.0.0.1:9876

    2.进入rocketmq的bin目录,执行如下指令启动broker服务

# 静默启动,使用-c参数指定配置文件
nohup ./mqbroker -c ../conf/broker.conf &

# RocketMQ Dashboard编译安装部署

由于项目中使用的rocketmq dashboard存在未授权访问问题,因此需要重新编译打包并部署rocketmq dashboard服务,故记录如下:
    1.在官网的git仓库rocketmq dashboard源码地址 (opens new window)中拉取源码。
    2.修改项目的application.properties文件,将其中的rocketmq.config.loginRequire配置修改为true。

    3.修改项目的users.properties文件,在其中添加属性键值对key=value,key代表用户,value的值代表密码及权限,如这里的admin=asssAAA,1即代表管理员权限,xuhaodi=xxxxaawwa则为普通用户权限

    4.打包并在测试环境使用如下指令启动服务即可

# 这里dashboard启动时指定了nameserver的ip地址以及端口
nohup java -Xms512m -Xmx2048m -jar -Dfile.encoding=UTF-8 -Drocketmq.namesrv.addr=127.0.0.1:9876 rockert-dashboard-1.0.0.jar >> /dev/null 2>&1 &

    5.登录dashboard验证,弹出登录框提示鉴权,输入用户名密码后成功登录。


    6.集群以及分片查看。

    7.主题以及该主题消费者查看。


    8.消费者组查看。

    9.生产者组查看。

    10.消息及消息详情查看。



# RocketMQ集群搭建的几种方式

RocketMQ集群搭建有三种方式,分别是同步主从,异步主从,多主模式。

# 同步主从模式

生产者发送消息,主节点收到消息后就会同步到从节点,至少有1个从节点完成该消息的同步后,主节点才会返回消息发送成功的状态码ACK给生产者。

# 异步主从模式

生产者发送消息,主节点收到消息后就会返回ACK给生产者。同时主节点异步将消息同步给从节点。

# 多主模式

多主模式中,每个主节点独立收发消息。

# 集群模式下RocketMQ部署

这里以192.168.0.1,192.168.0.2,192.168.0.3,192.168.0.4,192.168.0.5,192.168.0.6为例介绍如何部署RocketMQ集群。

# NameServer集群部署

在192.168.0.1,192.168.0.2,192.168.0.3上部署NameServer集群,RocketMQ中NameServer与NamerServer之间相互独立,互相之间不会进行通信,因此只需要在这3台机器上照单机模式下的NameServer部署即可。
    1.在环境变量里设置rocketmq的安装配置并保存

# 修改环境变量文件
vim /etc/profile
# 增加如下行后:wq保存
export ROCKETMQ_HOME=/path/to/rocketmq-all-*
export PATH=$PATH:$ROCKETMQ_HOME/bin
# 使环境变量立刻生效
source /etc/profile

    2.进入rocketmq的bin目录,执行如下指令启动namesrv服务

# 静默启动
nohup ./mqnamesrv &

# Broker集群配置

这里以192.168.0.4,192.168.0.5为例介绍Broker集群的配置。Broker集群可选的方式为2m-2s-sync,2m-2s-async,2m-noslave。我们这里使用2m-2s-sync模式部署。其中我们会在192.168.0.4部署第一个主从的主机以及第二个主从的从机,然后在192.168.0.5部署第一个主从的从机以及第二个主从的主机,这样一个互为主从的集群就部署好了。

# 修改配置文件

    1.192.168.0.4机器上的Master Broker配置文件

# 这里指定了nameServer的地址
namesrvAddr=192.168.0.1:9876;192.168.0.2:9876;192.168.0.3:9876
# 这里指定了brokerCluster,brokerCluster的概念相当于rabbitmq的虚拟机Virtual Host,用于隔离环境
brokerClusterName=DefaultCluster
# 这类指定了broker分片的名称
brokerName=broker-a
# 集群中的主节点
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=/home/rocketmq/store-a

    2.12.192.168.0.5机器上的Master Broker配置文件

# 这里指定了nameServer的地址
namesrvAddr=192.168.0.1:9876;192.168.0.2:9876;192.168.0.3:9876
# 这里指定了brokerCluster,brokerCluster的概念相当于rabbitmq的虚拟机Virtual Host,用于隔离环境
brokerClusterName=DefaultCluster
# 这类指定了broker集群的名称
brokerName=broker-b
# 集群中的主节点
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=/home/rocketmq/store-b

    3.12.192.168.0.4机器上的Slave Broker配置文件

# 这里指定了nameServer的地址
namesrvAddr=192.168.0.1:9876;192.168.0.2:9876;192.168.0.3:9876
# 这里指定了brokerCluster,brokerCluster的概念相当于rabbitmq的虚拟机Virtual Host,用于隔离环境
brokerClusterName=DefaultCluster
# 这类指定了broker集群的名称
brokerName=broker-b
# 集群中的从节点
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=/home/rocketmq/store-b

    4.192.168.0.5机器上的Slave Broker配置文件

# 这里指定了nameServer的地址
namesrvAddr=192.168.0.1:9876;192.168.0.2:9876;192.168.0.3:9876
# 这里指定了brokerCluster,brokerCluster的概念相当于rabbitmq的虚拟机Virtual Host,用于隔离环境
brokerClusterName=DefaultCluster
# 这类指定了broker集群的名称
brokerName=broker-a
# 集群中的从节点
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=/home/rocketmq/store-a

# 启动服务

# 静默启动,使用-c参数指定配置文件
nohup ./bin/mqbroker -c ../conf/broker.conf &

# 启动DashBoard

在RocketMq集群环境下,我们应该也是只需要配置一个NameServer的地址即可访问Dashboard即可。

# 这里dashboard启动时指定了nameserver的ip地址以及端口,百度查阅的资料说是需要指定所有的nameServer,以;分隔
# 考虑到NameServer是无状态的,这里应该只配置一个即可。
# RocketMQ Dashboard 连接 NameServer 集群的目的是为了获取集群的状态信息,并且可以在集群中进行消息的生产和消费。如果你连接多个 NameServer 实例,主要有以下几个好处:
# 高可用性:如果其中一个 NameServer 宕机,RocketMQ Dashboard 仍然可以通过其他 NameServer 实例来保持与 RocketMQ 集群的连接。
# 负载均衡:RocketMQ Dashboard 会将请求均衡地分配到不同的 NameServer 实例上,以分散负载。
# 数据同步:多个 NameServer 实例之间会同步数据,确保它们的状态信息是一致的。
# 连接多个 NameServer 的配置通常在 RocketMQ Dashboard 的配置文件中设置,例如在 conf/broker.conf 文件中,你可以指定多个 NameServer 地址,它们之间使用分号(;)分隔。
nohup java -Xms512m -Xmx2048m -jar -Dfile.encoding=UTF-8 -Drocketmq.namesrv.addr=127.0.0.1:9876 rockert-dashboard-1.0.0.jar >> /dev/null 2>&1 

# RocketMQ 5.X版本Proxy模块介绍

RocketMQ 5.0 时代,6 张图带你理解 Proxy (opens new window)

# RocketMQ中生产者发送消息的三种方式?

生产者发送消息有三种方式,分别是同步发送,异步发送,单向发送。需要特别强调的是,生产者发送消息的方式与消息类型无关,所有的消息类型(普通消息,顺序消息,延迟消息,事务消息)均支持同步发送,异步发送,单向发送。

# 同步发送

获取发送结果后再执行后续流程。

# 异步发送

发送完成后异步回调。

# 单向发送

仅发送,不接收broker的消息发送结果。

# RocketMQ中消费者接收消息的两种方式?

# Pull消费

在RocketMQ中有两种Pull方式,一种是比较原始Pull Consumer,它不提供相关的订阅方法,需要调用pull方法时指定队列进行拉取,并需要自己更新位点。另一种是Lite Pull Consumer,它提供了Subscribe和Assign两种方式,使用起来更加方便。

# Push消费

消息推送模式,在推送模式中,默认为集群模式,即消息队列会将消息随机发到消费者集群中的任意一个服务。

# Broker推送消息给消费者的两种方式

# 集群推送模式

集群模式,即消息队列会将消息随机发到消费者集群中的任意一个服务。

# 广播推送模式

广播模式,即消息队列会将消息发送到所有订阅的消费者中。

# Broker推送消息到MessageQueue的方式

默认情况下,Broker会把消息推送给Topic下任意一个MessageQueue,如果我们觉得这种场景不满足我们的要求,我们可以指定消息要推送的MessageQueue(顺序消息生产),同样的,Broker也会随机推Topic下任何MessageQueue的消息给消费者(顺序消息消费),如果我们觉得这种场景不满足我们的使用,我们也可以指定顺序消费。

# RocketMQ中的功能特性

# 普通消息(并发消息)

# 普通消息生产

RocketMQ中的普通消息指的是无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。普通消息会经由Broker随机的给到Topic下的queue。对于消息发送,仅在普通消息发送代码演示同步发送,异步发送,单向发送特性。

  • 同步发送
    MQ中的默认生产者,生产者请求Broker,生产数据。可以使用同步发送或异步发送去发送消息。
public class SyncProducer {
  public static void main(String[] args) throws Exception {
    /**
     * 定义生产者的同时设置唯一的组名
     * 在 Apache RocketMQ 中,DefaultMQProducer 类是用于发送消息的默认生产者类。这个类需要一个唯一的 groupname,这个 groupname 是指定生产者的分组名称,用于标识同一分组内的生产者。通过为 DefaultMQProducer 设置一个 groupname,可以确保在 RocketMQ 中的唯一性,这对于管理和识别生产者实例非常重要。
    */
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); //(1)
    // 设置NameServer地址
    producer.setNamesrvAddr("localhost:9876");  //(2)
    // 启动producer
    producer.start();
    for (int i = 0; i < 100; i++) {
      // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
       /**
         * 根据Topic及Tag创建消息
         * 消息设置的标志,用于同一Topic下区分不同类型的消息,可以根据Topic+Tag实现消息的精细化生产和消费。
         */
      Message msg = new Message("TopicTest" /* Topic */,
        "TagA" /* Tag */,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );   //(3)
      // 利用producer进行发送,并同步等待发送结果
      SendResult sendResult = producer.send(msg);   //(4)
      System.out.printf("%s%n", sendResult);
    }
    // 一旦producer不再使用,关闭producer
    producer.shutdown();
  }
}
  • 异步发送
public class AsyncProducer {
  public static void main(String[] args) throws Exception {
     /**
         * 定义生产者的同时设置唯一的组名
         * 在 Apache RocketMQ 中,DefaultMQProducer 类是用于发送消息的默认生产者类。这个类需要一个唯一的 groupname,这个 groupname 是指定生产者的分组名称,用于标识同一分组内的生产者。通过为 DefaultMQProducer 设置一个 groupname,可以确保在 RocketMQ 中的唯一性,这对于管理和识别生产者实例非常重要。
         */
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // 设置NameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动producer
    producer.start();
    producer.setRetryTimesWhenSendAsyncFailed(0);
    int messageCount = 100;
    final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
    for (int i = 0; i < messageCount; i++) {
      try {
          final int index = i;
          // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
           /**
         * 根据Topic及Tag创建消息
         * 消息设置的标志,用于同一Topic下区分不同类型的消息,可以根据Topic+Tag实现消息的精细化生产和消费。
         */
          Message msg = new Message("TopicTest",
            "TagA",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
          // 异步发送消息, 发送结果通过callback返回给客户端
          producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
              System.out.printf("%-10d OK %s %n", index,
                sendResult.getMsgId());
              countDownLatch.countDown();
            }
            @Override
            public void onException(Throwable e) {
              System.out.printf("%-10d Exception %s %n", index, e);
              e.printStackTrace();
              countDownLatch.countDown();
            }
          });
        } catch (Exception e) {
            e.printStackTrace();
            countDownLatch.countDown();
        }
    }
    //异步发送,如果要求可靠传输,必须要等回调接口返回明确结果后才能结束逻辑,否则立即关闭Producer可能导致部分消息尚未传输成功
    countDownLatch.await(5, TimeUnit.SECONDS);
    // 一旦producer不再使用,关闭producer
    producer.shutdown();
  }
}
  • 单向发送
/**
 * 发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
 */
public class OnewayProducer {
  public static void main(String[] args) throws Exception{
    // 初始化一个producer并设置Producer group name
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // 设置NameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动producer
    producer.start();
    for (int i = 0; i < 100; i++) {
      // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
      Message msg = new Message("TopicTest" /* Topic */,
        "TagA" /* Tag */,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
      );
      // 由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。
      producer.sendOneway(msg);
    }
     // 一旦producer不再使用,关闭producer
     producer.shutdown();
  }
}

# 普通消息消费-拉模式

在RocketMQ中有两种Pull方式,一种是比较原始Pull Consumer,它不提供相关的订阅方法,需要调用pull方法时指定队列进行拉取,并需要自己更新位点。另一种是Lite Pull Consumer,它提供了Subscribe和Assign两种方式,使用起来更加方便。

  • Pull Consumer
/**
 * 拉模式,默认是集群(区别于广播模式)+并发(区别于顺序)消费
 */
public class PullConsumerTest {
  public static void main(String[] args) throws MQClientException {
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.start();
    try {
      MessageQueue mq = new MessageQueue();
      mq.setQueueId(0);
      mq.setTopic("TopicTest");
      mq.setBrokerName("jinrongtong-MacBook-Pro.local");
      long offset = 26;
      PullResult pullResult = consumer.pull(mq, "*", offset, 32);
      if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {
        System.out.printf("%s%n", pullResult.getMsgFoundList());
        consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    consumer.shutdown();
  }
}
  • Lite Pull Consumer-Subscribe模式
/**
 * 随机获取queue消息
 */
public class LitePullConsumerSubscribe {
    public static volatile boolean running = true;
    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
        litePullConsumer.subscribe("TopicTest", "*");
        litePullConsumer.setPullBatchSize(20);
        litePullConsumer.start();
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s%n", messageExts);
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}
  • Lite Pull Consumer-Assign模式
/**
 * 获取指定queue的消息
 */
public class LitePullConsumerAssign {
    public static volatile boolean running = true;
    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
        litePullConsumer.setAutoCommit(false);
        litePullConsumer.start();
        Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
        List<MessageQueue> list = new ArrayList<>(mqSet);
        List<MessageQueue> assignList = new ArrayList<>();
        for (int i = 0; i < list.size() / 2; i++) {
            assignList.add(list.get(i));
        }
        litePullConsumer.assign(assignList);
        litePullConsumer.seek(assignList.get(0), 10);
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s %n", messageExts);
                litePullConsumer.commitSync();
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}

# 普通消息消费-推模式

消息推模式,采用这种模式的消费方式,消息消费由Broker控制,当Broker收到消息后,就会请求Consumer,Consumer会消费消息。同时在Consumer侧可以配置流量控制参数(防止推送消息过多,超过Consumer的消费能力)。普通消息的消费默认是采用集群消费

public class Consumer {
  public static void main(String[] args) throws InterruptedException, MQClientException {
    // 初始化consumer,并设置consumer group name
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
   
    // 设置NameServer地址 
    consumer.setNamesrvAddr("localhost:9876");
    //订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
    consumer.subscribe("TopicTest", "*");
    //注册回调接口来处理从Broker中收到的消息
    // 这里有两个选项可选,分别是并发以及顺序消费,顺序消费需要配合顺序生产者,并发消费需要配合普通生产者
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    // 启动Consumer
    consumer.start();
    System.out.printf("Consumer Started.%n");
  }
}

# 顺序消息

顺序消息是一种对消息发送和消费顺序有严格要求的消息。这里的顺序消息指的是局部的顺序消息,使用这种类型的消息,消息写入某个Topic下指定的queue是有序的。多个queue之间消息是无序的(类似jvm中的线程栈的概念)。

# 顺序消息生产

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.start();

            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

# 顺序消息消费

consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

# 延迟消息

延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。生产者可以在发送消息的时候设置延时时间,Broker收到消息后,会在延时时间结束后投递消息。延时消息可以是顺序消息或普通消息,但是不能是事务消息。

# 延迟消息生产

public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        }
        
        // Shutdown producer after use.
        producer.shutdown();
    }
    
}

# 延迟消息消费

一条消息是否是延迟消息,对消费者是无感的。因此,延迟消息的消费与普通消息/顺序消息完全一致。

# 批量消息

# 批量消息生产

public class SimpleBatchProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.start();

        //If you just send messages of no more than 1MiB at a time, it is easy to use batch
        //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));

        producer.send(messages);
    }
}

# 批量消息消费

一批消息是否是批量发送消息,对消费者是无感的。因此,批量消息的消费与普通消息/顺序消息完全一致。在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。

# 事务消息

在一些对数据一致性有强需求的场景,可以用 Apache RocketMQ 事务消息来解决,从而保证上下游数据的一致性。事务消息是一种更强的同步发送消息,这种发送方式要求Broker及生产者对消息和事务进行确认及回滚等操作。
事务消息发送 (opens new window)

# 事务消息生产

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }

    static class TransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);

        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            return LocalTransactionState.UNKNOW;
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    default:
                        return LocalTransactionState.COMMIT_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
}

# 事务消息消费

事务消息与消费端无关。

# 集群消费与广播消费

我们可以通过以下代码来设置采用集群模式,RocketMQ Push Consumer默认为集群模式,同一个消费组内的消费者分担消费。

consumer.setMessageModel(MessageModel.CLUSTERING);

通过以下代码来设置采用广播模式,广播模式下,消费组内的每一个消费者都会消费全量消息。

consumer.setMessageModel(MessageModel.BROADCASTING);

# 并发消费与顺序消费

上面已经介绍设置Push Consumer并发消费的方法,通过在注册消费回调接口时传入MessageListenerConcurrently接口的实现来完成。在并发消费中,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费。
因此RocketMQ提供了顺序消费的方式, 顺序消费设置与并发消费API层面只有一处不同,在注册消费回调接口时传入MessageListenerOrderly接口的实现。

consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

# 消息过滤

消息过滤 (opens new window)

# 消息重试和死信队列

流量控制在消息推模式下都会有,rabbitmq里是消费者主动限流,rocketmq的机制好像是broker根据特定规则进行限流,满足限流条件的情况下就不会再推消息给消费者了。下述参考资料是在5.x版本官网资料找到的,不清楚4.x版本是否也可应用。
消息重试和死信队列 (opens new window)

# 消息发送重试和流控机制

消息发送重试和流控机制 (opens new window)