# RabbitMQ

# RabbitMQ介绍

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

# RabbitMQ的工作模型


# RabbitMQ的核心概念

# Producer生产者

生产者负责生产消息

package com.kieoo;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 需要提前配置好MQ的基本信息,如Exchange,Queue,Routing-key
 */
@Component
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;
   
     public void sendMessage(String message) throws Exception{
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("exchange.demo", "key.demo.mybusiness", messageBody, correlationData);    
    }
}

# Connection链接

客户端与RabbitMQ进行交互,首先就需要建立一个TPC连接。RabbitMQ为了减少性能开销,也会在一个Connection中建立多个Channel,这样便于客户端进行多线程连接,这些连接会复用同一个Connection的TCP通道,提高性能。

# Channel信道

客户端与RabbitMQ建立了连接,就会分配一个AMQP信道 Channel。每个信道都会被分配一个唯一的ID。Channel有两个模式,分别是transcational(事务模式,消息一定不会丢失,但是性能较差)以及confirm(发送确认模式,推荐使用)

# Broker服务端

接收和分发消息的应用,RabbitMQ Server服务器就是 Message Broker。

# VirtualHost虚拟机

Virtual host是一个虚拟主机的概念,一个Broker中可以有多个Virtual host,每个Virtual host都有一套自己的Exchange和Queue,同一个Virtual host中的Exchange和Queue不能重名,不同的Virtual host中的Exchange和Queue名字可以一样。这样,不同的用户在访问同一个RabbitMQ Broker时,可以创建自己单独的Virtual host,然后在自己的Virtual host中创建Exchange和Queue,很好地做到了不同用户之间数据隔离的效果。比如我所在的公司就是很多个应用用一个RabbitMQ服务端的,每一个产品线的应用公用一个VirtualHost。

# Exchange交换机

Exchange是一个比较重要的概念,它是消息到达RabbitMQ的第一站,主要负责根据不同的分发规则将消息分发到不同的Queue,供订阅了相关Queue的消费者消费到指定的消息。那Exchange有哪些分发消息的规则呢?这就要说到Exchange的4种类型了:direct、fanout、topic、headers。Exchange可以通过durable:true开启持久化功能。

# Queue队列

消息队列,队列是实际保存数据的最小单位。队列结构天生就具有FIFO的顺序。Queue可以通过durable:true开启持久化功能,开启持久化数据会存到操作系统磁盘中,重启Broker不会导致数据丢失。Queue可以通过x-dead-letter-exchange以及x-dead-letter-routing-key配置死信队列。

# Routing-Key

在RabbitMQ路由模式下,Exchange和Queue通过Router-Key绑定在一起。

# 消费者

消费者从Queue中消费消息。

package com.kieoo;

import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

@Component
public class MyMessageListener implements ChannelAwareMessageListener {

    public void onMessage(Message message, Channel channel) throws Exception{
        MessageProperties messageProperties = message.getMessageProperties();
        // 消息ID
        Long deliveryTag = messageProperties.getDeliveryTag();
        // 消费端限流
        channel.basicQos(1);
        // 业务逻辑处理
        try{
            // 执行业务逻辑处理消息
            String messageBody = new String(message.getBody());
            // 这里省略了业务逻辑的处理代码
            // 消息处理完成
            channel.basicAck(deliveryTag,true);
        } catch(Exception exception){
            /**
             * 消息处理失败,向channel发送拒绝标志,如果queue里配置了死信队列,消息就会被移动到死信队列中
             */
            channel.basicReject(deliveryTag,false);
        }
    }
}

# RabbitMQ常见的几种工作模式

# 简单模式

一个生产者和一个消费者,消息被发送到队列后,被消费者消费。

# 工作队列模式

一个生产者对应多个消费者,消费者之间竞争去同一队列取消息。

# 发布订阅模式

生产者发送消息到交换机,交换机将消息分发给所有订阅的队列。

# 路由模式常用

生产者将消息发送给交换机,交换机根据路由键(Routing Key)将消息路由到特定的队列。

# Topics主题路由

基于主题的路由,使用通配符来匹配消息和队列。

# RabbitMQ路由模式说明及应用


# 死信队列

# 什么是死信队列?

死信队列(‌Dead Letter Queue, DLQ)‌是一种特殊的消息队列,‌用于存储那些由于各种原因(‌如处理失败、‌过期、‌消费超时等)‌无法被正常消费的消息。‌

# 死信队列如何设置?

死信队列和其他队列一样,也需要设置交换机exchange和routing-key路由键。同时,我们在设置正常业务的消息队列的时候,还需要使用x-dead-letter-exchange以及x-dead-letter-routing-key配置死信队列。

