Mushroom Notes Mushroom Notes
🍄首页
  • JavaSE

    • 基础篇
    • 数据结构
    • IO流
    • Stream流
    • 函数式接口
    • JUC
    • 反射
    • 网络编程
    • 设计模式
  • JavaEE

    • Servlet
    • JDBC
    • 会话技术
    • 过滤器监听器
    • 三层架构
  • JDK

    • 总览
  • JVM

    • 总览
  • 常用mate
  • CSS
  • JavaScript
  • rds 数据库

    • MySQL
    • MySQL 进阶
    • MySQL 库表规范
  • nosql 数据库

    • Redis
    • Redis 进阶
    • Redis 底层
    • MongoDB
  • Spring生态

    • Spring
    • Spring MVC
    • Spring boot
    • Spring Validation
  • Spring Cloud生态

    • Spring Cloud
    • 服务治理
    • 远程调用
    • 网关路由
    • 服务保护
    • 分布式事务
    • 消息中间件
  • 数据库

    • Mybatis
    • Mybatis Plus
    • Elasticsearch
    • Redisson
  • 通信

    • Netty
📚技术
  • 方案专题
  • 算法专题
  • BUG专题
  • 安装专题
  • 网安专题
  • 面试专题
  • 常用网站
  • 后端常用
  • 前端常用
  • 分类
  • 标签
  • 归档

kinoko

一位兴趣使然的热心码农
🍄首页
  • JavaSE

    • 基础篇
    • 数据结构
    • IO流
    • Stream流
    • 函数式接口
    • JUC
    • 反射
    • 网络编程
    • 设计模式
  • JavaEE

    • Servlet
    • JDBC
    • 会话技术
    • 过滤器监听器
    • 三层架构
  • JDK

    • 总览
  • JVM

    • 总览
  • 常用mate
  • CSS
  • JavaScript
  • rds 数据库

    • MySQL
    • MySQL 进阶
    • MySQL 库表规范
  • nosql 数据库

    • Redis
    • Redis 进阶
    • Redis 底层
    • MongoDB
  • Spring生态

    • Spring
    • Spring MVC
    • Spring boot
    • Spring Validation
  • Spring Cloud生态

    • Spring Cloud
    • 服务治理
    • 远程调用
    • 网关路由
    • 服务保护
    • 分布式事务
    • 消息中间件
  • 数据库

    • Mybatis
    • Mybatis Plus
    • Elasticsearch
    • Redisson
  • 通信

    • Netty
📚技术
  • 方案专题
  • 算法专题
  • BUG专题
  • 安装专题
  • 网安专题
  • 面试专题
  • 常用网站
  • 后端常用
  • 前端常用
  • 分类
  • 标签
  • 归档
  • Spring

  • SpringCloud

    • Spring Cloud
    • 服务治理
    • 远程调用
    • 网关路由
    • 服务保护
    • 分布式事务
    • 消息中间件

      • RabbitMQ
        • 同步通讯
        • 异步通讯
        • 技术对比
        • 安装与部署
        • RabbitMQ的基本结构
        • RabbitMQ的消息模型
        • 图形化页面配置
          • 用户管理
          • 虚拟机
          • 交换机
          • 队列
        • SpringAmqp
          • 快速开始
          • 注解声明交换机队列
          • WorkQueue 工作队列
          • 交换机
          • Fanout 广播
          • Direct 路由
          • Topic 主题
          • 消费预取问题
        • 消息转换器
        • 生产者可靠性
          • 生产者重试机制
          • 生产者确认机制
          • 生产者确认机制开启
          • 定义 Return CallBack
          • 定义 ConfirmCallback
          • 效率影响测试
        • MQ消息可靠性
          • 数据持久化
          • LazyQueue
          • 控制台配置Lazy模式
          • 注解声明Lazy模式
        • 消费者的可靠性
          • 消费者确认机制
          • 消费失败处理
          • 失败处理策略
          • 兜底方案
        • 消息幂等性
          • 唯一消息ID
          • 业务判断
        • 延迟消息
          • 死信交换机
          • 延迟交换机
          • 插件安装
          • 1、插件下载
          • 2、Docker 安装
          • 声明延迟交换机
          • 发送延迟消息
          • 取消超时订单问题
          • 解决方案
          • 生产者优化
          • 消费者优化
        • 最佳实践
          • 全局常量
          • 抽取mq工具
      • Kafka
  • 数据库

  • 通信

  • 框架
  • SpringCloud
  • 消息中间件
kinoko
2024-03-02
目录

RabbitMQ

# 同步通讯

image.png
Feign调用就属于同步方式,虽然调用可以实时得到结果,但存在下面的问题:

  1. 耦合度高

每次加入新的需求们都需要修改业务代码

  1. 性能和吞吐能力下降

