🧀 RabbitMQ 发送可靠性
微服务可以设计成消息驱动的微服务,响应式系统也可以基于消息中间件来做,从这个角度来说,在互联网应用开发中,消息中间件真的是太重要了。
注意,以下内容主要为如何确保消息生产者将消息发送成功,并不涉及消息消费的问题。
🌱 RabbitMQ 消息发送机制
大家知道,RabbitMQ 中的消息发送引入了 Exchange(交换机)的概念,消息的发送首先到达交换机上,然后再根据既定的路由规则,由交换机将消息路由到不同的 Queue(队列)中,再由不同的消费者去消费。

大致的流程就是这样,所以要确保消息发送的可靠性,主要从两方面去确认:
- 消息成功到达 Exchange
- 消息成功到达 Queue
如果能确认这两步,那么我们就可以认为消息发送成功了。
如果这两步中任一步骤出现问题,那么消息就没有成功送达,此时我们可能要通过重试等方式去重新发送消息,多次重试之后,如果消息还是不能到达,则可能就需要人工介入了。
经过上面的分析,我们可以确认,要确保消息成功发送,我们只需要做好三件事就可以了:
- 确认消息到达 Exchange。
- 确认消息到达 Queue。
- 开启定时任务,定时投递那些发送失败的消息。
🍕 RabbitMQ 的努力
上面提出的三个步骤,第三步需要我们自己实现,前两步 RabbitMQ 则有现成的解决方案。
如何确保消息成功到达 RabbitMQ?RabbitMQ 给出了两种方案:
- 开启事务机制
- 发送方确认机制
这是两种不同的方案,不可以同时开启,只能选择其中之一,如果两者同时开启,则会报如下错误:

我们分别来看。以下所有案例都在 Spring Boot 中展开,文末可以下载相关源码。
😲 开启事务机制
Spring Boot 中开启 RabbitMQ 事务机制的方式如下:
首先需要先提供一个事务管理器,如下:
@Bean
RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
接下来,在消息生产者上面做两件事:添加事务注解并设置通信信道为事务模式:
@Service
public class MsgService {
@Autowired
RabbitTemplate rabbitTemplate;
@Transactional
public void send() {
rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes());
int i = 1 / 0;
}
}
这里注意两点:
- 发送消息的方法上添加 @Transactional 注解标记事务。
- 用 setChannelTransacted 方法设置为 true 开启事务模式。
这就 OK 了。
在上面的案例中,我们在结尾来了个 1/0 ,这在运行时必然抛出异常,我们可以尝试运行该方法,发现消息并未发送成功。
当我们开启事务模式之后,RabbitMQ 生产者发送消息会多出四个步骤:
- 客户端发出请求,将信道设置为事务模式。
- 服务端给出回复,同意将信道设置为事务模式。
- 客户端发送消息。
- 客户端提交事务。
- 服务端给出响应,确认事务提交。
上面的步骤,除了第三步是本来就有的,其他几个步骤都是平白无故多出来的。所以大家看到,事务模式其实效率有点低,这并非一个最佳解决方案。我们可以想想,什么项目会用到消息中间件?一般来说都是一些高并发的项目,这个时候并发性能尤为重要。
所以,RabbitMQ 还提供了发送方确认机制(publisher confirm)来确保消息发送成功,这种方式,性能要远远高于事务模式,一起来看下。
😛 发送方确认机制
🍡 单条消息处理
第一行是配置消息到达交换器的确认回调,第二行则是配置消息到达队列的回调。
第一行属性的配置有三个取值:
- none:表示禁用发布确认模式,默认即此。
- correlated:表示成功发布消息到交换器后会触发的回调方法。
- simple:类似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的调用。
接下来我们要开启两个监听,具体配置如下:
@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
Queue queue() {
return new Queue(JAVABOY_QUEUE_NAME);
}
@Bean
DirectExchange directExchange() {
return new DirectExchange(JAVABOY_EXCHANGE_NAME);
}
@Bean
Binding binding() {
return BindingBuilder.bind(queue())
.to(directExchange())
.with(JAVABOY_QUEUE_NAME);
}
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("{}:消息成功到达交换器",correlationData.getId());
}else{
logger.error("{}:消息发送失败", correlationData.getId());
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
logger.error("{}:消息未成功路由到队列",returned.getMessage().getMessageProperties().getMessageId());
}
}
关于这个配置类,我说如下几点:
- 定义配置类,实现 RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 两个接口,这两个接口,前者的回调用来确定消息到达交换器,后者则会在消息路由到队列失败时被调用。
- 定义 initRabbitTemplate 方法并添加 @PostConstruct 注解,在该方法中为 rabbitTemplate 分别配置这两个 Callback。
这就可以了。
接下来我们对消息发送进行测试。
首先我们尝试将消息发送到一个不存在的交换机中,像下面这样:
rabbitTemplate.convertAndSend("RabbitConfig.JAVABOY_EXCHANGE_NAME",RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
注意第一个参数是一个字符串,不是变量,这个交换器并不存在,此时控制台会报如下错误:

接下来我们给定一个真实存在的交换器,但是给一个不存在的队列,像下面这样:
rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
注意此时第二个参数是一个字符串,不是变量。

可以看到,消息虽然成功达到交换器了,但是没有成功路由到队列(因为队列不存在)。
这是一条消息的发送,我们再来看看消息的批量发送。
🍧 消息批量处理
如果是消息批量处理,那么发送成功的回调监听是一样的,这里不再赘述。
这就是 publisher-confirm 模式。
相比于事务,这种模式下的消息吞吐量会得到极大的提升。
🍝 失败重试
失败重试分两种情况,一种是压根没找到 MQ 导致的失败重试,另一种是找到 MQ 了,但是消息发送失败了。
两种重试我们分别来看。
🍲自带重试机制
前面所说的事务机制和发送方确认机制,都是发送方确认消息发送成功的办法。如果发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,但是这个重试机制就和 MQ 本身没有关系了,这是利用 Spring 中的 retry 机制来完成的,具体配置如下:
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000ms
spring.rabbitmq.template.retry.max-attempts=10
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=2
从上往下配置含义依次是:
- 开启重试机制。
- 重试起始间隔时间。
- 最大重试次数。
- 最大重试间隔时间。
- 间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)
配置完成后,再次启动 Spring Boot 项目,然后关掉 MQ,此时尝试发送消息,就会发送失败,进而导致自动重试。

