RabbitMQ-高级

MQ高级

发送者的可靠性

发送者重连

有的时候由于网络波动,可能出现发送者连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制(这个设置默认是关闭的)

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数, 下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数

发送者确认

Spring AMQP提供了Publisher Confirm 和 Publisher Return两种机制

  • 消息投递到MQ,但路由失败,Publisher Retuen返回路由异常,但返回ACK,告知投递成功
    • 临时消息投递到MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到MQ,并且入队完成持久话,返 回ACK,告知投递成功
  • 其他情况都返回NACK,告知投递失败

通过配置来开启发送者确认机制

1
2
3
4
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true

这里的publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执消息
  • correlated: MQ异步回调方式返回回执消息

编写配置类为RabbitTemplate添加ReturnCallback

1
2
3
4
5
6
7
8
9
10
11
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setReturnsCallback(returnedMessage -> {
log.error("触发return callback");
log.debug("exchange:{}",returnedMessage.getExchange());
log.debug("routingKey:{}",returnedMessage.getRoutingKey());
log.debug("message:{}",returnedMessage.getMessage());
log.debug("replyCode:{}",returnedMessage.getReplyCode());
log.debug("replyText:{}",returnedMessage.getReplyText());
});编写代码添加ConfirmCallback5
}

编写代码添加ConfirmCallback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Test
void testPublisherConfirm() throws InterruptedException {
// 1. 创建CorrelationData
CorrelationData cd = new CorrelationData();
// 2. 给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 2.1.Future发生异常时的处理逻辑,基本不会触发
log.error("handle message ack fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2.Future接收到回调的处理逻辑,参数中的result就是回调内容
if (result.isAck()) { // result.isAck(), boolean类型, true代表ack回调, false 代表 nack回调
log.debug("发送消息成功,收到 ack!");
}else{ // result.getReason(), String类型,返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
}
}
});
// 3. 发送消息
rabbitTemplate.convertAndSend("hmall.direct", "red1", "hello", cd);
}

编写代码添加ConfirmCallbackACK和NACK是由ConfirmCallback来返回的,如果ACK但路由失败,则由ReturnBack返回具体的原因


数据持久化

交换机持久化与队列持久化

交换机和队列在MQ中默认的设置就是持久话,即重启RabbitMQ交换机和队列不会丢失

消息持久话

在发送消息时,发送持久话消息即可做到消息持久化

SpringAMQP默认发送消息就是持久化消息

RabbitMQ非持久化策略,是被动持久化,即内存写满之后才会写入磁盘,但写入磁盘的过程中会导致IO阻塞,影响性能

而持久化策略是同步持久化,边接受消息边写入磁盘

总结:SpringAMQP是全部持久化策略,所以不需要做任何调整


Lazy Queue

在RabbitMQ的3.6.0版本开始加入Lazy Queue的概念,也就是惰性队列

  • 接到信息后直接存到磁盘中,不会加载到内存
  • 消费者消费消息时才会从磁盘中读取消息到内存

在3.12版本后,所有队列都是Lazy Queue模式,无法更改


消费者的可靠性

消费者确认机制

当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态

注:无论哪种消息都是进行业务逻辑之后再返回的

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • Nack:处理消息失败,RabbitMQ再次投递消息给消费者,直到成功为止
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP实现了消息确认功能,并允许我们通过配置文件选择ACK处理方式,有三种方式:

  • none:不处理,即消息投递后理科返回ack,消息会立刻从MQ删除,非常不安全
  • manual:手动模式,需要自己在业务代码中调用API,发送ack或reject,存在业务入侵,但灵活
  • auto:自动模式,SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强
    • 业务正常执行:返回ack
    • 业务异常:返回Nack
    • 消息处理或者校验异常,返回reject
1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto

这里的默认值就是auto


失败重试机制

SpringAMQP提供了消费者失败重试机制,消费者失败后利用本地重试,而不是请求mq

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 是否开启重试
initial-interval: 5000 # 重试间隔
multiplier: 1 # 重试间隔倍数
max-attempts: 3 # 最大重试次数
stateless: true # 是否无状态,有事务关联改成false,无事务关联使用true

在开启失败本地重试之后,重试次数耗尽消息仍然失败,则需要有MessageRecoverer接口来处理,他包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息(默认方式)
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队(之前的默认策略)
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到制定的交换机,将执行错误的消息最后通过邮件的方式发送到开发者手中
1
2
3
4
5
6
private final RabbitTemplate rabbitTemplate;
@Bean
public MessageRecoverer messageRecoverer(){
// rabbitTemplate ,交换机名字,BindingKey
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

业务幂等性

同一个业务执行一次和多次的结果是一样的,那么他就是幂等的

更改订单状态,就是幂等业务

但是用户下单扣减库存,就是非幂等业务

唯一消息id

  • 每条消息都生成一个唯一的消息id,与消息一起投递给消费者
  • 消费者接受到消息后处理业务,处理成功后将消息id保存到数据库
  • 下次收到消息,先去数据库查询判断是否存在,存在则为重复消息放弃处理

在配置底层使用jackson消息转换器时,配置成自动生成id

1
2
3
4
5
6
7
@Bean
public MessageConverter messageConverter(){
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
// 开启消息头id生成
converter.setCreateMessageIds(true);
return converter;
}
1
2
3
4
5
6
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(Message msg) {
// 拿到消息头id,做校验,保证业务幂等性
String messageId = msg.getMessageProperties().getMessageId();
log.info("消费者接收到的消息是:{}", msg.getBody().toString());
}

业务判断

并不是所有业务都适配使用业务判断的方式来实现保证业务的x-dead-letter-exchange:指定一个交换机,当消息过期成为死信幂等性

比如修改订单状态到已支付,这个业务在修改之前,查询订单状态是否为未支付,只有为未支付才改变订单状态

这样就可以通过业务判断的模式将非幂等业务修改为幂等业务


延迟消息

死信交换机

当一个队列中的消息满足下列情况之一,就会成为死信(dead letter)

  • 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false(失败消息不回队列)
  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间)超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

死信交换机的设计本意是用以记录失败的消息,可以通过死信队列来设计“死信消费者”

死信交换机和死信队列可以作为实现延迟消息的一种变通方案,但它存在明显的队头阻塞管理复杂性问题。在支持原生延迟消息功能的MQ系统(如RabbitMQ安装了 rabbitmq_delayed_message_exchange 插件)中,强烈推荐使用原生功能,因为它更高效、更灵活、更易于管理


延迟消息插件

下载,安装DelayExchange插件,在声明交换机时将delay属性设置为true

发布者在发布消息时设置delay属性,即可实现消息的延迟发送

消息会被在交换机延时,到时间再发送给指定队列

MQ的延迟消息会使用cpu来计算时钟,如果同一时间有大量的延迟消息,cpu压力会很大


取消订单

未支付订单的自动取消是MQ延迟消息最经典、最常见的应用场景之一

它解决了定时任务取消订单不够即时,释放商品库存不够即时的痛点问题

用户下单之后就发送延迟消息,在15分钟后检查订单支付状态:

  • 已支付:更新订单状态为已支付
  • 未支付:更新订单状态为关闭订单,并且将商品库存恢复

查询支付状态时,要查询订单状态是否为已支付,也要查询订单状态是否为已支付(防止订单服务改变而支付状态未变)



RabbitMQ-高级
http://blog.170827.xyz/2025/06/26/RabbitMQ-高级/
作者
XIAOBAI
发布于
2025年6月26日
许可协议