.Net Core `RabbitMQ`封装

news/2024/4/19 2:10:37

分布式事件总线是一种在分布式系统中提供事件通知、订阅和发布机制的技术。它允许多个组件或微服务之间的协作和通信,而无需直接耦合或了解彼此的实现细节。通过事件总线,组件或微服务可以通过发布或订阅事件来实现异步通信。

例如,当一个组件完成了某项任务并生成了一个事件,它可以通过事件总线发布该事件。其他相关组件可以通过订阅该事件来接收通知,并做出相应的反应。这样,组件之间的耦合就被减轻了,同时也提高了系统的可维护性和可扩展性。

然后了解一下RabbitMQ

RabbitMQ是一种开源的消息代理和队列管理系统,用于在分布式系统中进行异步通信。它的主要功能是接收和分发消息,并且支持多种协议,包括AMQP,STOMP,MQTT等。RabbitMQ通过一个中间层,可以把消息发送者与消息接收者隔离开来,因此消息发送者和消息接收者并不需要在同一时刻在线,并且也不需要互相知道对方的地址。

  1. RabbitMQ的主要功能包括:
    1. 消息存储:RabbitMQ可以将消息存储在内存或硬盘上,以保证消息的完整性。
    2. 消息路由:RabbitMQ支持消息的路由功能,可以将消息从生产者发送到消费者。
    3. 消息投递:RabbitMQ提供了多种消息投递策略,包括简单模式、工作队列、发布/订阅模式等。
    4. 可靠性:RabbitMQ保证消息的可靠性,即消息不会丢失、不重复、按顺序投递。
    5. 可扩展性:RabbitMQ支持水平扩展,可以通过增加节点来扩展系统的处理能力。

本文将讲解使用RabbitMQ实现分布式事件

实现我们创建一个EventsBus.Contract的类库项目,用于提供基本接口,以支持其他实现

在项目中添加以下依赖引用,并且记得添加EventsBus.Contract项目引用

<ItemGroup><PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" /><PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" /><PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="7.0.0" /><PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
</ItemGroup>

创建项目完成以后分别创建EventsBusOptions.cs,IEventsBusHandle.cs,RabbitMQEventsManage.cs,ILoadEventBus.cs ,提供我们的分布式事件基本接口定义

EventsBusOptions.cs

namespace EventsBus.Contract;public class EventsBusOptions
{/// <summary>/// 接收时异常事件/// </summary>public static Action<IServiceProvider, Exception,byte[]>? ReceiveExceptionEvent;
}

IEventsBusHandle.cs

namespace EventsBus.Contract;public interface IEventsBusHandle<in TEto> where TEto : class
{Task HandleAsync(TEto eventData);
}

ILoadEventBus.cs

namespace EventsBus.Contract;public interface ILoadEventBus
{/// <summary>/// 发布事件/// </summary>/// <param name="eto"></param>/// <typeparam name="TEto"></typeparam>/// <returns></returns>Task PushAsync<TEto>(TEto eto) where TEto : class;
}

EventsBusAttribute.cs:用于Eto(Eto 是我们按照约定使用的Event Transfer Objects(事件传输对象)的后缀. s虽然这不是必需的,但我们发现识别这样的事件类很有用(就像应用层上的DTO 一样))的名称,对应到RabbitMQ的通道

namespace EventsBus.RabbitMQ;[AttributeUsage(AttributeTargets.Class)]
public class EventsBusAttribute : Attribute
{public readonly string Name;public EventsBusAttribute(string name){Name = name;}
}

然后可以创建我们的RabbitMQ实现了,创建EventsBus.RabbitMQ类库项目,用于编写EventsBus.ContractRabbitMQ实现

创建项目完成以后分别创建Extensions\EventsBusRabbitMQExtensions.cs,Options\RabbitMQOptions.cs,EventsBusAttribute.cs,,RabbitMQFactory.cs,RabbitMQLoadEventBus.cs

Extensions\EventsBusRabbitMQExtensions.cs:提供我们RabbitMQ扩展方法让使用者更轻松的注入,命名空间使用Microsoft.Extensions.DependencyInjection,这样就在注入的时候减少过度使用命名空间了

using EventsBus.Contract;
using EventsBus.RabbitMQ;
using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Configuration;namespace Microsoft.Extensions.DependencyInjection;public static class EventsBusRabbitMQExtensions
{public static IServiceCollection AddEventsBusRabbitMQ(this IServiceCollection services,IConfiguration configuration){services.AddSingleton<RabbitMQFactory>();services.AddSingleton(typeof(RabbitMQEventsManage<>));services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));services.AddSingleton<ILoadEventBus, RabbitMQLoadEventBus>();return services;}
}