🍤 业务重试
业务重试主要是针对消息没有到达交换器的情况。
如果消息没有成功到达交换器,根据我们第二小节的讲解,此时就会触发消息发送失败回调,在这个回调中,我们就可以做文章了!
整体思路是这样:
- 首先创建一张表,用来记录发送到中间件上的消息,像下面这样:

每次发送消息的时候,就往数据库中添加一条记录。这里的字段都很好理解,有三个我额外说下:
- status:表示消息的状态,有三个取值,0,1,2 分别表示消息发送中、消息发送成功以及消息发送失败。
- tryTime:表示消息的第一次重试时间(消息发出去之后,在 tryTime 这个时间点还未显示发送成功,此时就可以开始重试了)。
- count:表示消息重试次数。
其他字段都很好理解,我就不一一啰嗦了。
- 消息发送的时候,我们就往该表中保存一条消息发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后。
- 在 confirm 回调方法中,如果收到消息发送成功的回调,就将该条消息的 status 设置为1(在消息发送时为消息设置 msgId,在消息发送成功回调时,通过 msgId 来唯一锁定该条消息)。
- 另外开启一个定时任务,定时任务每隔 10s 就去数据库中捞一次消息,专门去捞那些 status 为 0 并且已经过了 tryTime 时间记录,把这些消息拎出来后,首先判断其重试次数是否已超过 3 次,如果超过 3 次,则修改该条消息的 status 为 2,表示这条消息发送失败,并且不再重试。对于重试次数没有超过 3 次的记录,则重新去发送消息,并且为其 count 的值+1。
当然这种思路有两个弊端:
- 去数据库走一遭,可能拖慢 MQ 的 Qos,不过有的时候我们并不需要 MQ 有很高的 Qos,所以这个应用时要看具体情况。
- 按照上面的思路,可能会出现同一条消息重复发送的情况,不过这都不是事,我们在消息消费时,解决好幂等性问题就行了。
🍼 RabbitMQ 消费可靠性
🍺 两种消费思路
RabbitMQ 的消息消费,整体上来说有两种不同的思路:
- 推(push):MQ 主动将消息推送给消费者,这种方式需要消费者设置一个缓冲区去缓存消息,对于消费者而言,内存中总是有一堆需要处理的消息,所以这种方式的效率比较高,这也是目前大多数应用采用的消费方式。
- 拉(pull):消费者主动从 MQ 拉取消息,这种方式效率并不是很高,不过有的时候如果服务端需要批量拉取消息,倒是可以采用这种方式。
两种方式我都举个例子看下。
先来看推(push):
这种方式大家比较常见,就是通过 @RabbitListener 注解去标记消费者,如下:
@Component
public class ConsumerDemo {
@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
public void handle(String msg) {
System.out.println("msg = " + msg);
}
}
当监听的队列中有消息时,就会触发该方法。
再来看拉(pull):
@Test
public void test01() throws UnsupportedEncodingException {
Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME);
System.out.println("o = " + new String(((byte[]) o),"UTF-8"));
}
调用 receiveAndConvert 方法,方法参数为队列名称,方法执行完成后,会从 MQ 上拉取一条消息下来,如果该方法返回值为 null,表示该队列上没有消息了。receiveAndConvert 方法有一个重载方法,可以在重载方法中传入一个等待超时时间,例如 3 秒。此时,假设队列中没有消息了,则 receiveAndConvert 方法会阻塞 3 秒,3 秒内如果队列中有了新消息就返回,3 秒后如果队列中还是没有新消息,就返回 null,这个等待超时时间要是不设置的话,默认为 0。
这是消息两种不同的消费模式。
如果需要从消息队列中持续获得消息,就可以使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可。切忌将拉模式放到一个死循环中,变相的订阅消息,这会严重影响 RabbitMQ 的性能。
🍬 确保消费成功两种思路
为了保证消息能够可靠的到达消息消费者,RabbitMQ 中提供了消息消费确认机制。当消费者去消费消息的时候,可以通过指定 autoAck 参数来表示消息消费的确认方式。
- 当 autoAck 为 false 的时候,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除。
- 当 autoAck 为 true 的时候,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除(从内存或者磁盘中),即使这些消息并没有到达消费者。
我们来看一张图:

如上图所示,在 RabbitMQ 的 web 管理页面:
- Ready 表示待消费的消息数量。
- Unacked 表示已经发送给消费者但是还没收到消费者 ack 的消息数量。
这是我们可以从 UI 层面观察消息的消费情况确认情况。
当我们将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,消费分成了两个部分:
- 待消费的消息
- 已经投递给消费者,但是还没有被消费者确认的消息
换句话说,当设置 autoAck 为 false 的时候,消费者就变得非常从容了,它将有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack,此时 RabbitMQ 才会认为这条消息消费成功了。如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。
综上所述,确保消息被成功消费,无非就是手动 Ack 或者自动 Ack,无他。当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说我们还需要在处理消息时,解决幂等性问题。
🍭 消息拒绝
当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息。我们来看下拒绝的方式:
@Component
public class ConsumerDemo {
@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
public void handle(Channel channel, Message message) {
//获取消息编号
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//拒绝消息
channel.basicReject(deliveryTag, true);
} catch (IOException e) {
e.printStackTrace();
}
}
}
消费者收到消息之后,可以选择拒绝消费该条消息,拒绝的步骤分两步:
- 获取消息编号 deliveryTag。
- 调用 basicReject 方法拒绝消息。
最后我们再来说说消息的幂等性问题。
大家设想下面一个场景:
消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,此时由于网络断开或者其他原因导致 RabbitMQ 并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条消息删除,当重新建立起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。同时,由于类似的原因,消息在发送的时候,同一条消息也可能会发送两次(参见四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?)。种种原因导致我们在消费消息时,一定要处理好幂等性问题。
幂等性问题的处理倒也不难,基本上都是从业务上来处理,我来大概说说思路。
采用 Redis,在消费者消费消息之前,现将消息的 id 放到 Redis 中,存储方式如下:
- id-0(正在执行业务)
- id-1(执行业务成功)
如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,如果 key 已经存在(说明之前有人消费过该消息),获取他的值,如果是 0,当前消费者就什么都不做,如果是 1,直接 ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在 setnx 的基础上,再给 key 设置一个生存时间。生产者,发送消息时,指定 messageId。
🍮 理解 VirtualHost
当我们第一次安装好一个 RabbitMQ 之后,我们可能都会通过 Web 页面去管理这个 RabbitMQ,默认情况下,我们第一次使用的默认用户是 admin。
登录成功后,在 admin 选项卡可以查看所有用户:
可以看到,每个用户都有一个 Can access virtual hosts 属性,这个属性是啥意思呢?
🎂 多租户
RabbitMQ 中有一个概念叫做多租户,怎么理解呢?
我们安装一个 RabbitMQ 服务器,每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器,这些虚拟的消息服务器就是我们所说的虚拟主机(virtual host),一般简称为 vhost。
本质上,每一个 vhost 都是一个独立的小型 RabbitMQ 服务器,这个 vhost 中会有自己的消息队列、消息交换机以及相应的绑定关系等等,并且拥有自己独立的权限,不同的 vhost 中的队列和交换机不能互相绑定,这样技能保证运行安全又能避免命名冲突。
我们并不需要特别的去看待 vhost,他就跟普通的物理 RabbitMQ 一样,不同的 vhost 能够提供逻辑上的分离,确保不同的应用消息队列能够安全独立运行。
要我来说,我们该怎么看待 vhost 和 RabbitMQ 的关系呢?RabbitMQ 相当于一个 Excel 文件,而 vhost 则是 Excel 文件中的一个个 sheet,我们所有的操作都是在某一个 sheet 上进行操作。
本质上来说,vhost 算是 AMQP 协议中的概念。
🥞 命令行创建 vhost
因为这里的 RabbitMQ 是用 docker 安装的,首先进入到 docker 容器中:
docker exec -it some-rabbit /bin/bash
然后执行如下命令创建一个名为 /myvh 的 vhost:
rabbitmqctl add_vhost myvh
最终执行结果如下:
c.q1cong.cnc.qicong77.com/halo/13a7e6dc32b15805c8ff3be958e777dd.png)
然后通过如下命令可以查看已有的 vhost:
rabbitmqctl list_vhosts