调用者需要等待服务提供者相应,如果调用链过长则响应时间之和等于每次调用的时间

  1. 有额外的资源消耗

调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源

  1. 有级联失败问题

如果服务提供者出现问题,所有调用方都会跟着出现问题,如同多米诺骨牌一样,迅速导致整个微服务集群故障

# 异步通讯


image.png
为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布者发布事件到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。
image.png
好处:

  • 吞吐量提升:无需等待订阅者处理完成,响应更快速
  • 故障隔离:服务没有直接调用,不存在级联失败问题
  • 调用没有阻塞,不会造成无效的资源占用
  • 耦合度极低,每个服务都可以灵活插拔,可替换
  • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

缺点:

  • 架构复杂了,业务没有明显的流程线,不好管理
  • 需要依赖于Broker的可靠、安全、性能

# 技术对比



RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 高 一般 高 高
单机吞吐量 一般 差 高 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 高 一般 高 一般

特性场景:
追求高可用性:电商系统中支付服务等业务场景。
追求单机吞吐量:日志采集或大数据等场景。
消息延时:机房温控等工业业务场景。
消息可靠性:银行支付系统等业务场景。

总结:

  1. 中小型公司首选RabbitMQ:管理界面简单,高并发
  2. 大型公司可以选择RocketMQ:更高并发,可对rocketmq进行定制化开发
  3. 日志采集功能:首选kafka,专为大数据准备

# 安装与部署

官网:RabbitMQ: easy to use, flexible messaging and streaming | RabbitMQ (opens new window)
1、拉取容器

docker pull rabbitmq:3.12-management
1

2、启动容器

docker run \
-e RABBITMQ_DEFAULT_USER=rmq \
-e RABBITMQ_DEFAULT_PASS=rmq \
-v mq-plugins:/plugins \
--name rmq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-d rabbitmq:3.12-management
1
2
3
4
5
6
7
8
9
  • -e RABBITMQ_DEFAULT_USER=rmq: 设置RabbitMQ的默认用户名为"rmq"。
  • -e RABBITMQ_DEFAULT_PASS=rmq: 设置RabbitMQ的默认密码为"rmq"。
  • -v : 挂载卷。
  • --name mq: 设置容器的名称为"mq"。
  • --hostname mq: 设置容器的主机名为"mq"。
  • -p 15672:15672: RabbitMQ管理界面的端口。
  • -p 5672:5672: RabbitMQ的AMQP协议端口5672。
  • -d rabbitmq:management: 指定了要运行的Docker镜像,并在后台以守护进程的方式运行。

3、访问图形化界面
启动镜像后访问:http://{IP}:15672 (opens new window)

# RabbitMQ的基本结构


image.png
RabbitMQ中的一些角色:

  • publisher:生产者
  • consumer:消费者
  • exchange:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

# RabbitMQ的消息模型


RabbitMQ官方提供了5个消息模型:

  • 基本消息队列模型(BasicQueue)
  • 工作消息队列模型(WorkQueue)
  • 发布订阅模型(Publish、Subscribe),根据交换机类型分为三种:
    • Fanout Exchange:广播
    • Direct Exchange: 路由
    • Topic Exchange:主题

# 图形化页面配置


一般来说,我们使用会为每一个服务创建自己的virtualHost以达到消息隔离的效果,并且每个服务对应独立一个账号,管理自己的virtualHost,所以首先先创建一个账号。

# 用户管理

点击admin选项卡选择users创建新用户:
image.png
创建完毕后可以看到列表多出来一个,但是没有分配虚拟机,所以我们接下登录用户来创建虚拟机。
image.png

# 虚拟机

点击admin选型卡选择Virtual Hosts创建虚拟机:
image.png
添加后就可以看到列表多了一个
image.png
回去Users查看可以看到自动绑定上了:
image.png

# 交换机

点击exchanges选项卡,可以查看现有交换机,创建新虚拟机:
image.png

# 队列

我们打开Queues and Streams选项卡,新建一个队列:
image.png
然后就可以看到列表中出现新建的队列了
image.png
绑定交换机
先点击demo1.queue队列,然后选择bindings
image.png
再往下也可以测试队列收发消息能力:
image.png

# SpringAmqp


SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp (opens new window)
image.png image.png
SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

# 快速开始


1、导入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1
2
3
4
5

2、配置MQ地址

spring:
  rabbitmq:
    host: 192.168.95.120 # 虚拟机IP
    port: 5672 # 端口
    virtual-host: demo1VH # 虚拟主机
    username: kinoko # 用户名
    password: 123 # 密码
1
2
3
4
5
6
7

3、开始使用...

# 注解声明交换机队列


使用交换机后队列与交换机的关系可能会很复杂,而且在图形化界面手动创建过于繁琐,一般会采用注解声明的方式自动创建队列和交换机,减轻运维的压力。
注解:**@QueueBinding**

