# RabbitMQ
# RabbitMQ介绍
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
# RabbitMQ的工作模型

# RabbitMQ的核心概念
# Producer生产者
生产者负责生产消息
package com.kieoo;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 需要提前配置好MQ的基本信息,如Exchange,Queue,Routing-key
*/
@Component
public class MessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) throws Exception{
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("exchange.demo", "key.demo.mybusiness", messageBody, correlationData);
}
}
# Connection链接
客户端与RabbitMQ进行交互,首先就需要建立一个TPC连接。RabbitMQ为了减少性能开销,也会在一个Connection中建立多个Channel,这样便于客户端进行多线程连接,这些连接会复用同一个Connection的TCP通道,提高性能。

# Channel信道
客户端与RabbitMQ建立了连接,就会分配一个AMQP信道 Channel。每个信道都会被分配一个唯一的ID。Channel有两个模式,分别是transcational(事务模式,消息一定不会丢失,但是性能较差)以及confirm(发送确认模式,推荐使用)

# Broker服务端
接收和分发消息的应用,RabbitMQ Server服务器就是 Message Broker。
# VirtualHost虚拟机
Virtual host是一个虚拟主机的概念,一个Broker中可以有多个Virtual host,每个Virtual host都有一套自己的Exchange和Queue,同一个Virtual host中的Exchange和Queue不能重名,不同的Virtual host中的Exchange和Queue名字可以一样。这样,不同的用户在访问同一个RabbitMQ Broker时,可以创建自己单独的Virtual host,然后在自己的Virtual host中创建Exchange和Queue,很好地做到了不同用户之间数据隔离的效果。比如我所在的公司就是很多个应用用一个RabbitMQ服务端的,每一个产品线的应用公用一个VirtualHost。
# Exchange交换机
Exchange是一个比较重要的概念,它是消息到达RabbitMQ的第一站,主要负责根据不同的分发规则将消息分发到不同的Queue,供订阅了相关Queue的消费者消费到指定的消息。那Exchange有哪些分发消息的规则呢?这就要说到Exchange的4种类型了:direct、fanout、topic、headers。Exchange可以通过durable:true开启持久化功能。

# Queue队列
消息队列,队列是实际保存数据的最小单位。队列结构天生就具有FIFO的顺序。Queue可以通过durable:true开启持久化功能,开启持久化数据会存到操作系统磁盘中,重启Broker不会导致数据丢失。Queue可以通过x-dead-letter-exchange以及x-dead-letter-routing-key配置死信队列。

# Routing-Key
在RabbitMQ路由模式下,Exchange和Queue通过Router-Key绑定在一起。
# 消费者
消费者从Queue中消费消息。
package com.kieoo;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
@Component
public class MyMessageListener implements ChannelAwareMessageListener {
public void onMessage(Message message, Channel channel) throws Exception{
MessageProperties messageProperties = message.getMessageProperties();
// 消息ID
Long deliveryTag = messageProperties.getDeliveryTag();
// 消费端限流
channel.basicQos(1);
// 业务逻辑处理
try{
// 执行业务逻辑处理消息
String messageBody = new String(message.getBody());
// 这里省略了业务逻辑的处理代码
// 消息处理完成
channel.basicAck(deliveryTag,true);
} catch(Exception exception){
/**
* 消息处理失败,向channel发送拒绝标志,如果queue里配置了死信队列,消息就会被移动到死信队列中
*/
channel.basicReject(deliveryTag,false);
}
}
}
# RabbitMQ常见的几种工作模式
# 简单模式
一个生产者和一个消费者,消息被发送到队列后,被消费者消费。
# 工作队列模式
一个生产者对应多个消费者,消费者之间竞争去同一队列取消息。
# 发布订阅模式
生产者发送消息到交换机,交换机将消息分发给所有订阅的队列。
# 路由模式常用
生产者将消息发送给交换机,交换机根据路由键(Routing Key)将消息路由到特定的队列。
# Topics主题路由
基于主题的路由,使用通配符来匹配消息和队列。
# RabbitMQ路由模式说明及应用

