(五)RabbitMQ-进阶 死信队列、延迟队列、防丢失机制

news/2025/4/26 13:24:42/

Lison <dreamlison@163.com>, v1.0.0, 2023.06.23

RabbitMQ-进阶 死信队列、延迟队列、防丢失机制

文章目录

  • RabbitMQ-进阶 死信队列、延迟队列、防丢失机制
    • 死信队列
    • 延迟队列
      • 延迟队列介绍
      • **延迟队列_死信队列_的实现**
      • 延迟队列_插件实现
        • 下载插件
        • RabbitMQ 配置类
        • RabbitMQ 生产者
        • RabbitMQ 消费者
        • 测试
    • RabbitMQ防止消息丢失
      • 消息丢失场景
      • 生产者发送消息没有发送到rabbit交换机
      • 交换机没有发送到队列
      • 交换机、队列、消息没有设置持久化
      • 消费者接收到消息没有执行业务逻辑,导致消息丢失

死信队列

概念

在MQ中,当消息成为死信(Dead message)后,消息中间件可以 将其从当前队列发送到另一个队列中,这个队列就是死信队列。而 在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死 信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。

在这里插入图片描述

消息成为死信的情况

  1. 队列消息长度到达限制
  2. 消费者拒签消息,并且不把消息重新放入原队列
  3. 消息到达存活时间未被消费

代码实现

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig2 {private final String DEAD_EXCHANGE = "dead_exchange";private final String DEAD_QUEUE = "dead_queue";private final String NORMAL_EXCHANGE = "normal_exchange";private final String NORMAL_QUEUE = "normal_queue";// 死信交换机@Bean(DEAD_EXCHANGE)public Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build();}// 死信队列@Bean(DEAD_QUEUE)public Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE).build();}// 死信交换机绑定死信队列@Beanpublic Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange, @Qualifier(DEAD_QUEUE)Queue queue){return BindingBuilder.bind(queue).to(exchange).with("dead_routing").noargs();}// 普通交换机@Bean(NORMAL_EXCHANGE)public Exchange normalExchange(){return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).durable(true).build();}// 普通队列@Bean(NORMAL_QUEUE)public Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机.deadLetterRoutingKey("dead_routing") // 死信队列路由关键字.ttl(10000) // 消息存活10s.maxLength(10) // 队列最大长度为10.build();}// 普通交换机绑定普通队列@Beanpublic Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,@Qualifier(NORMAL_QUEUE)Queue queue){return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();}
}

测试

1、生产者发送消息

@Test
public void testDlx(){// 存活时间过期后变成死信//       rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");// 超过队列长度后变成死信//       for (int i = 0; i < 20; i++) {//           rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");//       }// 消息拒签但不返回原队列后变成死信rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
}

2、