@RabbitListener(bindings = @QueueBinding( // 配置绑定信息
	value = @Queue(name = "topic.queue", durable = "true"), // 将消费者绑定队列,开启持久化
	exchange = @Exchange(name = "kk.topic", type = ExchangeTypes.TOPIC), // 队列绑定交换机
	key = {"#"} // 设置匹配key
))
public void TopicExchange(String message) {
	System.out.println("Topic消费者消费信息--" + message);
}
1
2
3
4
5
6
7
8

# WorkQueue 工作队列


yuque_mind.jpeg
生产者

@Test
public void send() {
    for (int i = 1; i <= 10; i++) {
        rabbitTemplate.convertAndSend("demo1.queue", "msg_" + i);
    }
}
1
2
3
4
5
6

消费者

@RabbitListener(queues = "demo1.queue")
public void workListener1(String msg) {
    logger.info("work消费者1监听消息:{}", msg);
}

@RabbitListener(queues = "demo1.queue")
public void workListener2(String msg) {
    logger.error("work消费者2监听消息:{}", msg);
}
1
2
3
4
5
6
7
8
9

# 交换机


yuque_mind (1).jpeg
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
  • Consumer:消费者,与以前一样,订阅队列,没有变化

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

# Fanout 广播

image.png

  1. 可以有多个队列
  2. 每个队列都要绑定到Exchange(交换机)
  3. 生产者发送的消息,只能发送到交换机
  4. 交换机把消息发送给绑定过的所有队列
  5. 订阅队列的消费者都能拿到消息

实现
1、首先创建广播交换机
image.png
2、创建两个队列绑定交换机
image.png
生产者

@Test
public void send() {
    rabbitTemplate.convertAndSend("fanout.exchange","", "msg");
}
1
2
3
4

消费者

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "fanout.queue1"),
    exchange = @Exchange(name = "fanout.exchange", type = ExchangeTypes.FANOUT),
    key = {"#"}
))
public void FanoutExchange1(String message) {
logger.info("Fanout消费者[1]消费消息--" + message);
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "fanout.queue2"),
    exchange = @Exchange(name = "fanout.exchange", type = ExchangeTypes.FANOUT),
    key = {"#"}
))
public void FanoutExchange2(String message) {
logger.error("Fanout消费者[2]消费消息--" + message);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

效果

Fanout消费者[1]消费消息--msg
Fanout消费者[2]消费消息--msg
1
2

# Direct 路由

image.png
在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

实现
1、交换机
image.png
2、队列
image.png
生产者

@Test
public void send() {
    rabbitTemplate.convertAndSend("direct.exchange","red", "red");
    rabbitTemplate.convertAndSend("direct.exchange","blue", "blue");
    rabbitTemplate.convertAndSend("direct.exchange","yellow", "yellow");
}
1
2
3
4
5
6

消费者

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.TOPIC),
    key = {"yellow","red"}
))
public void DirectExchange1(String message) {
System.out.println("Direct消费者[1]消费消息--" + message);
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.TOPIC),
    key = {"blue","red"}
))
public void DirectExchange2(String message) {
System.out.println("Direct消费者[2]消费消息--" + message);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

效果

Direct消费者[1]消费消息--yellow
Direct消费者[1]消费消息--red
Direct消费者[2]消费消息--red
Direct消费者[2]消费消息--blue
1
2
3
4

# Topic 主题

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!
BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert
通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

如图:
image.png
假如此时publisher发送的消息使用的RoutingKey共有四种:

  • china.news代表有中国的新闻消息;
  • china.weather 代表中国的天气消息;
  • japan.news 则代表日本新闻
  • japan.weather 代表日本的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
    • china.news
    • japan.news

实现:
1、交换机
image.png
2、队列
image.png
生产者

@Test
public void send() {
    rabbitTemplate.convertAndSend("topic.exchange","china.news", "和平安康");
    rabbitTemplate.convertAndSend("topic.exchange","japan.news", "犯罪事件");
    rabbitTemplate.convertAndSend("topic.exchange","china.weather", "春天");
    rabbitTemplate.convertAndSend("topic.exchange","japan.weather", "夏天");
}
1
2
3
4
5
6
7

消费者

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
    key = {"china.#"}
))
public void TopicExchange1(String message) {
    logger.info("Topic消费者1消费信息--" + message);
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
    key = {"#.news"}
))
public void TopicExchange2(String message) {
    logger.error("Topic消费者2消费信息--" + message);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

效果

Topic消费者2消费信息--犯罪事件
Topic消费者2消费信息--和平安康
Topic消费者1消费信息--和平安康
Topic消费者1消费信息--春天
1
2
3
4

# 消费预取问题

工作队列中消费者消费消息存在一个预取机制,假设队列里有50个消息,有两个消费者绑定了这个队列,那么就会从队列中预取消息,消费者a预取奇数消息,消费者b预取偶数消息,也就是各取一半,并且占有预取的消息。这样会引发一个问题,在消费者之间存在性能差异时,会出现一方消费完预取消息后闲置,导致性能浪费。

解决方法是限制消费预取,在application.yml文件设置prefetch值

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能预取一条消息,处理完成才能获取下一个消息
1
2
3
4
5

# 消息转换器


Spring默认会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

注意

对象类需要实现Serializable接口并且是public

默认情况下Spring采用的序列化方式是JDK序列化。存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

引入依赖

<dependency>
	<groupId>com.fasterxml.jackson.dataformat</groupId>
	<artifactId>jackson-dataformat-xml</artifactId>
	<version>2.9.10</version>
</dependency>
1
2
3
4
5

注意

如果项目中引入了spring-boot-starter-web依赖,则无需引入。

在生产者和消费者端的启动类中配置JSON序列化

import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

@Configuration
public class MqMessageConverter implements RabbitListenerConfigurer {

    /**
     * 设置消息发送格式转换器
     * @return 消息转换器
     */
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 设置默认消息处理方法工厂
     * @return 默认消息处理方法工厂
     */
    @Bean
    public DefaultMessageHandlerMethodFactory defaultMessageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        // 设置转换器
        factory.setMessageConverter(new MappingJackson2MessageConverter());
        return factory;
    }

    /**
     * 设置默认消息处理方法工厂
     * @param rabbitListenerEndpointRegistrar 注册器
     */
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
        rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(defaultMessageHandlerMethodFactory());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

注意

传输实体一定要实现序列化和无参构造,否则无法实例化

如果报出:

Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class cn.kk.MqPublisherTest$User and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)
	at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
	at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1308)
	at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:414)
	at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:53)
	at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serialize(UnknownSerializer.java:30)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:479)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:318)
	at com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4719)
	at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:3987)
	at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.createMessage(AbstractJackson2MessageConverter.java:453)
	... 9 more
