微服务-消息队列
消息队列
同步与异步
同步调用有很强的时效性,且业务中百分之八十的业务都会使用同步调用方式
但同步调用对于性能方面也有很大缺点,且耦合性强
我们可以将一些边缘业务的服务调用放到消息队列中,异步调用其他服务
MQ技术选型
MQ(MessageQueue),中文是消息队列,字面来看是存放消息的队列,也就是异步调用中的Broker
RabbitMQ:对吞吐量要求不是极致高,但对消息路由和可靠性要求严格的场景。
RocketMQ:对吞吐量、可用性、消息特性 (如事务消息、顺序消息、定时消息) 有较高要求的场景。
Kafka:对吞吐量要求不是极致高,但对消息路由和可靠性要求严格的场景。
由阿里云开发的RocketMQ并不支持多语言的微服务之间通讯,而RabbitMQ支持
Kafka是Apache公司开发的针对于大数据和高吞吐量的中间件
RabbitMQ
RabbitMQ的整体架构:
- virtual-host:虚拟主机,类似于命名空间,当有多个项目共用一个MQ时使用
- publisher:消息发送者
- consumer:消息消费者
- queue:队列,存储消息
- exchange:交换机,负责路由消息
消息发送者把消息发送到交换机,但交换机本身不存储消息,需要配置对应路由规则,将消息发送到指定的队列
数据隔离
为新项目添加新用户,在新用户下添加新的虚拟主机
JAVA客户端
在这里我们使用SpringAMQP客户端,RabbitMQ支持此协议通讯
基于RabbitMQ的API,Spring提供了spring-rabbit的具体实现
导入依赖
1 | |
配置文件
1 | |
通过RabbitTemplate简单向队列直接发送消息
1 | |
编写Rabbit的监听器,让队列触发该消息
1 | |
06-25 11:19:07:597 INFO 229826 — [ntContainer#0-1] c.i.consumer.mq.SpringRabbitListener : 消费者接收到的消息是:hello,spring amqp
Work Queues
让多个消费者绑定同一个队列,共同消费队列中的消息
同一条消息,被一个消费者处理后,这条消息就被删除了,不能被其他消费者处理
对于大量消息,我们通过Work Queues的特性,创建多个消费者消费同一个队列,这样能做到负载均衡
Work Queues的默认策略是根据消费者的数量平均分配消息数量(不论消费者的处理性能)
修改配置文件preFetch值为1,确保同一时刻最多投递给消费者1条消息
1 | |
这样就不是消费者平均分配任务,而是“能者多劳”
交换机exchange
Fanout广播交换机
就算是把多个消费者绑定同一个队列,这个队列中的消息也不会同时被多个消费者消费
但通过广播交换机,我们可以实现多个消费者共同消费一条消息
1 | |
Direct交换机
每一个Queue都设置一个BingdingKey
发布者发送消息时,指定消息的RoutingKey
在这里,BingdingKey是可以重复的,重复的BindingKey会以Fanout相同的方式复制消息分别发送到对列,形成广播
Topic交换机
在发送消息到交换机时,可以使用通配符
#:指代0个或多个单词
*:指代一个单词
队列和交换机的声明
SpringAMQP提供了几个类,用来声明队列,交换机和绑定关系
- Queue:用于声明队列,可以用工厂类QueueBuilder构建
- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BingdingBuilder构建
在SpringBoot项目中,我们可以通过编写配置类的方式,在项目初始化时完成交换机和队列的创建及绑定
通常情况下,在消费者一端去创建和绑定RabbitMQ
SpringAMQP还提供了注解的方式来实现
1 | |
将队列绑定到消费者的同时,初始化队列和交换机,指定交换机类型并将其绑定
消息转换器
Spring在对消息对象的处理时默认实现是SimpleMessageConverter,是基于JDK的ObjectOutputStream来实现的,但这样做会存在很多问题:
- JDK的序列化有安全风险
- JDK序列化的消息太大
- JDK序列化的消息可读性差
建议采用JSON序列化来替代默认的JDK序列化
导入依赖
1 | |
在发布者和消费者都添加配置类
1 | |
这样就实现了使用jackson的Json转换来替换默认的JDK字节流转换