@Component
public class DlxConsumer {@RabbitListener(queues = "normal_queue")public void listenMessage(Message message, Channel channel) throws IOException {// 拒签消息channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);}
}

延迟队列

延迟队列介绍

什么是延时队列?

延时队列即就是放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费

在这里插入图片描述

但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果

在这里插入图片描述

延迟交换机主要帮我们解决什么问题

(1)当我们的业务比较复杂的时候, 需要针对不同的业务消息类型设置不同的过期时间策略, name必然我们也需要为不同的队列消息的过期时间创建很多的Queue的Bean对象, 当业务复杂到一定程度时, 这种方式维护成本过高;

(2)就是队列的先进先出原则导致的问题,当先进入队列的消息的过期时间比后进入消息中的过期时间长的时候,消息是串行被消费的,所以必然是等到先进入队列的消息的过期时间结束, 后进入队列的消息的过期时间才会被监听,然而实际上这个消息早就过期了,这就导致了本来过期时间为3秒的消息,实际上过了13秒才会被处理,这在实际应用场景中肯定是不被允许的

适用场景

(1)商城订单超时未支付,取消订单

(2)使用权限到期前十分钟提醒用户

(3)收益项目,投入后一段时间后产生收益

延迟队列_死信队列_的实现

1、创建SpringBoot订单模块,添加SpringMVC、RabbitMQ、 lombok依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>

2、编写配置文件

spring:rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: 123456virtual-host: /# 日志格式
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'

3、创建队列和交换机

@Configuration
public class RabbitConfig {// 订单交换机和队列private final String ORDER_EXCHANGE = "order_exchange";private final String ORDER_QUEUE = "order_queue";// 过期订单交换机和队列private final String EXPIRE_EXCHANGE = "expire_exchange";private final String EXPIRE_QUEUE = "expire_queue";// 过期订单交换机@Bean(EXPIRE_EXCHANGE)public Exchange deadExchange(){return ExchangeBuilder.topicExchange(EXPIRE_EXCHANGE).durable(true).build();}// 过期订单队列@Bean(EXPIRE_QUEUE)public Queue deadQueue(){return QueueBuilder.durable(EXPIRE_QUEUE).build();}// 将过期订单队列绑定到交换机@Beanpublic Binding bindDeadQueue(@Qualifier(EXPIRE_EXCHANGE) Exchange exchange,@Qualifier(EXPIRE_QUEUE) Queue queue){return BindingBuilder.bind(queue).to(exchange).with("expire_routing").noargs();}// 订单交换机@Bean(ORDER_EXCHANGE)public Exchange normalExchange(){return ExchangeBuilder.topicExchange(ORDER_EXCHANGE).durable(true).build();}// 订单队列@Bean(ORDER_QUEUE)public Queue normalQueue(){return QueueBuilder.durable(ORDER_QUEUE).ttl(10000) // 存活时间为10s,模拟30min.deadLetterExchange(EXPIRE_EXCHANGE) // 绑定死信交换机.deadLetterRoutingKey("expire_routing") //死信交换机的路由关键字.build();}// 将订单队列绑定到交换机@Beanpublic Binding bindNormalQueue(@Qualifier(ORDER_EXCHANGE) Exchange exchange,@Qualifier(ORDER_QUEUE) Queue queue){return BindingBuilder.bind(queue).to(exchange).with("order_routing").noargs();}
}

4、编写下单的控制器方法,下单后向订单交换机发送消息

@Testpublic String placeOrder(String orderId){System.out.println("处理订单数据...");// 将订单id发送到订单队列rabbitTemplate.convertAndSend("order_exchange", "order_routing", orderId);return "下单成功,修改库存";}

5、编写监听死信队列的消费者

// 过期订单消费者
@Component
public class ExpireOrderConsumer {// 监听队列@RabbitListener(queues = "expire_queue")public void listenMessage(String orderId){System.out.println("查询"+orderId+"号订单的状态,如果已支付则无需处理,如果未支付则需要回退库存");}
}

延迟队列_插件实现

在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。

在这里插入图片描述

RabbitMQ虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列
在这里插入图片描述

下载插件

RabbitMQ 实现了一个插件 x-delay-message 来实现延时队列,我们可以从 官网下载到它

https://www.rabbitmq.com/community-plugins.htmlhttps://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

在这里插入图片描述

选择 .ez 格式的文件下载,下载后放置 RabbitMQ 的安装目录下的 plugins 目录下,如我的路径为

docker cp rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez  rabbitmq1:/pluginsdocker exec  rabbitmq1  rabbitmq-plugins enable rabbitmq_delayed_message_exchange

RabbitMQ 配置类

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
@Slf4j
public class RabbitConfig3 {/*** 交换机*/public static final String DELAY_EXCHANGE = "delay_exchange";/*** 队列*/public static final String DELAY_QUEUE = "delay_queue";/*** 路由*/public static final String DELAY_KEY = "delay_key";@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {connectionFactory.setPublisherConfirms(true);connectionFactory.setPublisherReturns(true);RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));return rabbitTemplate;}/*** 直接模式队列1*/@Beanpublic Queue directOneQueue() {return new Queue("cundream");}/*** 延时队列交换机** @return*/@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);}/*** 延时队列** @return*/@Beanpublic Queue delayQueue() {return new Queue(DELAY_QUEUE, true);}/*** 给延时队列绑定交换机** @return*/@Beanpublic Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_KEY).noargs();}
}

RabbitMQ 生产者


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
@Slf4j
public class RabbitMqServiceImpl implements RabbitMqService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendDelayMessage(Object object, long millisecond) {this.rabbitTemplate.convertAndSend("delay_exchange","delay_key",object.toString(),message -> {message.getMessageProperties().setHeader("x-delay", millisecond);return message;});}
}

RabbitMQ 消费者