# 死信队列
# 什么是死信队列?
死信队列(Dead Letter Queue, DLQ)是一种特殊的消息队列,用于存储那些由于各种原因(如处理失败、过期、消费超时等)无法被正常消费的消息。
# 死信队列如何设置?
死信队列和其他队列一样,也需要设置交换机exchange和routing-key路由键。同时,我们在设置正常业务的消息队列的时候,还需要使用x-dead-letter-exchange以及x-dead-letter-routing-key配置死信队列。
package com.kieoo.config;
import com.kieoo.MyDeadListener;
import com.kieoo.MyMessageListener;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* <p><b>Description:</b> RabbitMQ交换机、队列的配置类.定义交换机、key、queue并做好绑定。
* <p><b>Company:</b>
*
*/
@Configuration
public class MessageQueueConfig {
//========================== 声明交换机 ==========================
/**
* 交换机
*/
@Bean
public DirectExchange transExchange() {
return new DirectExchange("exchange.demo");
}
//========================== 声明死信队列 ===========================
/**
* 死信队列
*/
@Bean
public Queue deadQueue() {
return new Queue("queue.dead.demo",true,false,false);
}
/**
* 通过路由key绑定交换机和死信队列
*/
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(transExchange())
.with("key.dead.demo");
}
/**
* 死信队列的监听
* @param connectionFactory RabbitMQ连接工厂
* @param myDeadListener 队列监听器
* @return 监听容器对象
*/
@Bean
public SimpleMessageListenerContainer deadSimpleMessageListenerContainer(ConnectionFactory connectionFactory, MyDeadListener myDeadListener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(deadQueue());
container.setExposeListenerChannel(true);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(myDeadListener);
container.setPrefetchCount(1);
return container;
}
//========================== 声明业务消息队列 ===========================
/**
* 业务消息队列
*/
@Bean
public Queue mybusinessQueue() {
return QueueBuilder.durable("queue.demo.mybusiness")
.withArgument("x-dead-letter-exchange", "exchange.demo")
.withArgument("x-dead-letter-routing-key", "key.dead.demo")
.build();
}
/**
* 通过路由key绑定交换机和短信队列
*/
@Bean
public Binding businessBinding() {
return BindingBuilder.bind(mybusinessQueue()).to(transExchange())
.with("key.demo.mybusiness");
}
/**
* 业务队列的监听
* @param connectionFactory RabbitMQ连接工厂
* @param myMessageListener 队列监听器
* @return 监听容器对象
*/
@Bean
public SimpleMessageListenerContainer businessSimpleMessageListenerContainer(ConnectionFactory connectionFactory,
MyMessageListener myMessageListener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(mybusinessQueue());
container.setExposeListenerChannel(true);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(myMessageListener);
container.setPrefetchCount(1);
container.setConcurrentConsumers(20);
return container;
}
}
# 什么情况下会往死信队列写入消息?
消息超时(通过设置超时时间来配置),消费方消费失败(消费方返回消息处理结果NACK)等情况下,消息会进入死信队列中。伪代码如下:
package com.kieoo.mq.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
public abstract class AbstractMessageListener implements ChannelAwareMessageListener {
public ChannelAwareMessageListener(){
}
public abstract void receiveMessage(Message message) throws Exception;
public void onMessage(Message message, Channel channel) throws Exception{
MessageProperties messageProperties = message.getMessageProperties();
// 消息ID
Long deliveryTag = messageProperties.getDeliveryTag();
// 消费端限流,可以看到该服务实际上采取的是推模式监听,即Broker会推消息给该服务
channel.basicQos(1);
// 业务逻辑处理
try{
// 执行业务逻辑处理消息
this.receiveMessage(message);
// 消息处理完成
channel.basicAck(deliveryTag,true);
} catch(Exception exception){
/**
* 消息处理失败,向channel发送拒绝标志,如果queue里配置了死信队列,消息就会被移动到死信队列中
*/
channel.basicReject(deliveryTag,false);
}
}
}
# 死信队列的消息如何处理?
我自己的项目里会尝试重新投递死信队列的消息。
package com.kieoo;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MyDeadListener implements ChannelAwareMessageListener {
@Autowired
private RabbitTemplate rabbitTemplate;
public void onMessage(Message message, Channel channel) throws Exception{
MessageProperties messageProperties = message.getMessageProperties();
// 消息ID
Long deliveryTag = messageProperties.getDeliveryTag();
// 消费端限流
channel.basicQos(1);
// 业务逻辑处理
try{
// 执行业务逻辑处理消息
String messageBody = new String(message.getBody());
// 重新投递消息
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("exchange.demo", "key.demo.mybusiness", messageBody, correlationData);
// 消息处理完成
channel.basicAck(deliveryTag,true);
} catch(Exception exception){
/**
* 消息处理失败,向channel发送拒绝标志,如果queue里配置了死信队列,消息就会被移动到死信队列中
*/
channel.basicReject(deliveryTag,false);
}
}
# 消息拉取模式与推送模式
RabbitMQ同时支持消息拉取与消息推送模式。
# 消息拉取
消息拉取指的是消费者定期请求Broker获取消息进行消费。
# 消息推送
消息推送指的是Broker将消息从queue推送给消费者。消息推送模式下,我们可以在消费端对消息进行限流。
# 消息推模式下消费端限流
在RabbitMQ中,推模式(push mode)通常是指使用queue.declare方法创建的传统消息推送模式。而实现推模式下的消费者限流,可以通过设置消费者在非自动确认模式下,手动控制消息的确认来实现。
package com.kieoo.mq.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
public abstract class AbstractMessageListener implements ChannelAwareMessageListener {
public ChannelAwareMessageListener(){
}
public abstract void receiveMessage(Message message) throws Exception;
public void onMessage(Message message, Channel channel) throws Exception{
MessageProperties messageProperties = message.getMessageProperties();
// 消息ID
Long deliveryTag = messageProperties.getDeliveryTag();
/**
* 消费端限流,可以看到该服务实际上采取的是推模式监听,即Broker会推消息给该服务
* 在我的这个场景下,消费者能力较弱,消费完1条消息之后反馈给Broker,Broker才会发送下一条消息过来
*/
channel.basicQos(1);
// 业务逻辑处理
try{
// 执行业务逻辑处理消息
this.receiveMessage(message);
// 消息处理完成
channel.basicAck(deliveryTag,true);
} catch(Exception exception){
/**
* 消息处理失败,向channel发送拒绝标志,如果queue里配置了死信队列,消息就会被移动到死信队列中
*/
channel.basicReject(deliveryTag,false);
}
}
}
# 生产者的消息可靠投递
RabbitMQ中,生产者的消息可靠性投递分为两部分,一部分是连接的可靠性确认,另外一部分是消息发送的可靠性确认。
# 连接可靠性
在Spring中,如果与RabbitMQ建立连接异常(如RabbitMQ宕机),请求会一直重试,同时当前线程阻塞,直到RabbitMQ重启完成,连接重新建立。
# 消息投递可靠性
消息投递可靠性是通过发送消息后RabbitMQ的返回结果来保证。RabbitMQ有两种确认机制,分别是Publisher Confirm,Publisher Return在生产者发送消息后,RabbitMQ会回调生产者,返回ACK(投递成功)、NACK(投递失败)状态。针对非持久化消息,Broker接收到消息之后就会反馈ACK(Publisher Confirm),针对持久化消息,消息回写到磁盘后才会返回ACK(Publisher Confirm)。此外还有一种情况,Broker会反馈ACK以及Return (对应Publisher Return机制,这种情况一般生产不会出现,不需特别关注)
# 消费者的消息可靠性
消费者的消息可靠性是通过收到消息之后发送接收结果给Broker来保证的。反馈结果有三种,分别是ACK,NACK,REJECT。
# ACK
消费者成功接收消息并处理。Broker接收到ACK返回结果后会删除消息。
# NACK
消费者成功接收消息但是处理失败。Broker接收到NACK后会再次投递消息。
# REJECT
消费者成功接收消息但是处理失败,以及消费者明确拒绝消息。Broker会从队列中删除该消息。如果该队列配置了死信队列,消息会被转移到死信队列中。
# 消息幂等性处理
由于网络等原因,消费者可能针对一个消息生产了多次,即Broker收到了多次同一个消息(Broker通过消息Key保证唯一,后发的消息会覆盖前面发的消息)。也有可能消费者针对一个消息消费了多次,这种情况我们需要在消费者的消费业务逻辑里保证幂等性(消费多条相同的消息,得到的结果和消费一次是一样的)。在我自己的项目实际的业务实现里,是通过分布式锁的方式来保证消息幂等性的。具体实现思路如下:
1.查询redis中是否有唯一键,如不存在,代表第一次处理消息。
2.处理消息。
3.对关键结果根据唯一性规则进行拼接,生成消息唯一键,存入redis,设置过期时间。
4.返回ACK给Broker,代表消息处理完成。
# RabbitMQ里如何看队列里的消息详情
默认情况下,在RabbitMQ里无法查看生产者生产的消息详情,需要使用RabbitMQ的消息追踪Firehose功能,这个功能本质上还是用复用RabbitMQ的交换机和消息队列功能来实现的。一般无此必要,发消息和接收消息之前应用自己打下日志就行。