SpringBoot 项目中使用 spring-boot-starter-amqp 依赖实现 RabbitMQ

news/2024/12/5 5:07:00/

文章目录

    • 前言
    • 1、application.yml
    • 2、RabbitMqConfig
    • 3、MqMessage
    • 4、MqMessageItem
    • 5、DirectMode
    • 6、StateConsumer:消费者
    • 7、InfoConsumer:消费者

前言

本文是工作之余的随手记,记录在工作期间使用 RabbitMQ 的笔记。

1、application.yml

  • 使用 use 属性,方便随时打开和关闭使用 MQ ,并且可以做到细化控制。
spring:rabbitmq:use: truehost: 10.100.10.100port: 5672username: wenpassword: 123456exchangeSubPush: 'exWen'queueSubPush: 'ha.queue.SubPush'routeSubPush: '1000'exchangeState: sync.ex.StatequeueState: ha.q.ServerqueueStateSync: ha.q.StateServerrouteState: stateexchangeOnlineMonitor: 'sync.ex.State'routeOnlineMonitor: 'state'queueOnlineMonitor: 'ha.q.Online'
  • pom.xml 文件中使用的是 SpringBoot 项目,使用 spring-boot-starter-amqp 依赖。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.wen</groupId><artifactId>springboot-mybatis</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><parent><artifactId>spring-boot-starter-parent</artifactId><groupId>org.springframework.boot</groupId><version>2.5.3</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-tomcat</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.18</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.1</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version></dependency></dependencies>
</project>

2、RabbitMqConfig

  • 配置类,将可配置的参数使用 @Value 做好配置,与 application.yml 相互对应。
package com.wen.mq;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;@Slf4j
@Configuration
@Data
public class RabbitMqConfig {@Value("${spring.rabbitmq.use:true}")private boolean use;@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtual-host:}")private String virtualHost;@Value("${spring.rabbitmq.exchangeState}")private String exchangeState;@Value("${spring.rabbitmq.queueState}")private String queueState;@Value("${spring.rabbitmq.routeState}")private String routeState;@Value(("${spring.rabbitmq.queueStateSync}"))private String queueStateSync;@Value("${spring.rabbitmq.exchangeOnlineInfo}")private String exchangeOnlineInfo;@Value("${spring.rabbitmq.routeOnlineInfo}")private String routeOnlineInfo;@Value("${spring.rabbitmq.queueOnlineInfo}")private String queueOnlineInfo;@PostConstructprivate void init() {}
}

3、MqMessage

  • MQ 消息实体类
package com.wen.mq;import lombok.Data;@Data
public class MqMessage<T> {private String msgType;private String msgOrigin;private long time;private T data;}

4、MqMessageItem

  • MQ 消息实体类
package com.wen.mq;import lombok.Data;@Data
public class MqMessageItem {private long userId;private String userName;private int userAge;private String userSex;private String userPhone;private String op;}

5、DirectMode

  • 配置中心:使用 SimpleMessageListenerContainer 进行配置。
  • 新加一个消费者队列就要在这里进行配置。