import cn.hutool.json.JSONUtil;
import com.github.cundream.springbootbuilding.common.rabbitmq.RabbitConst;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @className: com.github.cundream.springbootbuilding.common.rabbitmq.consumer-> ReceiveDealyConsumer* @description:* @author: 李村 * @createDate:*/
@Slf4j
@RabbitListener(queuesToDeclare = @Queue(RabbitConst.DELAY_QUEUE))
@Component
public class ReceiveDealyHandler {@RabbitHandlerpublic void directHandlerManualAck(Object object, Message message, Channel channel) {//  如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉final long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info("直接队列1,手动ACK,接收消息:{}", object.toString());// 通知 MQ 消息已被成功消费,可以ACK了channel.basicAck(deliveryTag, false);} catch (IOException e) {try {// 处理失败,重新压入MQchannel.basicRecover();} catch (IOException e1) {e1.printStackTrace();}}}
}

测试

通过测试,第一条消息在 5s后接收到,第二条消息在 10s后接收到,说明我们的延时队列已经成功

    @RequestMapping(value = "/delayMessage",method = RequestMethod.GET)public void delayMessage() {String message1 = "这是第一条消息";String message2 = "这是第二条消息";rabbitMqService.sendDelayMessage(message1, 5000);rabbitMqService.sendDelayMessage(message2, 10000);}

RabbitMQ防止消息丢失

消息丢失场景

MQ消息丢失场景主要有三个:

  • 消息生产者,发送消息后,rabbitMq服务器没有收到;导致消息丢失
  • rabbitmq收到消息后,没有持久化保存,导致消息丢失
  • 消费者收到消息后,没来得及处理,消费者宕机,导致消息丢失

生产者发送消息没有发送到rabbit交换机

解决方案:消息异步确认机制(confirm机制)

spring:rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: 123456virtual-host: /publisher-confirms: true # 消息异步确认机制(confirm机制)

开启confirm机制后,在生产者每次发送消息,都会调用回调代码;开发人员,需要写回调函数的逻辑,处理发送失败的消息

@Component
@Slf4j
public class RabbitMQConfirmAndReturn implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);}/*** confirm机制只保证消息到达exchange,不保证消息可以路由到正确的queue* @param correlationData 发送的消息的信息(交换机,路由,消息体等)* @param ack true成功,false失败* @param cause 发生错误的信息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// 失败,一般解决方案,是将发送失败消息,存入定时任务队列;尝试重新发送消息;再多次失败,// 就不再发送,转为人工处理if (!ack) {log.error("rabbitmq confirm fail,cause:{}", cause);// ...... 失败处理逻辑}}
}

交换机没有发送到队列

解决方案:Return模式,确保消息从交换机发送到队列。

1、开启return模式

#开启 return 机制
spring:rabbitmq:publisher-returns: true

2、开发回调函数

@Component
public class Sender implements RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {rabbitTemplate.setReturnCallback(this);}//通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("消息主体message: " + message);System.out.println("消息replyCode: " + replyCode);System.out.println("描述: " + replyText);System.out.println("消息使用的交换器exchange: " + exchange);System.out.println("消息使用的路由键routing: " + routingKey);}
}

交换机、队列、消息没有设置持久化

交换机、队列、消息没有持久化,当rabbitmq的服务重启之后,这些信息就会丢失。

交换机持久化
在声明交换机的时候,设置持久化属性

	/*** 构造参数说明:* 参数1:交换机名称* 参数2:durable:true表示持久化,false表示不持久化* 参数3:autoDelete:true自动删除,false不自动删除*/@Beanpublic TopicExchange exchange() {return new TopicExchange("exchangeName", true, false);}

队列持久化
在声明队列的时候,设置持久化属性

    public Queue queue() {/*** @param queueName 队列名称* @param durable 队列持久化,true持久化,false不持久化* @param exclusive 是否排他, true不排他,false排他;此处配置一般false* @param autoDelete 是否自动删除,无生产者,队列自动删除* @param args 队列参数*/return new Queue("queueName", true, false, false, args);}

消息持久化

消息的持久化是默认持久的。无需配置

消费者接收到消息没有执行业务逻辑,导致消息丢失

解决方案:手动确认消息机制
配置文件配置