1
2
3
4
5
6
7
8
9
10
11
12

则是实体类没实现序列化接口或者是没实现getset方法。

生产者

@Test
public void send() {
	rabbitTemplate.convertAndSend("demo1.queue", new User("张三", 18, "13812345678"));
}
1
2
3
4

消息
image.png
消费者只能使用map来接收

@RabbitListener(queues = "demo1.queue")
public void workListener1(Map<String, Object> msg) {
	logger.info("work消费者1监听消息:{}", msg);
}
1
2
3
4

打印

work消费者1监听消息:{name=张三, age=18, phone=13812345678}
1

# 生产者可靠性

# 生产者重试机制


SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。
修改publisher模块的application.yaml文件,添加下面的内容:

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数
1
2
3
4
5
6
7
8
9

注意

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

# 生产者确认机制


一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。

不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:

  • MQ内部处理消息的进程发生了异常
  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由

针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm和Publisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执:
image.png

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ACK的确认信息,代表投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

其中ack和nack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制,并且默认都是关闭的,需要在配置文件开启。

# 生产者确认机制开启

在publisher模块的application.yaml中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制,一般不开启
1
2
3
4

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

一般我们推荐使用correlated,回调机制。

# 定义 Return CallBack

publisher return是在路由失败的情况会出现,是由开发人员保证的,所以一般都不会开启。

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig implements ApplicationContextAware {
    
    Logger log = LoggerFactory.getLogger(MqConfig.class);

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("触发return callback,");
            log.debug("交换机: {}", returned.getExchange());
            log.debug("路由key: {}", returned.getRoutingKey());
            log.debug("消息: {}", returned.getMessage());
            log.debug("响应码: {}", returned.getReplyCode());
            log.debug("原因: {}", returned.getReplyText());
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# 定义 ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
image.png
这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

@Test
void testPublisherConfirm() {
    // 1.创建CorrelationData
    CorrelationData cd = new CorrelationData();
    // 2.给Future添加ConfirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            // 2.1.Future发生异常时的处理逻辑,基本不会触发
            log.error("send message fail", ex);
        }
        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
            if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                log.debug("发送消息成功,收到 ack!");
            }else{ // result.getReason(),String类型,返回nack时的异常描述
                log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
            }
        }
    });
    // 3.发送消息
    rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

根据依赖版本不同可能移除了addCallback()方法,而是使用CompletableFuture实现则可以如下编写:

@Test
public void testPublisherConfirm() {
    // 1.创建CorrelationData
    CorrelationData cd = new CorrelationData();
    // 2.给Future添加ConfirmCallback
    cd.getFuture().whenComplete((result, ex) -> {
        // 2.1.Future发生异常时的处理逻辑,基本不会触发
        if (ex != null) {
            log.error("send message fail", ex);
        }
        // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
        if (result.isAck()) { // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
            log.info("发送消息成功,收到 ack!");
        } else { // result.getReason(),String类型,返回nack时的异常描述
            log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
        }
    });
    // 3.发送消息
    rabbitTemplate.convertAndSend("demo1.queue", "q", "hello", cd);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

测试1:发送至不存在的交换机

发送消息失败,收到 nack, reason : channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'demo1.queue' in vhost 'demo1VH', class-id=60, method-id=40)
1

测试2:发生至不存在的routingKey

触发return callback,
发送消息成功,收到 ack!
交换机: topic.exchange
路由key: q
消息: (Body:'"hello"' MessageProperties [headers={spring_returned_message_correlation=5a1e8360-1d5c-4360-abc5-7a167b3efad7, __TypeId__=java.lang.String}, messageId=d252e341-ab7d-43e5-bd89-8a5971097979, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
响应码: 312
原因: NO_ROUTE
1
2
3
4
5
6
7

注意

注意:
开启生产者确认比较消耗MQ性能,一般不建议开启。而且思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

# 效率影响测试

我们可以测试一下生产者确认机制对效率的影响,因为spring默认会将消息设置成持久化消息,所以我们手动将消息设置为非持久化,方便清理:

@Test
public void test() {
    Message msg = MessageBuilder
    .withBody("hello".getBytes(StandardCharsets.UTF_8))
    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
    .build();
    // 100w条消息
    for (int i = 0; i < 1_000_000; i++) {
        rabbitTemplate.convertAndSend("demo1.queue", msg);
    }
}
1
2
3
4
5
6
7
8
9
10
11

当前队列情况:

image.png

开启生产者确认机制的情况:

image.png
目前消息还没发送完毕,存在波动是因为消息堆积导致队列正在进行Paged Out,但是可以看出峰值就在6k左右,rabbitmq的吞吐量应该是万级,这明显差远了。

关闭生产者确认的情况:

image.png
可见效率一下就上去了,所以为什么不建议开启生产者确认机制就是因为确实对效率的影响很大,无论是对mq的吞吐量还是对业务接口的影响。

# MQ消息可靠性

# 数据持久化

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化

交换机持久化和队列持久化都可以通过新增时选择持久化选项:
image.png
注意:注解声明的队列和交换机默认都是持久化的。

消息持久化需要在发送消息时附加一个参数:
image.png

注意

说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

# LazyQueue

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

  • 消费者宕机或出现网络故障
  • 消息发送量激增,超过了消费者处理速度
  • 消费者处理业务发生阻塞

一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。

为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式,并且无法更改。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。

# 控制台配置Lazy模式

3.12版本之后默认是开启的,如果旧版本需要加上x-queue-mod=lazy参数
image.png

# 注解声明Lazy模式

@RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
    log.info("接收到 lazy.queue的消息:{}", msg);
}
1
2
3
4
5
6
7
8

# 消费者的可靠性

# 消费者确认机制

消费者确认机制为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.  当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回nack;
    • 如果是消息处理或校验异常,自动返回reject;

注意

返回**reject**常见异常:
Starting with version 1.3.2, the default ErrorHandler is now a ConditionalRejectingErrorHandler that rejects (and does not requeue) messages that fail with an irrecoverable error. Specifically, it rejects messages that fail with the following errors:

  • o.s.amqp…MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.
  • o.s.messaging…MessageConversionException: Can be thrown by the conversion service if additional conversion is required when mapping to a @RabbitListener method.
  • o.s.messaging…MethodArgumentNotValidException: Can be thrown if validation (for example, @Valid) ) is used in the listener and the validation fails.
  • o.s.messaging…MethodArgumentTypeMismatchException: Can be thrown if the inbound message was converted to a type that is not correct for the target method. For example, the parameter is declared as Message but Message is received.
  • java.lang.NoSuchMethodException: Added in version 1.6.3.
  • java.lang.ClassCastException: Added in version 1.6.3.

配置:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto,自动ack;
1
2
3
4
5

# 消费失败处理

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。
image.png
为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
1
2
3
4
5
6
7
8
9
10

# 失败处理策略

本地重试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了,因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
1)在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
1
2
3
4
5
6
7
8
9
10
11
12

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
1
2
3
4

完整代码如下:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 兜底方案

尽管rabbitMQ已经提供了很多机制以保证消息的可靠性,但也不好说能保证消息100%的可靠。万一真的MQ通知失败该怎么办呢?以订单支付与订单状态变更解耦的场景,有没有其它兜底方案,能够确保订单的支付状态一致呢?

思想
既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。流程如下:
yuque_mind (2).jpeg
图中黄色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。

