(四)RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

news/2024/2/28 1:52:13

Lison <dreamlison@163.com>, v1.0.0, 2023.06.23

RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

文章目录

  • RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列
    • 消费端限流
    • 利用限流实现不公平分发
    • 消息存活时间
    • 优先级队列

消费端限流

在这里插入图片描述

之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

1、生产者批量发送消息

@Test
public void testSendBatch() {// 发送十条消息for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);}
}

2、消费端配置限流机制

spring:rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: 1233456virtual-host: /listener:simple:# 限流机制必须开启手动签收acknowledge-mode: manual# 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。prefetch: 5

3、消费者监听队列

@Component
public class QosConsumer{@RabbitListener(queues = "my_queue")public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {// 1.获取消息System.out.println(new String(message.getBody()));// 2.模拟业务处理Thread.sleep(3000);// 3.签收消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);}
}

利用限流实现不公平分发

在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处 理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以 采用不公平分发,即谁处理的快,谁处理的消息多

1、生产者批量发送消息

@Test
public void testSendBatch() {// 发送十条消息for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);}
}

端配置不公平分发

spring:rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: 1233456virtual-host: /listener:simple:# 限流机制必须开启手动签收acknowledge-mode: manual# 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发prefetch: 1

编写两个消费者

@Component
public class UnfairConsumer {// 消费者1@RabbitListener(queues = "my_queue")public void listenMessage1(Message message, Channel channel) throws Exception{//1.获取消息System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));//2. 处理业务逻辑Thread.sleep(500); // 消费者1处理快//3. 手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}// 消费者2@RabbitListener(queues = "my_queue")public void listenMessage2(Message message, Channel channel) throws Exception{//1.获取消息System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));//2. 处理业务逻辑Thread.sleep(3000);// 消费者2处理慢//3. 手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}
}

消息存活时间

RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL), 当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ 可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。

设置队列所有消息存活时间

1、在创建队列时设置其存活时间:

@Configuration
public class RabbitConfig2 {private final String EXCHANGE_NAME="my_topic_exchange2";private final String QUEUE_NAME="my_queue2";// 1.创建交换机@Bean("bootExchange2")public Exchange getExchange2(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}// 2.创建队列@Bean("bootQueue2")public Queue getMessageQueue2(){return QueueBuilder.durable(QUEUE_NAME).ttl(10000) //队列的每条消息存活10s.build();}// 3.将队列绑定到交换机@Beanpublic Binding bindMessageQueue2(@Qualifier("bootExchange2") Exchange exchange,@Qualifier("bootQueue2") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();}
}

2、生产者批量生产消息,测试存活时间

@Test
public void testSendBatch2() throws InterruptedException {// 发送十条消息for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("my_topic_exchange2", "my_routing", "send message..."+i);}
}

设置单条消息存活时间

@Test
public void testSendMessage() {//设置消息属性MessageProperties messageProperties = new MessageProperties();//设置存活时间messageProperties.setExpiration("10000");// 创建消息对象Message message = new Message("send message...".getBytes(StandardCharsets.UTF_8), messageProperties);// 发送消息rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
}

注意:

1 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。

2 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。

@Test
public void testSendMessage2() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
// 1.创建消息属性
MessageProperties messageProperties = new MessageProperties();
// 2.设置存活时间
messageProperties.setExpiration(“10000”);
// 3.创建消息对象
Message message = new Message((“send message…” +i).getBytes(),messageProperties);
// 4.发送消息
rabbitTemplate.convertAndSend(“my_topic_exchange”, “my_routing”, message);
} else {
rabbitTemplate.convertAndSend(“my_topic_exchange”, “my_routing”, “sendmessage…” + i);
}
}
}
在以上案例中,i=5的消息才有过期时间,10s后消息并没有 马上被移除,但该消息已经不会被消费了,当它到达队列顶 端时会被移除。

优先级队列

假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。

1、创建队列和交换机

@Configuration
public class RabbitConfig3 {private final String EXCHANGE_NAME="priority_exchange";private final String QUEUE_NAME="priority_queue";// 1.创建交换机@Bean(EXCHANGE_NAME)public Exchange priorityExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}// 2.创建队列@Bean(QUEUE_NAME)public Queue priorityQueue(){return QueueBuilder.durable(QUEUE_NAME)//设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源.maxPriority(10).build();}// 3.将队列绑定到交换机@Beanpublic Binding bindPriority(@Qualifier(EXCHANGE_NAME) Exchange exchange, @Qualifier(QUEUE_NAME) Queue queue){return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();}
}