package com.wen.mq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class DirectMode {@AutowiredRabbitMqConfig rabbitMqConfig;@Autowiredprivate CachingConnectionFactory connectionFactory;@Autowiredprivate StateConsumer stateConsumer;@Autowiredprivate InfoConsumer infoConsumer;@Beanpublic SimpleMessageListenerContainer initMQ() {if (!rabbitMqConfig.isUse()) {return null;}log.info("begin!");SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(1);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认// 设置一个队列container.setQueueNames(rabbitMqConfig.getQueueStateSync());//如果同时设置多个队列如下: 前提是队列都是必须已经创建存在的//container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3”);//另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues//container.setQueues(new Queue("TestDirectQueue",true));//container.addQueues(new Queue("TestDirectQueue2",true));//container.addQueues(new Queue("TestDirectQueue3",true));container.setMessageListener(stateConsumer);log.info("end");return container;}@Beanpublic SimpleMessageListenerContainer contactSyncContainer() {if (!rabbitMqConfig.isUse()) {return null;}log.info("contact begin");SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(1);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息//设置一个队列container.setQueueNames(rabbitMqConfig.getQueueOnlineInfo());container.setMessageListener(infoConsumer);log.info("contact end");return container;}@Beanpublic Queue queueState() {if (!rabbitMqConfig.isUse()) {return null;}return new Queue(rabbitMqConfig.getQueueState());}@Beanpublic Queue queueStateSync() {if (!rabbitMqConfig.isUse()) {return null;}return new Queue(rabbitMqConfig.getQueueStateSync());}@BeanDirectExchange exchangeState() {if (!rabbitMqConfig.isUse()) {return null;}return new DirectExchange(rabbitMqConfig.getExchangeState());}@BeanBinding bindingState() {if (!rabbitMqConfig.isUse()) {return null;}return BindingBuilder.bind(queueState()).to(exchangeState()).with(rabbitMqConfig.getRouteState());}@BeanBinding bindingStateSync() {if (!rabbitMqConfig.isUse()) {return null;}return BindingBuilder.bind(queueStateSync()).to(exchangeState()).with(rabbitMqConfig.getRouteState());}// 新加一个消费者@Beanpublic Queue queueOnlineMonitor() {if (!rabbitMqConfig.isUse()) {return null;}return new Queue(rabbitMqConfig.getQueueOnlineInfo());}@BeanDirectExchange exchangeOnlineMonitor() {if (!rabbitMqConfig.isUse()) {return null;}return new DirectExchange(rabbitMqConfig.getExchangeOnlineInfo());}@BeanBinding bindingExchangeOnlineMonitor() {if (!rabbitMqConfig.isUse()) {return null;}return BindingBuilder.bind(queueOnlineMonitor()).to(exchangeOnlineMonitor()).with(rabbitMqConfig.getRouteOnlineInfo());}
}

6、StateConsumer:消费者

  • 实现 ChannelAwareMessageListener 接口,可以在这里面做相应的操作,例如存缓存,存库等。
package com.wen.mq;import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;@Slf4j
@Component
public class StateConsumer implements ChannelAwareMessageListener {@AutowiredRabbitMqConfig rabbitMqConfig;@Overridepublic void onMessage(Message message, Channel channel) throws Exception {String queueName = message.getMessageProperties().getConsumerQueue();long deliveryTag = message.getMessageProperties().getDeliveryTag();if (!rabbitMqConfig.getQueueStateSync().equals(queueName)) {String bodyStr = new String(message.getBody(), StandardCharsets.UTF_8);try {MqMessage<List<MqMessageItem>> mqMessage = JSON.parseObject(bodyStr, new TypeReference<MqMessage<List<MqMessageItem>>>() {});// 这里可以对消息做其他处理,例如存储到缓存中List<MqMessageItem> items = mqMessage.getData();if (CollectionUtil.isNotEmpty(items)) {applyToRedis(mqMessage);}log.info("consume mq msg ok, queue:{}, deliveryTag:{}, msg:{}", queueName, deliveryTag, mqMessage);channel.basicAck(deliveryTag, false);} catch (JSONException e) {log.error("parse mq msg exception, queue:{}, deliveryTag:{}", queueName, deliveryTag, e);channel.basicReject(deliveryTag, false);} catch (Exception e) {log.error("consume mq msg exception, queue:{}, deliveryTag:{}", queueName, deliveryTag, e);channel.basicReject(deliveryTag, true); //为true会重新放回队列}}}public static final String MQ_STATE_OP_REMOVE_STATE = "REMOVE_STATE";public static final String MQ_STATE_OP_CHANGE_STATE = "CHANGE_STATE";private void applyToRedis(MqMessage<List<MqMessageItem>> mqMessage) {List<MqMessageItem> data = mqMessage.getData();Map<String, List<MqMessageItem>> itemGroupByOp = data.stream().collect(Collectors.groupingBy(item -> item.getOp()));List<MqMessageItem> stateToRemove = itemGroupByOp.get(MQ_STATE_OP_REMOVE_STATE);List<MqMessageItem> stateToChange = itemGroupByOp.get(MQ_STATE_OP_CHANGE_STATE);if (CollectionUtil.isNotEmpty(stateToRemove)) {Map<Long, Set<String>> map = new HashMap<>();for (MqMessageItem item : stateToRemove) {map.computeIfAbsent(item.getUserId(), u -> new HashSet<>()).add(String.valueOf(item.getUserAge()));}// cacheService.removeUserState(map);}if (CollectionUtil.isNotEmpty(stateToChange)) {List<MqMessageItem> list = stateToChange.stream().map(u -> {MqMessageItem dto = new MqMessageItem();dto.setUserId(u.getUserId());dto.setUserAge(u.getUserAge());dto.setUserName(u.getUserName());dto.setUserSex(u.getUserSex());dto.setUserPhone(u.getUserPhone());return dto;}).collect(Collectors.toList());// cacheService.saveUserState(list);}}
}

7、InfoConsumer:消费者

  • 实现 ChannelAwareMessageListener 接口,可以在这里面做相应的操作,例如存缓存,存库等。
package com.wen.mq;import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class InfoConsumer implements ChannelAwareMessageListener {@AutowiredRabbitMqConfig rabbitMqConfig;@Overridepublic void onMessage(Message message, Channel channel) throws Exception {String queueName = message.getMessageProperties().getConsumerQueue();log.info("queueName: {}", queueName);long deliveryTag = message.getMessageProperties().getDeliveryTag();try {byte[] body = message.getBody();String content = new String(body);MqMessage msg = JSONObject.parseObject(content, MqMessage.class);if (rabbitMqConfig.getQueueOnlineInfo().equals(queueName)) {// 订阅到的消息就是变更的消息// 这里可使用service对消息进行消费,返回一个booleanlog.info("用户监控数据写入失败!数据:{}", msg);}log.info("consume mq msg ok, queue:{}, deliveryTag:{}, msg:{}", queueName, deliveryTag, msg);channel.basicAck(deliveryTag, false);} catch (JSONException e) {log.error("parse mq msg exception, queue:{}, deliveryTag:{}", queueName, deliveryTag, e);channel.basicReject(deliveryTag, false); //为true会重新放回队列} catch (Exception e) {log.error("consume mq msg exception, queue:{}, deliveryTag:{}", queueName, deliveryTag, e);channel.basicReject(deliveryTag, true); //为true会重新放回队列}}
}

http://www.ppmy.cn/news/1551323.html

相关文章

容器第二天(day039)

1.jq处理json格式数据 使用场景&#xff1a;docker inspect出来的结果是json格式数据 安装&#xff1a;yum install -y jq 用法&#xff1a;docker inspect nginx:alpine |jq .[].GraphDriver.Data.WorkDir 遇到[]&#xff0c;则.[]。 2.容器管理 运行起来的镜像可以成为容…

【linux】重定向

linux重定向 从定义上来看&#xff0c;Linux重定向就是指修改原来默认的一些东西&#xff0c;对原来系统命令的默认执行方式进行改变&#xff0c;比如说简单的我不想看到在显示器的输出而是希望输出到某一文件中就可以通过Linux重定向来进行这项工作。在进一步探索重定向之前&…

Flutter 1.1:下载Flutter环境

1、在AS中下载Flutter插件 在setting的Plugins中下载Flutter&#xff0c;如图所示&#xff0c;可以直接进行搜索查找 2、下载flutter的sdk源代码 flutter中文文档学习 通过Git下载SDK源代码 git clone -b stable https://github.com/flutter/flutter.git3、配置系统变量 3…

遗传算法与深度学习实战(25)——使用Keras构建卷积神经网络

遗传算法与深度学习实战&#xff08;25&#xff09;——使用Keras构建卷积神经网络 0. 前言1. 卷积神经网络基本概念1.1 卷积1.2 步幅1.3 填充1.4 激活函数1.5 池化 2. 使用 Keras 构建卷积神经网络3. CNN 层的问题4. 模型泛化小结系列链接 0. 前言 卷积神经网络 (Convolution…

springboot343大学生选修选课系统的设计与实现(论文+源码)_kaic

毕 业 设 计&#xff08;论 文&#xff09; 大学生选修选课系统设计与实现 摘 要 传统办法管理信息首先需要花费的时间比较多&#xff0c;其次数据出错率比较高&#xff0c;而且对错误的数据进行更改也比较困难&#xff0c;最后&#xff0c;检索数据费事费力。因此&#xff0c…

如何给GitHub的开源项目贡献PR

&#x1f3af;导读&#xff1a;本文详细介绍了如何向开源项目“代码随想录”贡献自己的题解。首先&#xff0c;需要Fork原项目的仓库至个人GitHub账户&#xff0c;然后解决克隆仓库时可能遇到的SSH密钥问题。接着&#xff0c;按照标准流程对本地仓库进行代码或文档的修改&#…

NAT拓展

NAT ALG&#xff08;NAT应用级网&#xff09; 为某些应用层协议&#xff0c;因为其报文内容可能携带IP相关信息&#xff0c;而普通NAT转化无法将这些IP转化&#xff0c;从而导致协议无法正常运行 例如FTP&#xff0c;DHCP&#xff0c;RSTP&#xff0c;ICMP&#xff0c;IPSEC…

AJAX 实时搜索

AJAX 实时搜索 AJAX&#xff08;Asynchronous JavaScript and XML&#xff09;实时搜索是一种无需刷新整个网页就能从服务器获取数据并在网页上展示的技术。这种技术极大地提升了用户体验&#xff0c;尤其是在搜索引擎、在线购物网站、社交媒体平台等应用中。本文将详细介绍AJ…