RabbitMQ-高级
MQ高级
发送者的可靠性
发送者重连
有的时候由于网络波动,可能出现发送者连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制(这个设置默认是关闭的)
1 | |
发送者确认
Spring AMQP提供了Publisher Confirm 和 Publisher Return两种机制
- 消息投递到MQ,但路由失败,Publisher Retuen返回路由异常,但返回ACK,告知投递成功
- 临时消息投递到MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到MQ,并且入队完成持久话,返 回ACK,告知投递成功
- 其他情况都返回NACK,告知投递失败
通过配置来开启发送者确认机制
1 | |
这里的publisher-confirm-type有三种模式可选:
- none:关闭confirm机制
- simple:同步阻塞等待MQ的回执消息
- correlated: MQ异步回调方式返回回执消息
编写配置类为RabbitTemplate添加ReturnCallback
1 | |
编写代码添加ConfirmCallback
1 | |
编写代码添加ConfirmCallbackACK和NACK是由ConfirmCallback来返回的,如果ACK但路由失败,则由ReturnBack返回具体的原因
数据持久化
交换机持久化与队列持久化
交换机和队列在MQ中默认的设置就是持久话,即重启RabbitMQ交换机和队列不会丢失
消息持久话
在发送消息时,发送持久话消息即可做到消息持久化
SpringAMQP默认发送消息就是持久化消息
RabbitMQ非持久化策略,是被动持久化,即内存写满之后才会写入磁盘,但写入磁盘的过程中会导致IO阻塞,影响性能
而持久化策略是同步持久化,边接受消息边写入磁盘
总结:SpringAMQP是全部持久化策略,所以不需要做任何调整
Lazy Queue
在RabbitMQ的3.6.0版本开始加入Lazy Queue的概念,也就是惰性队列
- 接到信息后直接存到磁盘中,不会加载到内存
- 消费者消费消息时才会从磁盘中读取消息到内存
在3.12版本后,所有队列都是Lazy Queue模式,无法更改
消费者的可靠性
消费者确认机制
当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态
注:无论哪种消息都是进行业务逻辑之后再返回的
- ack:成功处理消息,RabbitMQ从队列中删除该消息
- Nack:处理消息失败,RabbitMQ再次投递消息给消费者,直到成功为止
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
SpringAMQP实现了消息确认功能,并允许我们通过配置文件选择ACK处理方式,有三种方式:
- none:不处理,即消息投递后理科返回ack,消息会立刻从MQ删除,非常不安全
- manual:手动模式,需要自己在业务代码中调用API,发送ack或reject,存在业务入侵,但灵活
- auto:自动模式,SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强
- 业务正常执行:返回ack
- 业务异常:返回Nack
- 消息处理或者校验异常,返回reject
1 | |
这里的默认值就是auto
失败重试机制
SpringAMQP提供了消费者失败重试机制,消费者失败后利用本地重试,而不是请求mq
1 | |
在开启失败本地重试之后,重试次数耗尽消息仍然失败,则需要有MessageRecoverer接口来处理,他包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息(默认方式)
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队(之前的默认策略)
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到制定的交换机,将执行错误的消息最后通过邮件的方式发送到开发者手中
1 | |
业务幂等性
同一个业务执行一次和多次的结果是一样的,那么他就是幂等的
更改订单状态,就是幂等业务
但是用户下单扣减库存,就是非幂等业务
唯一消息id
- 每条消息都生成一个唯一的消息id,与消息一起投递给消费者
- 消费者接受到消息后处理业务,处理成功后将消息id保存到数据库
- 下次收到消息,先去数据库查询判断是否存在,存在则为重复消息放弃处理
在配置底层使用jackson消息转换器时,配置成自动生成id
1 | |
1 | |
业务判断
并不是所有业务都适配使用业务判断的方式来实现保证业务的x-dead-letter-exchange:指定一个交换机,当消息过期成为死信幂等性
比如修改订单状态到已支付,这个业务在修改之前,查询订单状态是否为未支付,只有为未支付才改变订单状态
这样就可以通过业务判断的模式将非幂等业务修改为幂等业务
延迟消息
死信交换机
当一个队列中的消息满足下列情况之一,就会成为死信(dead letter)
- 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false(失败消息不回队列)
- 消息是一个过期消息(达到了队列或消息本身设置的过期时间)超时无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)
死信交换机的设计本意是用以记录失败的消息,可以通过死信队列来设计“死信消费者”
死信交换机和死信队列可以作为实现延迟消息的一种变通方案,但它存在明显的队头阻塞和管理复杂性问题。在支持原生延迟消息功能的MQ系统(如RabbitMQ安装了 rabbitmq_delayed_message_exchange 插件)中,强烈推荐使用原生功能,因为它更高效、更灵活、更易于管理
延迟消息插件
下载,安装DelayExchange插件,在声明交换机时将delay属性设置为true
发布者在发布消息时设置delay属性,即可实现消息的延迟发送
消息会被在交换机延时,到时间再发送给指定队列
MQ的延迟消息会使用cpu来计算时钟,如果同一时间有大量的延迟消息,cpu压力会很大
取消订单
未支付订单的自动取消是MQ延迟消息最经典、最常见的应用场景之一
它解决了定时任务取消订单不够即时,释放商品库存不够即时的痛点问题
用户下单之后就发送延迟消息,在15分钟后检查订单支付状态:
- 已支付:更新订单状态为已支付
- 未支付:更新订单状态为关闭订单,并且将商品库存恢复
查询支付状态时,要查询订单状态是否为已支付,也要查询订单状态是否为已支付(防止订单服务改变而支付状态未变)