不过需要注意的是,交易服务并不知道用户会在什么时候支付,如果查询的时机不正确(比如查询的时候用户正在支付中),可能查询到的支付状态也不正确。那么问题来了,我们到底该在什么时间主动查询支付状态呢?
这个时间是无法确定的,因此,通常我们采取的措施就是利用定时任务定期查询,例如每隔20秒就查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

# 消息幂等性


在程序开发中,幂等指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行。

然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。
举例:

  1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
  2. 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
  3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
  4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断

# 唯一消息ID

思路很简单:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可,以Jackson的消息转换器为例:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}
1
2
3
4
5
6
7
8

这样默认生成的是UUID,如果想要自己设置ID生成方式,也可以通过如下方式:
生产者

@Test
public void send() {
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setMessageId(UUID.randomUUID().toString());
    messageProperties.setContentType("text/plain");
    messageProperties.setContentEncoding("utf-8");
    Message message = new Message("msg".getBytes(), messageProperties);
    rabbitTemplate.convertAndSend("demo1.queue", message);
}
1
2
3
4
5
6
7
8
9

消费者

@RabbitListener(queues = "demo1.queue")
public void workListener2(Message msg) {
    logger.error("work消费者2监听消息:");
    logger.error("消息id:{}", msg.getMessageProperties().getMessageId());
    logger.error("消息内容:{}", new String(msg.getBody(), StandardCharsets.UTF_8));
}
1
2
3
4
5
6

打印

work消费者2监听消息:
消息id:28c2fa93-e208-4765-8049-a21c97ba332b
消息内容:msg
1
2
3

但一般不会使用这种方式,这样业务侵入性高,而且我们消息队列都是按照业务划分的,消息本身就应该带有业务ID,所以通常情况下我们都是在业务层面进行幂等保证。

# 业务判断

  • 新增:一般会由我们生成一个唯一ID进行判断库中是否存在,并且设置唯一键做数据库兜底
  • 删除:天然幂等,无需额外处理
  • 修改:因业务而异,一般是通过判断状态是否变更或使用乐观锁来保证幂等性

以支付修改订单的业务为例,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理:

@Override
public void markOrderPaySuccess(Long orderId) {
    // 1.查询订单
    Order old = getById(orderId);
    // 2.判断订单状态
    if (old == null || old.getStatus() != 1) {
        // 订单不存在或者订单状态不是1,放弃处理
        return;
    }
    // 3.尝试更新订单
    Order order = new Order();
    order.setId(orderId);
    order.setStatus(2);
    order.setPayTime(LocalDateTime.now());
    updateById(order);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

采用乐观锁可以优化并且预防并发安全问题:

@Override
public void markOrderPaySuccess(Long orderId) {
    // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
    lambdaUpdate()
            .set(Order::getStatus, 2)
            .set(Order::getPayTime, LocalDateTime.now())
            .eq(Order::getId, orderId)
            .eq(Order::getStatus, 1)
            .update();
}
1
2
3
4
5
6
7
8
9
10

# 延迟消息


在RabbitMQ中实现延迟消息也有两种方案:

  • 死信交换机+TTL
  • 延迟消息插件

# 死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因TTL(有效期)到期的消息

image.png

@Configuration
public class RabbitConfig {

    /**
     * 创建下单消息交换机
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(ORDER_EXCHANGE, true, false);
    }


    /**
     * 订单阻塞队列
     */
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable(ORDER_QUEUE)
                .ttl(30000) // 代表该队列的消息过期时间 ms
                .deadLetterExchange(DL_EXCHANGE) // 转发到死信交换机
                .deadLetterRoutingKey(DL_KEY) // 死信交换key
                .build();
    }

    /**
     * 绑定订单交换机与订单队列
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_KEY);
    }

    /**
     * 创建死信消息交换机
     */
    @Bean
    public DirectExchange DLExchange() {
        return new DirectExchange(DL_EXCHANGE, true, false);
    }

    /**
     * 创建死信队列
     */
    @Bean
    public Queue DLQueue() {
        return QueueBuilder.durable(DL_QUEUE).build();
    }

    /**
     * 绑定死信交换机与死信队列
     */
    @Bean
    public Binding DLBinding(){
        return BindingBuilder.bind(DLQueue()).to(DLExchange()).with(DL_KEY);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

生产者

@Test
public void test() {
    // 将订单ID存入订单队列
    rabbitTemplate.convertAndSend(MQ.ORDER_EXCHANGE, MQ.ORDER_KEY, orderId);
}
1
2
3
4
5

也可以手动指定过期时间

@Test
public void test() {
    Message msg = MessageBuilder
    .withBody("hello".getBytes(StandardCharsets.UTF_8))
    .setExpiration("30000")
    .build();
    rabbitTemplate.convertAndSend("demo1.queue", msg);
}
1
2
3
4
5
6
7
8

或者使用MessagePostProcessor

@Test
public void test() {
    rabbitTemplate.convertAndSend("demo1.queue", "hello",
                message -> {
                    message.getMessageProperties().setExpiration("3000");
                    return message;
                });
}
1
2
3
4
5
6
7
8

死信队列消费者

@Component
@Slf4j
public class DLLisenter {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private OrderDetailMapper orderDetailMapper;

    @Autowired
    private OrderLogisticsMapper orderLogisticsMapper;

    @Autowired
    private ItemClient itemClient;

    @RabbitListener(queues = MQ.DL_QUEUE)
    @GlobalTransactional
    public void handleOrderCancel(Long orderId) {

        Order order = orderMapper.selectById(orderId);

        // 判断订单是否超时
        if (order.getStatus() == 1) {
            order.setStatus(5);
            order.setUpdateTime(new Date());
            CompletableFuture.runAsync(() -> orderMapper.updateById(order));

            // 回滚库存
            OrderDetail orderDetail = orderDetailMapper.selectByOrderId(orderId);
            Integer num = orderDetail.getNum();
            Long itemId = orderDetail.getItemId();
            CompletableFuture.runAsync(() -> itemClient.addStock(itemId, num));

            // 删除物流记录
            CompletableFuture.runAsync(() -> orderLogisticsMapper.deleteByOrderId(orderId));
            log.info("超时订单处理完毕");
        }

    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

注意

**注意:**RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。

# 延迟交换机

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息DelayExchange插件来实现相同的效果。
官方文档说明:Scheduling Messages with RabbitMQ | RabbitMQ - Blog (opens new window)

# 插件安装

# 1、插件下载

插件下载地址:
GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ (opens new window)
注意:插件版本必须与RabbitMQ主版本保持一致,否则运行时可能会报出:

connection error; protocol method: #method<connection.close>(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0)
1
# 2、Docker 安装

我是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins
1

执行结果:

[
  {
    "CreatedAt": "2024-02-26T19:57:07-08:00",
    "Driver": "local",
    "Labels": null,
    "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
    "Name": "mq-plugins",
    "Options": null,
    "Scope": "local"
  }
]
1
2
3
4
5
6
7
8
9
10
11

可以看到Mountpoint挂载到了/var/lib/docker/volumes/mq-plugins/_data目录下。
所以我们将下载的文件拷贝到这个目录下
recording.gif
执行命令:

docker exec -it rmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
1

执行结果
image.png

# 声明延迟交换机

基于注解方式:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}
1
2
3
4
5
6
7
8

基于@Bean的方式:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // 指定交换机类型和名称
                .delayed() // 设置delay的属性为true
                .durable(true) // 持久化
                .build();
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }
    
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

交换机
image.png

# 发送延迟消息

发送消息时,必须通过x-delay属性设定延迟时间:

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

打印

2024-03-01T10:14:45.440+08:00  INFO 6620 --- [168.95.128:5672] cn.kk.MqPublisherTest                    : 发送消息成功
2024-03-01T10:14:55.480+08:00  INFO 11788 --- [ntContainer#0-1] cn.kk.mq.MqListener                     : 接收到delay.queue的延迟消息:hello
1
2

可以看到10:14:45 -> 10:14:55正好10s

注意

注意:
延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息。

# 取消超时订单问题

介于延迟消息的实现不易于存储大量且延迟时间过长的消息,像是取消30分钟超时未支付订单这类场景,直接创建30分钟的延迟消息往队列里丢显然不合理:

  1. 并发高的情况,容易造成消息堆积,对MQ压力很大
  2. 大多订单都能在30分钟之前完成支付,但却需要在队列中等待30分钟,浪费资源
# 解决方案

yuque_mind (3).jpeg
我们在用户下单后的第10秒、20秒、30秒、45秒、60秒、1分30秒、2分、...30分分别设置延迟消息,如果提前发现订单已经支付,则后续的检测取消即可。这样就可以有效避免对MQ资源的浪费了。

我们可以封装一个特殊的消息模型:

@Data
public class SpinDelayMessage<T> implements Serializable{

    /**
     * 消息体
     */
    private T data;

    /**
     * 延迟时间数组(ms)
     */
    private List<Long> delayTimes;

    public SpinDelayMessage() {
    }

    public SpinDelayMessage(T data, List<Long> delayTimes) {
        this.data = data;
        this.delayTimes = delayTimes;
    }

    public SpinDelayMessage(T data, Long... delayTimes) {
        this.data = data;
        this.delayTimes = new ArrayList<>(Arrays.asList(delayTimes));
    }

    /**
     * 获取下一次延迟时间
     * @return 下一次延迟时间
     */
    public Long removeAndGetNextDelay() {
        return delayTimes.remove(0);
    }

    /**
     * 是否还有下一个延迟时间
     * @return 是否还有下一个延迟时间
     */
    public boolean hasNextDelayTime() {
        return !delayTimes.isEmpty();
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 生产者优化
// 可以根据业务来定制时间间隔,像这样来控制MQ消息存储量,减轻MQ压力
static Long[] delayTimes = new Long[]{
    10000L, 10000L, 10000L, 10000L, 10000L, 10000L, // 10s 6次(1分钟)
    20000L, 20000L, 20000L, // 20s 3次(1分钟)
    60000L, 60000L, 60000L, // 60s 3次(3分钟)
    300000L, // 300s 1次(5分钟)
    600000L, 600000L}; // 600s 2次(10分钟)
@Test
public void test() {
    SpinDelayMessage<String> msg = new SpinDelayMessage<>("hello", delayTimes);
    rabbitTemplate.convertAndSend("delay.direct", "delay", msg,
                                  message -> {
                                      message.getMessageProperties().setDelay(msg.removeNextDelayTime().intValue());
                                      return message;
                                  });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

甚至可以另外创建一个DelayMessagePostProcessor对象来针对延迟消息做处理

public class DelayMessagePostProcessor implements MessagePostProcessor {
    
    private final int delay;

    public DelayMessagePostProcessor(int delay) {
        this.delay = delay;
    }

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setDelay(delay);
        return message;
    }

    @Override
    public Message postProcessMessage(Message message, Correlation correlation) {
        return MessagePostProcessor.super.postProcessMessage(message, correlation);
    }

    @Override
    public Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
        return MessagePostProcessor.super.postProcessMessage(message, correlation, exchange, routingKey);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

然后生产者改成这样

// 可以根据业务来定制时间间隔
static Long[] delayTimes = new Long[]{
    10000L, 10000L, 10000L, 10000L, 10000L, 10000L, // 10s 6次(1分钟)
    20000L, 20000L, 20000L, // 20s 3次(1分钟)
    60000L, 60000L, 60000L, // 60s 3次(3分钟)
    300000L, // 300s 1次(5分钟)
    600000L, 600000L}; // 600s 2次(10分钟)
@Test
public void test() {
    SpinDelayMessage<String> msg = new SpinDelayMessage<>("hello", delayTimes);
    rabbitTemplate.convertAndSend("delay.direct", "delay", msg,
                                  new DelayMessagePostProcessor(msg.removeNextDelayTime().intValue()));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
# 消费者优化
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "delay.queue", durable = "true"),
    exchange = @Exchange(name = "delay.direct", delayed = "true"),
    key = "delay"
))
public void listenDelayMessage(SpinDelayMessage<Long> msg){
    // TODO 业务判断,如订单是否支付
    if (msg.getData().equals(1L)) {
        logger.info("订单已完成支付...");
        return;
    }
    // 判断是否还有剩余时间
    if (msg.hasNextDelayTime()) {
        // 有剩余,重新加入队列
        rabbitTemplate.convertAndSend("delay.direct", "delay", msg,
                    new DelayMessagePostProcessor(msg.getNextDelayTime().intValue()));
        return;
    }
    // TODO 没有剩余时间,取消订单
    logger.info("订单已超时...");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 最佳实践

# 全局常量

随着业务的开发,队列和交换机可能会越来越多,直接写在业务代码中不易于管理,一般会抽取一个常量来统一管理和使用这些队列交换机字符串。

public interface MqConstants {
    String DELAY_EXCHANGE = "trade.delay.topic";
    String DELAY_ORDER_QUEUE = "trade.order.delay.queue";
    String DELAY_ORDER_ROUTING_KEY = "order.query";
}
1
2
3
4
5

# 抽取mq工具

@AllArgsConstructor
public class RabbitMqHelper {

    private final RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange, String routingKey, Object msg) {
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    }

    public void sendMessage(String queue, Object msg) {
        rabbitTemplate.convertAndSend(queue, msg);
    }

    public void sendMessage(String exchange, String routingKey, Object msg, int delay) {
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {
            message.getMessageProperties().setDelay(delay);
            return message;
        });
    }

    public void sendMessage(String exchange, String routingKey, Object msg, BiConsumer<? super CorrelationData.Confirm, ? super Throwable> action) {
        CorrelationData cd = new CorrelationData();
        cd.getFuture().whenComplete(action);
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, cd);
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#rabbitmq#mq
上次更新: 2024/03/02 22:02:56
分布式事务
Kafka

← 分布式事务 Kafka→

最近更新
01
JVM 底层
09-13
02
JVM 理论
09-13
03
JVM 应用
09-13
更多文章>
Theme by Vdoing | Copyright © 2022-2024 kinoko | MIT License | 粤ICP备2024165634号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式