**spring.rabbitmq.listener.simple.acknowledge-mode=manual**
spring:rabbitmq:host: 127.0.0.1#host: 10.106.10.91port: 5672username: adminpassword: 123456virtual-host: pubpublisher-confirms: true   # 开启发送确认publisher-returns: true  # 开启发送失败回退#开启acklistener:direct:acknowledge-mode: manualsimple:acknowledge-mode: manual #采取手动应答#concurrency: 1 # 指定最小的消费者数量#max-concurrency: 1 #指定最大的消费者数量retry:enabled: true # 是否支持重试
@Component
public class Consumer {@RabbitHandlerpublic void consumeMsg(String msg, Channel channel, Message message) throws IOException {//拿到消息延迟消费try {// .... 消费消息业务逻辑/*** deliveryTag	消息的随机标签信息* multiple	是否批量;true表示一次性的将小于deliveryTag的值进行ack*/channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (InterruptedException e) {e.printStackTrace();/*** deliveryTag	消息的随机标签信息* multiple	是否批量;true表示一次性的将小于deliveryTag的值进行ack* requeue	被拒绝的消息是否重新入队列*/channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}
}

当业务出现意料之外的一场;消息就会重新回到队列中;会分发到其他正常consumer中进行消费


http://www.ppmy.cn/news/978935.html

相关文章

php的设计模式有哪些

1&#xff0c;创建设计模式&#xff08;Creational Patterns&#xff09;(5种)&#xff1a; 用于创建对象时的设计模式。更具体一点&#xff0c;初始化对象流程的设计模式。当程序日益复杂时&#xff0c;需要更加灵活地创建对象&#xff0c;同时减少创建时的依赖。而创建设计模…

项目管理:甘特图制定项目计划,提高项目管理效率

项目实施周期长&#xff0c;工作范围广&#xff0c;不确定因素多&#xff0c;因此项目管理具有巨大的挑战性。 项目经理需要具备专业的知识能力和个人应变能力&#xff0c;以管理整个项目的实施过程&#xff0c;提高项目实施的成功率和管理效率。 现在&#xff0c;随着社会市…

了解Unity编辑器之组件篇Tilemap(五)

Tilemap&#xff1a;用于创建和编辑2D网格地图的工具。Tilemap的主要作用是简化2D游戏中地图的创建、编辑和渲染过程。以下是一些Tilemap的主要用途&#xff1a; 2D地图绘制&#xff1a;Tilemap提供了一个可视化的编辑器界面&#xff0c;可以快速绘制2D地图&#xff0c;例如迷…

命名约定~

1.变量的命名约定 JavaScript 变量名称是区分大小写的&#xff0c;大写和小写字母是不同的。比如&#xff1a; let DogName Scooby-Doo; let dogName Droopy; let DOGNAME Odie; console.log(DogName); // "Scooby-Doo" console.log(dogName); // "Dro…

阿里云函数计算签名认证(iOS实现细节备注)

1、使用第三方库 AFNetworking进行网络请求。 2、阿里云函数计算签名认证文档 3、文档中添加 CanonicalizedFCHeaders 可以不用添加&#xff0c;CanonicalizedResource如何没有设置Path&#xff0c;在末尾加入“/”就可以了。 4、主要还是 hmac-sha256 签名认证&#xff0c;在实…

ether_crc_le()函数解析

函数作用 ether_crc_le()函数是一个用于计算以太网帧的循环冗余校验&#xff08;CRC&#xff09;的函数。 下面是ether_crc_le()函数的源码解析&#xff1a; c #include <stdint.h>uint32_t ether_crc_le(uint8_t *data, uint32_t size) {uint32_t crc 0xffffffff;whi…

从小白到大神之路之学习运维第67天-------Tomcat应用服务 WEB服务

第三阶段基础 时 间&#xff1a;2023年7月25日 参加人&#xff1a;全班人员 内 容&#xff1a; Tomcat应用服务 WEB服务 目录 一、中间件产品介绍 二、Tomcat软件简介 三、Tomcat应用场景 四、安装配置Tomcat 五、配置目录及文件说明 &#xff08;一&#xff09;to…

(八九)如何与InfluxDB交互InfluxDB HTTP API

以下内容来自 尚硅谷&#xff0c;写这一系列的文章&#xff0c;主要是为了方便后续自己的查看&#xff0c;不用带着个PDF找来找去的&#xff0c;太麻烦&#xff01; 第 8 章 前言&#xff1a;如何与InfluxDB交互 1、InfluxDB启动后&#xff0c;会向外提供一套HTTP API。外部程…