package com.kieoo.config;

import com.kieoo.MyDeadListener;
import com.kieoo.MyMessageListener;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * <p><b>Description:</b> RabbitMQ交换机、队列的配置类.定义交换机、key、queue并做好绑定。
 * <p><b>Company:</b>
 *
 */
@Configuration
public class MessageQueueConfig {


    //========================== 声明交换机 ==========================
    /**
     * 交换机
     */
    @Bean
    public DirectExchange transExchange() {
        return new DirectExchange("exchange.demo");
    }

    //========================== 声明死信队列 ===========================
    /**
     * 死信队列
     */
    @Bean
    public Queue deadQueue() {
        return new Queue("queue.dead.demo",true,false,false);
    }

    /**
     * 通过路由key绑定交换机和死信队列
     */
    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue()).to(transExchange())
                .with("key.dead.demo");
    }

    /**
     * 死信队列的监听
     * @param connectionFactory RabbitMQ连接工厂
     * @param myDeadListener  队列监听器
     * @return 监听容器对象
     */
    @Bean
    public SimpleMessageListenerContainer deadSimpleMessageListenerContainer(ConnectionFactory connectionFactory, MyDeadListener myDeadListener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(deadQueue());
        container.setExposeListenerChannel(true);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(myDeadListener);
        container.setPrefetchCount(1);
        return container;
    }

    //========================== 声明业务消息队列 ===========================
    /**
     * 业务消息队列
     */
    @Bean
    public Queue mybusinessQueue() {
        return QueueBuilder.durable("queue.demo.mybusiness")
                .withArgument("x-dead-letter-exchange", "exchange.demo")
                .withArgument("x-dead-letter-routing-key", "key.dead.demo")
                .build();
    }

    /**
     * 通过路由key绑定交换机和短信队列
     */
    @Bean
    public Binding businessBinding() {
        return BindingBuilder.bind(mybusinessQueue()).to(transExchange())
                .with("key.demo.mybusiness");
    }

    /**
     * 业务队列的监听
     * @param connectionFactory RabbitMQ连接工厂
     * @param myMessageListener  队列监听器
     * @return 监听容器对象
     */
    @Bean
    public SimpleMessageListenerContainer businessSimpleMessageListenerContainer(ConnectionFactory connectionFactory,
                                                                            MyMessageListener myMessageListener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(mybusinessQueue());
        container.setExposeListenerChannel(true);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(myMessageListener);
        container.setPrefetchCount(1);
        container.setConcurrentConsumers(20);
        return container;
    }
}

# 什么情况下会往死信队列写入消息?

消息超时(通过设置超时时间来配置),消费方消费失败(消费方返回消息处理结果NACK)等情况下,消息会进入死信队列中。伪代码如下:

package com.kieoo.mq.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

public abstract class AbstractMessageListener implements ChannelAwareMessageListener {

    public ChannelAwareMessageListener(){

    }

    public abstract void receiveMessage(Message message) throws Exception;

    public void onMessage(Message message, Channel channel) throws Exception{
        MessageProperties messageProperties = message.getMessageProperties();
        // 消息ID
        Long deliveryTag = messageProperties.getDeliveryTag();
        // 消费端限流,可以看到该服务实际上采取的是推模式监听,即Broker会推消息给该服务
        channel.basicQos(1);
        // 业务逻辑处理
        try{
            // 执行业务逻辑处理消息
            this.receiveMessage(message);
            // 消息处理完成
            channel.basicAck(deliveryTag,true);
        } catch(Exception exception){
            /**
             * 消息处理失败,向channel发送拒绝标志,如果queue里配置了死信队列,消息就会被移动到死信队列中
             */
            channel.basicReject(deliveryTag,false);
        }
    }
}

# 死信队列的消息如何处理?

我自己的项目里会尝试重新投递死信队列的消息。

package com.kieoo;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MyDeadListener implements ChannelAwareMessageListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;
   
     public void onMessage(Message message, Channel channel) throws Exception{
        MessageProperties messageProperties = message.getMessageProperties();
        // 消息ID
        Long deliveryTag = messageProperties.getDeliveryTag();
        // 消费端限流
        channel.basicQos(1);
        // 业务逻辑处理
        try{
            // 执行业务逻辑处理消息
            String messageBody = new String(message.getBody());
            // 重新投递消息
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("exchange.demo", "key.demo.mybusiness", messageBody, correlationData);
            // 消息处理完成
            channel.basicAck(deliveryTag,true);
        } catch(Exception exception){
            /**
             * 消息处理失败,向channel发送拒绝标志,如果queue里配置了死信队列,消息就会被移动到死信队列中
             */
            channel.basicReject(deliveryTag,false);
        }
}