当然这个命令也可以添加两个选项 name 和 tracing,name 表示 vhost 的名称,tracing 则表示是否使用了 tracing 功能(tracing 可以帮助追踪 RabbitMQ 中消息的流入流出情况),如下图:

可以通过如下命令删除一个 vhost:
rabbitmqctl delete_vhost myvh

当删除一个 vhost 的时候,与这个 vhost 相关的消息队列、交换机以及绑定关系等,统统都会被删除。
给一个用户设置 vhost:
rabbitmqctl set_permissions -p myvh guest ".*" ".*" ".*"

前面参数都好说,最后面三个 “.*” 含义分别如下:
- 用户在所有资源上都拥有可配置权限(创建/删除消息队列、创建/删除交换机等)。
- 用户在所有资源上都拥有写权限(发消息)。
- 用户在所有资源上都拥有读权限(消息消费,清空队列等)。
禁止一个用户访问某个 vhost:
rabbitmqctl clear_permissions -p myvh guest

⚡️ 管理页面创建 vhost
当然我们也可以在网页端管理 vhost:
在 admin 选项卡中,点击右边的 Virtual Hosts,如下:


进入到某一个 vhost 之后,可以修改其权限以及删除一个 vhost,如下图:

✨ 用户管理
因为 vhost 通常跟用户一起出现,所以这里我也顺便说下 user 的相关操作。
添加一个用户名为 javaboy,密码为 123 的用户,方式如下:
rabbitmqctl add_user javaboy 123

通过如下命令可以修改用户密码(将 javaboy 的密码改为 123456):
rabbitmqctl change_password javaboy 123456

通过如下命令可以验证用户密码:
rabbitmqctl authenticate_user javaboy 123456
验证成功和验证失败的情况分别如下:

通过如下命令可以查看当前的所有用户:

第一列是用户名,第二列是用户角色。
给用户设置角色的命令如下(给 javaboy 设置 administrator 角色):
rabbitmqctl set_user_tags javaboy administrator

最后,删除一个用户的命令如下:
rabbitmqctl delete_user javaboy