Options\RabbitMQOptions.cs:提供基本的Options 读取配置文件中并且注入,services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));的方法是读取IConfiguration的名称为RabbitMQOptions的配置东西,映射到Options中,具体使用往下看。

using RabbitMQ.Client;namespace EventsBus.RabbitMQ.Options;public class RabbitMQOptions
{/// <summary>/// 要连接的端口。 <see cref="AmqpTcpEndpoint.UseDefaultPort"/>/// 指示应使用的协议的缺省值。/// </summary>public int Port { get; set; } = AmqpTcpEndpoint.UseDefaultPort;/// <summary>/// 地址/// </summary>public string HostName { get; set; }/// <summary>/// 账号/// </summary>public string UserName { get; set; }/// <summary>/// 密码/// </summary>public string Password { get; set; }
}

RabbitMQEventsManage.cs:用于管理RabbitMQ的数据接收,并且将数据传输到指定的事件处理程序

using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;namespace EventsBus.RabbitMQ;public class RabbitMQEventsManage<TEto> where TEto : class
{private readonly IServiceProvider _serviceProvider;private readonly RabbitMQFactory _rabbitMqFactory;public RabbitMQEventsManage(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory){_serviceProvider = serviceProvider;_rabbitMqFactory = rabbitMqFactory;_ = Task.Run(Start);}private void Start(){var channel = _rabbitMqFactory.CreateRabbitMQ();var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();var name = eventBus?.Name ?? typeof(TEto).Name;channel.QueueDeclare(name, false, false, false, null);var consumer = new EventingBasicConsumer(channel); //消费者channel.BasicConsume(name, true, consumer); //消费消息consumer.Received += async (model, ea) =>{var bytes = ea.Body.ToArray();try{// 这样就可以实现多个订阅var events = _serviceProvider.GetServices<IEventsBusHandle<TEto>>();foreach (var handle in events){await handle?.HandleAsync(JsonSerializer.Deserialize<TEto>(bytes));}}catch (Exception e){EventsBusOptions.ReceiveExceptionEvent?.Invoke(_serviceProvider, e, bytes);}};}
}

RabbitMQFactory.cs:提供RabbitMQ链接工厂,在这里你可以自己去定义和管理RabbitMQ工厂

using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;namespace EventsBus.RabbitMQ;public class RabbitMQFactory : IDisposable
{private readonly RabbitMQOptions _options;private readonly ConnectionFactory _factory;private IConnection? _connection;public RabbitMQFactory(IOptions<RabbitMQOptions> options){_options = options?.Value;// 将Options中的参数添加到ConnectionFactory_factory = new ConnectionFactory{HostName = _options.HostName,UserName = _options.UserName,Password = _options.Password,Port = _options.Port};}public IModel CreateRabbitMQ(){// 当第一次创建RabbitMQ的时候进行链接_connection ??= _factory.CreateConnection();return _connection.CreateModel();}public void Dispose(){_connection?.Dispose();}
}

RabbitMQLoadEventBus.cs:用于实现ILoadEventBus.cs通过ILoadEventBus发布事件RabbitMQLoadEventBus.cs是RabbitMQ的实现

using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;namespace EventsBus.RabbitMQ;public class RabbitMQLoadEventBus : ILoadEventBus
{private readonly IServiceProvider _serviceProvider;private readonly RabbitMQFactory _rabbitMqFactory;public RabbitMQLoadEventBus(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory){_serviceProvider = serviceProvider;_rabbitMqFactory = rabbitMqFactory;}public async Task PushAsync<TEto>(TEto eto) where TEto : class{//创建一个通道//这里Rabbit的玩法就是一个通道channel下包含多个队列Queueusing var channel = _rabbitMqFactory.CreateRabbitMQ();// 获取Eto中的EventsBusAttribute特性,获取名称,如果没有默认使用类名称var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();var name = eventBus?.Name ?? typeof(TEto).Name;// 使用获取的名称创建一个通道channel.QueueDeclare(name, false, false, false, null);var properties = channel.CreateBasicProperties();properties.DeliveryMode = 1;// 将数据序列号,然后发布channel.BasicPublish("", name, false, properties, JsonSerializer.SerializeToUtf8Bytes(eto)); //生产消息// 让其注入启动管理服务,RabbitMQEventsManage需要手动激活,由于RabbitMQEventsManage是单例,只有第一次激活才有效,var eventsManage = _serviceProvider.GetService<RabbitMQEventsManage<TEto>>();await Task.CompletedTask;}
}

在这里我们的RabbitMQ分布式事件就设计完成了,注:这只是简单的一个示例,并未经过大量测试,请勿直接在生产使用;

然后我们需要使用RabbitMQ分布式事件总线工具包


http://www.ppmy.cn/news/93646.html

相关文章

探索无限可能:物联网技术的未来应用引领智能化时代

⭐ 物联网技术⭐ 物联网技术的应用⭐ 物联网发展和创新挑战 当我们回顾过去几十年的科技发展&#xff0c;不难发现物联网技术的崛起和蓬勃发展。物联网的概念已经成为当今科技领域的热门话题&#xff0c;它正在以惊人的速度渗透到我们的日常生活中。从智能家居到智能城市&#…

1688 API分享:1688商品采集接口 1688关键字搜索接口

随着“无界零售”时代的到来&#xff0c;越来越多的企业开始寻求数字化转型&#xff0c;其中最重要的一个环节就是数据的互通和整合。而阿里巴巴旗下的B2B网站1688也推出了API接口&#xff0c;为企业间的数据交流提供了便利。电商API中有两个热门的接口&#xff0c;经常会被大家…

C++ string类常用函数

size()&#xff1a;返回当前对象含字符的个数&#xff0c;如果不含字符串&#xff0c;返回0。set(char s[])&#xff1a;将字符串s赋值给当前对象。substr(pos,len)&#xff1a;返回从pos开始&#xff0c;长度为len的字串&#xff0c;时间复杂度为O(len)。find(char c,pos0)&am…

华为CE12808/S9700交换机istack/CSS堆叠主备倒换操作命令步骤

一、华为CE12808交换机&#xff0c;istack堆叠状态 1、设备型号&#xff1a; 交换机一&#xff1a; HUAWEI CE12808 交换机二&#xff1a; HUAWEI CE12808 2、istack堆叠主备倒换操作步骤&#xff1a; 2.1、设备当前配置保存并进行备份。 2.2、切换所用命令。 执行命令display…

破解excel单元格保护

EI目录.xlsx 被保护&#xff0c;想查其中期刊&#xff0c;却不能直接复制。 step1.文件后缀改成.rarstep2.360压缩包打开&#xff0c;找到【sheet1.xml】step3.使用记事本打开&#xff0c;删除部分指定代码step4.后缀改回.xlsx 我是用360压缩包可以直接在.rar中 进行修改。 …

【计算思维题】少儿编程 蓝桥杯青少组计算思维真题及详细解析第3套

少儿编程 蓝桥杯青少组计算思维题及详细解析第3套 1、浩浩的左手边是 A、兰兰 B、贝贝 C、青青 D、浩浩 答案:B 考点分析:主要考查小朋友们的观察能力,从给定的图中可以看到:浩浩的左手边是贝贝,所以答案B 2、2 时 30 分,钟面上时针和分针形成的角是什么角 A、钝角…

如何用海外代理辅助对接 ChatGPT

许多朋友问我有没有好用的海外代理。说实话&#xff0c;真的好用的并不多。 最近我了解到了一家还不错的海外代理&#xff0c;叫做 IPIDEA&#xff0c;我已经使用了一段时间了&#xff0c;觉得质量挺不错。 你可能知道&#xff0c;我最近在进行一些 ChatGPT 相关的研究&#xf…

Anaconda使用总结(conda操作,环境操作,包管理)

Anaconda使用总结 配合Pycharm使用conda命令conda操作环境操作包管理Anaconda源下载包&#xff1a;whl文件本地安装Github源安装PIP和Condaconda换源 其他 背景&#xff1a;Anaconda作为深度学习最流行的pipeline之一&#xff0c;可以方便的修改和导出学习环境&#xff0c;每次…

【ROS】ROS2命令行工具详解

1、简介 ROS1中每个功能都使用ros开头的命令行工具&#xff0c;如&#xff1a;roscore、rosrun等 ROS2中只有一个命令行工具ros2&#xff0c;各个功能模块通过参数子命令来实现 ~$ ros2 -h 用法: ros2 [-h] [--use-python-default-buffering] <command> ……选项:-h, …

链式二叉树高质量OJ—【Leedcode】

目录 ​编辑 1. 单值二叉树 题目 题目分析 代码实现 不带返回值版本 带返回值版本 递归展开图 2. 相同的树 题目 题目分析 代码实现 3. 对称二叉树 题目 题目分析 代码实现 4. 另外一颗子树 题目 题目分析 代码实现 递归展开图 5. 二叉树的前、中、后序遍…

第63篇:美国NSA量子注入攻击的流量特征及检测方法

Part1 前言 大家好&#xff0c;我是ABC_123&#xff0c;公众号正式更名为”希潭实验室”&#xff0c;敬请关注。前不久花时间研究了美国NSA的量子注入攻击手法&#xff0c;并在Hackingclub山东济南站技术沙龙做了分享。对于这种攻击手法部分网友嗤之以鼻&#xff0c;认为是老美…

c# cad二次开发 通过选择txt文件将自动转换成多段线

c# cad二次开发 通过选择txt文件将自动转换成多段线&#xff0c;txt样式如下 using System; using System.Collections.Generic; using System.Text; using Autodesk.AutoCAD.ApplicationServices; using Autodesk.AutoCAD.EditorInput; using Autodesk.AutoCAD.Runtime; usi…

Sqoop基本操作

目录 一、mysql->hdfs 二、hdfs->mysql 三、mysql->hive 四、hive->mysql 五、sqoop job操作 一、mysql->hdfs (1)全表导入 sqoop import --connect jdbc:mysql://127.0.0.1:3306/test \ --username root --P \ --table student1 --target-dir /sqoop/stu…

STM32入门100步(第1步~第3步)

第一章 基础知识与平台建立 第1~2步 是时候学ARM了 1.1 为啥学? 什么是ARM? ARM处理器是英国Acorn有限公司设计的低功耗成本的第一款RISC(精简指令集)微处理器。全称为Advanced RISC Machine。ARM是一种性能出众的32位处理器的内核架构。1991年,一家叫ARM的公司在英国…

总结最全面的TCP、UDP、Socket、HTTP网络编程面试题

先看一天面试的经验&#xff1a; 第一场&#xff1a; 面试官&#xff1a;你说一下TCP的三次握手 我&#xff1a;第一次Client将SYN置1......、第二次Server收........、 第三次........ 面试官&#xff1a;很难背吧&#xff1f; 我&#xff1a;......是啊&#xff0c;很难&…

“程序员,致敬!”

手机震动&#xff0c;提醒着我3年前参加研发的应用迎来了一次重大升级。我按下开源社区提供的合并请求按钮&#xff0c;与开源社区的朋友分享我对这个项目的改进。不久&#xff0c;一条消息提醒我合并请求已被其它社区成员审核通过。 这种远程协作、开源分享的方式是如今广泛存…

Systrace系列10 —— Binder 和锁竞争解读

本文主要是对 Systrace 中的 Binder 和锁信息进行简单介绍,简单介绍了 Binder 的情况,介绍了 Systrace 中 Binder 通信的表现形式,以及 Binder 信息查看,SystemServer 锁竞争分析等。 Binder 概述 Android 的大部分进程间通信都使用 Binder,这里对 Binder 不做过多的解释…

如何用 ChatGPT 做数据进阶可视化?(三维交互图与动图视频)

你只需输入数据和需求&#xff0c;结果自然来。 自动可视化 在《如何用 ChatGPT 帮你自动分析数据&#xff1f;》这篇文章里&#xff0c;我已经为你介绍过 Code Interpreter 。它是 ChatGPT 的一个模式&#xff0c;目前还在 alpha 测试阶段。 Code Interpreter 可以接收文件输入…

【算法】Adding Two Negabinary Numbers 负二进制数相加

文章目录 Adding Two Negabinary Numbers 负二进制数相加问题描述&#xff1a;分析代码 Adding Two Negabinary Numbers 负二进制数相加 问题描述&#xff1a; 给出基数为 -2 的两个数 arr1 和 arr2&#xff0c;返回两数相加的结果。 数字以 数组形式 给出&#xff1a;数组由…

Hyper-V中安装Ubuntu Server 20.04虚拟机

文章目录 Ubuntu系统下载在Hyper-V中安装Ubuntu新建虚拟机配置虚拟机登录ubuntu远程连接Ubuntu安装net-tools使用XShell连接UbuntuHyper-V开启KVM虚拟化参考Ubuntu系统下载 https://cn.ubuntu.com/download/server/step1#downloads 下载后,即可得到ubuntu-20.04.6-live-serve…