# 消息拉取模式与推送模式

RabbitMQ同时支持消息拉取与消息推送模式。

# 消息拉取

消息拉取指的是消费者定期请求Broker获取消息进行消费。

# 消息推送

消息推送指的是Broker将消息从queue推送给消费者。消息推送模式下,我们可以在消费端对消息进行限流。

# 消息推模式下消费端限流

在RabbitMQ中,推模式(push mode)通常是指使用queue.declare方法创建的传统消息推送模式。而实现推模式下的消费者限流,可以通过设置消费者在非自动确认模式下,手动控制消息的确认来实现。

package com.kieoo.mq.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

public abstract class AbstractMessageListener implements ChannelAwareMessageListener {

    public ChannelAwareMessageListener(){

    }

    public abstract void receiveMessage(Message message) throws Exception;

    public void onMessage(Message message, Channel channel) throws Exception{
        MessageProperties messageProperties = message.getMessageProperties();
        // 消息ID
        Long deliveryTag = messageProperties.getDeliveryTag();
        /**
         * 消费端限流,可以看到该服务实际上采取的是推模式监听,即Broker会推消息给该服务
         * 在我的这个场景下,消费者能力较弱,消费完1条消息之后反馈给Broker,Broker才会发送下一条消息过来
         */
        channel.basicQos(1);
        // 业务逻辑处理
        try{
            // 执行业务逻辑处理消息
            this.receiveMessage(message);
            // 消息处理完成
            channel.basicAck(deliveryTag,true);
        } catch(Exception exception){
            /**
             * 消息处理失败,向channel发送拒绝标志,如果queue里配置了死信队列,消息就会被移动到死信队列中
             */
            channel.basicReject(deliveryTag,false);
        }
    }
}

# 生产者的消息可靠投递

RabbitMQ中,生产者的消息可靠性投递分为两部分,一部分是连接的可靠性确认,另外一部分是消息发送的可靠性确认。

# 连接可靠性

在Spring中,如果与RabbitMQ建立连接异常(如RabbitMQ宕机),请求会一直重试,同时当前线程阻塞,直到RabbitMQ重启完成,连接重新建立。

# 消息投递可靠性

消息投递可靠性是通过发送消息后RabbitMQ的返回结果来保证。RabbitMQ有两种确认机制,分别是Publisher Confirm,Publisher Return在生产者发送消息后,RabbitMQ会回调生产者,返回ACK(投递成功)、NACK(投递失败)状态。针对非持久化消息,Broker接收到消息之后就会反馈ACK(Publisher Confirm),针对持久化消息,消息回写到磁盘后才会返回ACK(Publisher Confirm)。此外还有一种情况,Broker会反馈ACK以及Return (对应Publisher Return机制,这种情况一般生产不会出现,不需特别关注)

# 消费者的消息可靠性

消费者的消息可靠性是通过收到消息之后发送接收结果给Broker来保证的。反馈结果有三种,分别是ACK,NACK,REJECT。

# ACK

消费者成功接收消息并处理。Broker接收到ACK返回结果后会删除消息。

# NACK

消费者成功接收消息但是处理失败。Broker接收到NACK后会再次投递消息。

# REJECT

消费者成功接收消息但是处理失败,以及消费者明确拒绝消息。Broker会从队列中删除该消息。如果该队列配置了死信队列,消息会被转移到死信队列中。

# 消息幂等性处理

由于网络等原因,消费者可能针对一个消息生产了多次,即Broker收到了多次同一个消息(Broker通过消息Key保证唯一,后发的消息会覆盖前面发的消息)。也有可能消费者针对一个消息消费了多次,这种情况我们需要在消费者的消费业务逻辑里保证幂等性(消费多条相同的消息,得到的结果和消费一次是一样的)。在我自己的项目实际的业务实现里,是通过分布式锁的方式来保证消息幂等性的。具体实现思路如下:
    1.查询redis中是否有唯一键,如不存在,代表第一次处理消息。
    2.处理消息。
    3.对关键结果根据唯一性规则进行拼接,生成消息唯一键,存入redis,设置过期时间。
    4.返回ACK给Broker,代表消息处理完成。

# RabbitMQ里如何看队列里的消息详情

默认情况下,在RabbitMQ里无法查看生产者生产的消息详情,需要使用RabbitMQ的消息追踪Firehose功能,这个功能本质上还是用复用RabbitMQ的交换机和消息队列功能来实现的。一般无此必要,发消息和接收消息之前应用自己打下日志就行。