🥚 RabbitMQ 权限系统
不管我们是通过网页还是通过命令行工具创建用户对象,刚创建好的用户对象都是没法直接使用的,需要我们首先把这个用户置于某一个 vhost 之下,然后再赋予其权限,有了权限,这个用户才可以正常使用。
🌱 RabbitMQ 权限系统介绍
RabbitMQ 是从 1.6 这个版本开始实现了一套 ACL 风格的权限系统,在这套 ACL 风格的权限管理系统中,允许非常多细粒度的权限控制,可以为不同用户分别设置读、写以及配置等权限。
这里涉及到三种不同的权限:
- 读:和消息消费有关的所有操作,包括清除整个队列的消息。
- 写:发布消息。
- 配置:消息队列、交换机等的创建和删除。
这是 RabbitMQ 权限系统的一个简单介绍。
🌿 操作和权限的对应关系
接下来,下图展示了操作和权限的对应关系:

☘️ 权限操作命令
RabbitMQ 中权限操作命令格式如下:
rabbitmqctl set_permissions [-p vhosts] {user} {conf} {write} {read}
这里有几个参数:
- [-p vhost]:授予用户访问权限的 vhost 名称,如果不写默认为 /。
- user:用户名。
- conf:用户在哪些资源上拥有可配置权限(支持正则表达式)。
- write:用户在哪些资源上拥有写权限(支持正则表达式)。
- read:用户在哪些资源上拥有读权限(支持正则表达式)。
假设我们有一个名为 zhangsan 的用户,我们希望该用户在 myvh 虚拟主机下具备所有权限,那么我们的操作命令如下:
rabbitmqctl set_permissions -p myvh zhangsan ".*" ".*" ".*"
执行结果如下:

接下来执行如下命令可以验证授权是否成功:
rabbitmqctl -p myvh list_permissions

可以看到,张三的权限已经赋值到位。
在上面的授权命令中,我们用的都是 “.*”,松哥再额外说下这个通配符:
- “.*”:这个表示匹配所有的交换机和队列。
- “javaboy-.*”:这个表示匹配名字以 javaboy- 开头的交换机和队列。
- “”:这个表示不匹配任何队列与交换机(如果想撤销用户的权限可以使用这个)。
我们可以使用如下命令来移除某一个用户在某一个 vhost 上的权限,例如移除 zhangsan 在 myvh 上的所有权限,如下:
rabbitmqctl clear_permissions -p myvh zhangsan
执行完成后,我们可以通过 rabbitmqctl -p myvh list_permissions 命令来查看执行结果是否生效,最终执行效果如下:

如果一个用户在多个 vhost 上都有对应的权限,按照上面的 rabbitmqctl -p myvh list_permissions 命令只能查看一个 vhost 上的权限,此时我们可以通过如下命令来查看 lisi 在所有 vhost 上的权限:
rabbitmqctl list_user_permissions lisi

🍀 Web 管理页面操作
当然,如果你不想敲命令,也可以通过 Web 管理端去操作权限。
在 Admin 选项卡,点击用户名称,就可以给用户设置权限了,如下:


可以设置权限,也可以清除权限。
当然,在网页上还有一个 Topic Permissions,这是 RabbitMQ3.7 开始的一个新功能,可以针对某一个 topic exchange 设置权限,主要针对 STOMP 或者 MQTT 协议,我们日常 Java 开发用上这个配置的机会很少。如果用户不设置的话,相应的 topic exchange 也总是有权限的。
🌚 RabbitMQ 集群搭建
单个的 RabbitMQ 肯定无法实现高可用,要想高可用,还得上集群。
🍃 两种模式
- 普通集群
- 镜像集群
🍂 普通集群
普通集群模式,就是将 RabbitMQ 部署到多台服务器上,每个服务器启动一个 RabbitMQ 实例,多个实例之间进行消息通信。
此时我们创建的队列 Queue,它的元数据(主要就是 Queue 的一些配置信息)会在所有的 RabbitMQ 实例中进行同步,但是队列中的消息只会存在于一个 RabbitMQ 实例上,而不会同步到其他队列。
当我们消费消息的时候,如果连接到了另外一个实例,那么那个实例会通过元数据定位到 Queue 所在的位置,然后访问 Queue 所在的实例,拉取数据过来发送给消费者。
这种集群可以提高 RabbitMQ 的消息吞吐能力,但是无法保证高可用,因为一旦一个 RabbitMQ 实例挂了,消息就没法访问了,如果消息队列做了持久化,那么等 RabbitMQ 实例恢复后,就可以继续访问了;如果消息队列没做持久化,那么消息就丢了。
大致的流程图如下图:

🍄 镜像集群
它和普通集群最大的区别在于 Queue 数据和原数据不再是单独存储在一台机器上,而是同时存储在多台机器上。也就是说每个 RabbitMQ 实例都有一份镜像数据(副本数据)。每次写入消息的时候都会自动把数据同步到多台实例上去,这样一旦其中一台机器发生故障,其他机器还有一份副本数据可以继续提供服务,也就实现了高可用。
大致流程图如下图:

🍁 节点类型
RabbitMQ 中的节点类型有两种:
- RAM node:内存节点将所有的队列、交换机、绑定、用户、权限和 vhost 的元数据定义存储在内存中,好处是可以使得交换机和队列声明等操作速度更快。
- Disk node:将元数据存储在磁盘中,单节点系统只允许磁盘类型的节点,防止重启 RabbitMQ 的时候,丢失系统的配置信息
RabbitMQ 要求在集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入或者离开集群时,必须要将该变更通知到至少一个磁盘节点。如果集群中唯一的一个磁盘节点崩溃的话,集群仍然可以保持运行,但是无法进行其他操作(增删改查),直到节点恢复。为了确保集群信息的可靠性,或者在不确定使用磁盘节点还是内存节点的时候,建议直接用磁盘节点。
🌷 搭建普通集群
🌹 预备知识
大致的结构了解了,接下来我们就把集群给搭建起来。先从普通集群开始,我们就使用 docker 来搭建。
搭建之前,有两个预备知识需要大家了解:
- 搭建集群时,节点中的 Erlang Cookie 值要一致,默认情况下,文件在 /var/lib/rabbitmq/.erlang.cookie,我们在用 docker 创建 RabbitMQ 容器时,可以为之设置相应的 Cookie 值。
- RabbitMQ 是通过主机名来连接服务,必须保证各个主机名之间可以 ping 通。可以通过编辑 /etc/hosts 来手工添加主机名和 IP 对应关系。如果主机名 ping 不通,RabbitMQ 服务启动会失败(如果我们是在不同的服务器上搭建 RabbitMQ 集群,大家需要注意这一点,接下来的 2.2 小结,我们将通过 Docker 的容器连接 link 来实现容器之间的访问,略有不同)。
🥀 开始搭建
执行如下命令创建三个 RabbitMQ 容器:
docker run -d --hostname rabbit01 --name mq01 -p 5671:5672 -p 15671:15672 -e RABBITMQ_ERLANG_COOKIE="javaboy_rabbitmq_cookie" rabbitmq:3-management
docker run -d --hostname rabbit02 --name mq02 --link mq01:mylink01 -p 5672:5672 -p 15672:15672 -e RABBITMQ_ERLANG_COOKIE="javaboy_rabbitmq_cookie" rabbitmq:3-management
docker run -d --hostname rabbit03 --name mq03 --link mq01:mylink02 --link mq02:mylink03 -p 5673:5672 -p 15673:15672 -e RABBITMQ_ERLANG_COOKIE="javaboy_rabbitmq_cookie" rabbitmq:3-management
接下来进入到 mq02 容器中,首先查看一下 hosts 文件,可以看到我们配置的容器连接已经生效了:

将来在 mq02 容器中,就可以通过 mylink01 或者 rabbit01 访问到 mq01 容器了。
接下来我们开始集群的配置。
分别执行如下命令将 mq02 容器加入集群中:
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbit01
rabbitmqctl start_app

接下来输入如下命令我们可以查看集群的状态:
rabbitmqctl cluster_status
![]()
可以看到,集群中已经有两个节点了。
接下来通过相同的方式将 mq03 也加入到集群中:
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbit01
rabbitmqctl start_app

接下来,我们可以查看集群信息:

可以看到,此时集群中已经有三个节点了。
其实,这个时候,我们也可以通过网页来查看集群信息,在三个 RabbitMQ 实例的 Web 端首页,都可以看到如下内容:

🌺 代码测试
接下来我们来简单测试一下这个集群。
我们创建一个名为 mq_cluster_demo 的父工程,然后在其中创建两个子工程。
第一个子工程名为 provider,是一个消息生产者,创建时引入 Web 和 RabbitMQ 依赖,如下:

然后配置 applicaiton.properties,内容如下(注意集群配置):
spring.rabbitmq.addresses=localhost:5671,localhost:5672,localhost:5673
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
接下来提供一个简单的队列,如下:
@Configuration
public class RabbitConfig {
public static final String MY_QUEUE_NAME = "my_queue_name";
public static final String MY_EXCHANGE_NAME = "my_exchange_name";
public static final String MY_ROUTING_KEY = "my_queue_name";
@Bean
Queue queue() {
return new Queue(MY_QUEUE_NAME, true, false, false);
}
@Bean
DirectExchange directExchange() {
return new DirectExchange(MY_EXCHANGE_NAME, true, false);
}
@Bean
Binding binding() {
return BindingBuilder.bind(queue())
.to(directExchange())
.with(MY_ROUTING_KEY);
}
}
这个没啥好说的,都是基本内容,接下来我们在单元测试中进行消息发送测试:
@SpringBootTest
class ProviderApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend(null, RabbitConfig.MY_QUEUE_NAME, "hello 江南一点雨");
}
}
这条消息发送成功之后,在 RabbitMQ 的 Web 管理端,我们会看到三个 RabbitMQ 实例上都会显示有一条消息,但是实际上消息本身只存在于一个 RabbitMQ 实例。
接下来我们再创建一个消息消费者,消息消费者的依赖以及配置和消息生产者都是一模一样,我就不重复了,消息消费者中增加一个消息接收器:
@Component
public class MsgReceiver {
@RabbitListener(queues = RabbitConfig.MY_QUEUE_NAME)
public void handleMsg(String msg) {
System.out.println("msg = " + msg);
}
}
当消息消费者启动成功后,这个方法中只收到一条消息,进一步验证了我们搭建的 RabbitMQ 集群是没问题的。
🌸 反向测试
确保三个 RabbitMQ 实例都是启动状态,关闭掉 Consumer,然后通过 provider 发送一条消息,发送成功之后,关闭 mq01 实例,然后启动 Consumer 实例,此时 Consumer 实例并不会消费消息,反而会报错说 mq01 实例连接不上,这个例子就可以说明消息在 mq01 上,并没有同步到另外两个 MQ 上。相反,如果 provider 发送消息成功之后,我们没有关闭 mq01 实例而是关闭了 mq02 实例,那么你就会发现消息的消费不受影响。
搭建镜像集群
所谓的镜像集群模式并不需要额外搭建,只需要我们将队列配置为镜像队列即可。
这个配置可以通过网页配置,也可以通过命令行配置,我们分别来看。
网页配置镜像队列
先来看看网页上如何配置镜像队列。
点击 Admin 选项卡,然后点击右边的 Policies,再点击 Add/update a policy,如下图:

接下来添加一个策略,如下图:

各参数含义如下:
- Name: policy 的名称。
- Pattern: queue 的匹配模式(正则表达式)。
- Definition:镜像定义,主要有三个参数:ha-mode, ha-params, ha-sync-mode。
- ha-mode:指明镜像队列的模式,有效值为 all、exactly、nodes。其中 all 表示在集群中所有的节点上进行镜像(默认即此);exactly 表示在指定个数的节点上进行镜像,节点的个数由 ha-params 指定;nodes 表示在指定的节点上进行镜像,节点名称通过 ha-params 指定。
- ha-params:ha-mode 模式需要用到的参数。
- ha-sync-mode:进行队列中消息的同步方式,有效值为 automatic 和 manual。
- priority 为可选参数,表示 policy 的优先级。
配置完成后,点击下面的 add/update policy 按钮,完成策略的添加,如下:

添加完成后,我们可以进行一个简单的测试。
首先确认三个 RabbitMQ 都启动了,然后用上面的 provider 向消息队列发送一条消息。
发完之后关闭 mq01 实例。
接下来启动 consumer,此时发现 consumer 可以完成消息的消费(注意和前面的反向测试区分),这就说明镜像队列已经搭建成功了。
命令行配置镜像队列
命令行的配置格式如下:
rabbitmqctl set_policy [-p vhost] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition}
举一个简单的配置案例:
rabbitmqctl set_policy -p / --apply-to queues my_queue_mirror "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