2、编写生产者

@Test
public void testPriority() {for (int i = 0; i < 10; i++) {if (i == 5) {// i为5时消息的优先级较高MessageProperties messageProperties = new MessageProperties();messageProperties.setPriority(9);Message message = new Message(("send message..." +i).getBytes(StandardCharsets.UTF_8), messageProperties);rabbitTemplate.convertAndSend("priority_exchange", "my_routing", message);} else {rabbitTemplate.convertAndSend("priority_exchange", "my_routing", "send message..." + i);}}
}

3、编写消费者

@Component
public class PriorityConsumer {@RabbitListener(queues = "priority_queue")public void listenMessage(Message message, Channel channel) throws Exception{//获取消息System.out.println(new String(message.getBody()));//手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}
}

http://www.ppmy.cn/news/980907.html

相关文章

C++笔记之对指针类型的变量进行+1操作

C笔记之对指针类型的变量进行1操作 在C中&#xff0c;对指针类型的变量进行"1"操作会根据指针的数据类型而有所不同。这涉及到指针的算术运算&#xff0c;C中的指针算术运算是根据指针所指向的数据类型的大小来进行的。 code review! 文章目录 C笔记之对指针类型的…

编码类型 ASCII URLcode编码 Unicode编码 utf编码理解

编码类型 ASCII URLcode编码 Unicode编码 utf编码理解 bin是二进制 oct是八进制 hex是16进制 Ord()检测ASCII码&#xff0c;python3也可查中文 HTML实体编码能被html页面解析&#xff0c;使用ord&#xff08;&#xff09;对单个字符查看转换后结果&#xff0c;字母就是ASCII…

SpringBoot开发小技巧使用(DEBUG、启动图标修改、Lombok、devtools、Spring Initializr)

目录 1. 通过DEBUG查看自动配置的组件2. springboot启动图标修改3. Lombok4. devtools5. 通过IDEA的Spring Initializr快速创建新项目 1. 通过DEBUG查看自动配置的组件 在resources/application.properties中添加如下&#xff0c;开启DEBUG功能&#xff1a; debugtrue然后启动…

直播带货app开发开发流程分析

随着小视频管理体系愈来愈变成人们的生活中的一部分&#xff0c;也随之短视频卖货逐步形成岗位内主流的转现方式&#xff0c;将短视频平台生产制造变成短视频带货体系计划愈来愈多&#xff0c;那样&#xff0c;把小视频管理体系开发设计变成短视频带货体系必须两步&#xff1f;…

[SQL挖掘机] - 视图相关操作

创建视图: create view view_name as select column1, column2, ... from table_name where condition;以上语句创建了一个名为view_name的视图&#xff0c;它基于table_name表格&#xff0c;并选择了列column1、column2等作为结果集。可以使用where子句来指定条件。 注意: 视…

【LeetCode】78.子集

题目 给你一个整数数组 nums &#xff0c;数组中的元素 互不相同 。返回该数组所有可能的子集&#xff08;幂集&#xff09;。 解集 不能 包含重复的子集。你可以按 任意顺序 返回解集。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3] 输出&#xff1a;[[],[1],[2],[1…

Java 两台服务器间使用FTP进行文件传输

背景&#xff1a;需要把服务器A中的文件拷贝至服务器B中&#xff0c;要求使用FTP进行传输&#xff0c;当文件传输未完成时文件是tmp格式的&#xff0c;传输完毕后显示为原格式&#xff08;此处是grib2&#xff09;。 package org.example;import org.apache.commons.io.FileUt…

嵌入式:QT Day2

一、继续完善登录框&#xff0c;当登陆成功时&#xff0c;关闭登陆页面&#xff0c;跳转到新的界面中 源码&#xff1a; widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QDebug> //用于打印输出 #include <QIcon> …

iOS transform rotate总结

研究了一下transform的旋转设置&#xff0c;调了半天还以为是旋转写错了&#xff0c;发现是两个不同的view对象写错了&#xff0c;不管怎么说&#xff0c;还是记录一下旋转相关的操作吧。 参数都是弧度。 以一个图片来举例。 let img UIImageView.init() img.image UIImage…

Cesium态势标绘专题-集结地(标绘+编辑)

标绘专题介绍:态势标绘专题介绍_总要学点什么的博客-CSDN博客 入口文件:Cesium态势标绘专题-入口_总要学点什么的博客-CSDN博客 辅助文件:Cesium态势标绘专题-辅助文件_总要学点什么的博客-CSDN博客 本专题没有废话,只有代码,代码中涉及到的引入文件方法,从上面三个链…

Java的集合类:List、Set、Map

在 Java 中&#xff0c;集合类是一组有序或无序的数据元素的集合。Java 集合类可用于存储和操作各种数据类型的元素&#xff0c;如整数、字符串、对象等。集合类是动态的&#xff0c;可以在运行时根据需要调整其大小。 Java 集合类可以分为三类&#xff1a; List - 有序集合&…

如何判断某个视频是深度伪造的?

目录 一、前言 二、仔细检查面部动作 三、声音可以提供线索 四、观察视频中人物的身体姿势 五、小心无意义的词语 深造伪造危险吗&#xff1f; 一、前言 制作深度伪造视频就像在Word文档中编辑文本一样简单。换句话说&#xff0c;您可以拍下任何人的视频&#xff0c;让他…

Docker运行MySQL5.7

步骤如下&#xff1a; 1.获取镜像&#xff1a; docker pull mysql:5.7 2.创建挂载目录&#xff1a; mkdir /home/mydata/data mkdir /home/mydata/log mkdir /home/mydata/conf 3.先启动docker把配置文件拷贝出来&#xff1a; docker run -it --name temp mysql:5.7 /bi…

低代码和零代码有哪些区别?

低代码开发的概念 低代码开发是一种新兴的软件开发方法&#xff0c;其核心是通过使用图形用户界面和可视化建模工具&#xff0c;来减少编写代码的工作量和技能要求。低代码开发平台通常提供了丰富的预定义组件和模板&#xff0c;可以帮助开发人员快速构建应用程序。开发人员只…

性能测试之并发用户数的估计

在计算并发用户数之前&#xff0c;需要先了解2个概念。 并发用户&#xff1a;指的是现实系统中同时操作业务的用户&#xff0c;在性能测试工具中一般称为虚拟用户。并发用户这些用户的最大特征是和服务器产生了交互&#xff0c;这种交互既可以是单向的传输数据&#xff0c;也可…

cartographer学习之调参小结

cartographer使用过程中也涉及到了很多的参数调整&#xff0c;不同的参数对算法的影响是不同的。这里简单记录一下一些常调的参数以及一些比较重要的参数。 1、常调的参数 TRAJECTORY_BUILDER_2D.min_range 0.3 TRAJECTORY_BUILDER_2D.max_range 100 TRAJECTORY_BUILDER_2D…

[leetcode]2365. Task Scheduler II

链接&#xff1a;力扣 给你一个下标从 0 开始的正整数数组 tasks &#xff0c;表示需要 按顺序 完成的任务&#xff0c;其中 tasks[i] 表示第 i 件任务的 类型 。 同时给你一个正整数 space &#xff0c;表示一个任务完成 后 &#xff0c;另一个 相同 类型任务完成前需要间隔…

Java封装继承多态

封装 1.将属性进行私有化private【不能直接修改属性】 2.提供一个公共的set方法&#xff0c;用于对属性判断并复制 public void setXxx(){//权限判断return xx; } 3.提供一个公共的get()方法&#xff0c;用于获取属性的值 public XX getXxx(){//权限判断return xx; } 比如…

go学习 3、基础数据类型

3、基础数据类型 基础数据类型&#xff1a;数字、字符串、布尔型复合类型&#xff1a;数组、结构体引用类型&#xff1a;指针、切片、字典、函数、通道接口类型 3.1 整型 有符号、无符号 int8/int16/int32/int64 uint8/uint16/uint32/units 64 Unicode字符rune类型是和int32…

k8s使用helm部署Harbor镜像仓库并启用SSL

1、部署nfs存储工具 参照&#xff1a;https://zhaoll.blog.csdn.net/article/details/128155767 2、部署helm 有多种安装方式&#xff0c;根据自己的k8s版本选择合适的helm版本 参考&#xff1a;https://blog.csdn.net/qq_30614345/article/details/131669319 3、部署Harbo…
最新文章