微服务-消息队列

消息队列

同步与异步

同步调用有很强的时效性,且业务中百分之八十的业务都会使用同步调用方式

但同步调用对于性能方面也有很大缺点,且耦合性强

我们可以将一些边缘业务的服务调用放到消息队列中,异步调用其他服务


MQ技术选型

MQ(MessageQueue),中文是消息队列,字面来看是存放消息的队列,也就是异步调用中的Broker

RabbitMQ:对吞吐量要求不是极致高,但对消息路由和可靠性要求严格的场景。

RocketMQ:对吞吐量、可用性、消息特性 (如事务消息、顺序消息、定时消息) 有较高要求的场景。

Kafka:对吞吐量要求不是极致高,但对消息路由和可靠性要求严格的场景。

由阿里云开发的RocketMQ并不支持多语言的微服务之间通讯,而RabbitMQ支持

Kafka是Apache公司开发的针对于大数据和高吞吐量的中间件


RabbitMQ

RabbitMQ的整体架构:

  • virtual-host:虚拟主机,类似于命名空间,当有多个项目共用一个MQ时使用
  • publisher:消息发送者
  • consumer:消息消费者
  • queue:队列,存储消息
  • exchange:交换机,负责路由消息

消息发送者把消息发送到交换机,但交换机本身不存储消息,需要配置对应路由规则,将消息发送到指定的队列


数据隔离

为新项目添加新用户,在新用户下添加新的虚拟主机


JAVA客户端

在这里我们使用SpringAMQP客户端,RabbitMQ支持此协议通讯

基于RabbitMQ的API,Spring提供了spring-rabbit的具体实现

导入依赖

1
2
3
4
5
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

1
2
3
4
5
6
7
spring:
rabbitmq:
host: localhost
port: 5672
username: hmall
password: 123
virtual-host: /hmall

通过RabbitTemplate简单向队列直接发送消息

1
2
3
4
5
6
@Autowired // 使用 @Autowired 注解在字段上
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
rabbitTemplate.convertAndSend("simple.queue","hello,spring amqp");
}

编写Rabbit的监听器,让队列触发该消息

1
2
3
4
5
6
7
8
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) {
log.info("消费者接收到的消息是:{}", msg);
}
}

06-25 11:19:07:597 INFO 229826 — [ntContainer#0-1] c.i.consumer.mq.SpringRabbitListener : 消费者接收到的消息是:hello,spring amqp


Work Queues

让多个消费者绑定同一个队列,共同消费队列中的消息

同一条消息,被一个消费者处理后,这条消息就被删除了,不能被其他消费者处理

对于大量消息,我们通过Work Queues的特性,创建多个消费者消费同一个队列,这样能做到负载均衡

Work Queues的默认策略是根据消费者的数量平均分配消息数量(不论消费者的处理性能)

修改配置文件preFetch值为1,确保同一时刻最多投递给消费者1条消息

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
prefetch: 1

这样就不是消费者平均分配任务,而是“能者多劳”


交换机exchange

Fanout广播交换机

就算是把多个消费者绑定同一个队列,这个队列中的消息也不会同时被多个消费者消费

但通过广播交换机,我们可以实现多个消费者共同消费一条消息

1
2
3
4
5
6
7
8
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
String exchange = "hmall.fanout";
// 不同于给队列直接发消息,给交换机发消息是有三个参数
rabbitTemplate.convertAndSend(exchange,null,"hello,spring amqp");
}

Direct交换机

每一个Queue都设置一个BingdingKey

发布者发送消息时,指定消息的RoutingKey

在这里,BingdingKey是可以重复的,重复的BindingKey会以Fanout相同的方式复制消息分别发送到对列,形成广播

Topic交换机

在发送消息到交换机时,可以使用通配符

#:指代0个或多个单词

*:指代一个单词


队列和交换机的声明

SpringAMQP提供了几个类,用来声明队列,交换机和绑定关系

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BingdingBuilder构建

在SpringBoot项目中,我们可以通过编写配置类的方式,在项目初始化时完成交换机和队列的创建及绑定

通常情况下,在消费者一端去创建和绑定RabbitMQ

SpringAMQP还提供了注解的方式来实现

1
2
3
4
5
6
7
8
9
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到Direct消息: ["+msg+"]");
}

将队列绑定到消费者的同时,初始化队列和交换机,指定交换机类型并将其绑定


消息转换器

Spring在对消息对象的处理时默认实现是SimpleMessageConverter,是基于JDK的ObjectOutputStream来实现的,但这样做会存在很多问题:

  • JDK的序列化有安全风险
  • JDK序列化的消息太大
  • JDK序列化的消息可读性差

建议采用JSON序列化来替代默认的JDK序列化

导入依赖

1
2
3
4
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

在发布者和消费者都添加配置类

1
2
3
4
5
6
7
@Configuration
public class RabbitMQConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}

这样就实现了使用jackson的Json转换来替换默认的JDK字节流转换



微服务-消息队列
http://blog.170827.xyz/2025/06/24/微服务-消息队列/
作者
XIAOBAI
发布于
2025年6月24日
许可协议