(七)「消息队列」之 RabbitMQ 发布者确认(使用 .NET 客户端)

news/2024/4/15 7:40:18

发布者确认(Publisher Confirms)

发布者确认是一个 RabbitMQ 扩展,用于实现可靠的发布。当在通道上启用发布者确认时,客户端发布的消息将由代理异步确认,这意味着它们已在服务器端得到处理。

0、引言

先决条件

本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口(5672)上运行。如果您使用了不同的主机、端口或凭证,则要求调整连接设置。

获取帮助

如果您在阅读本教程时遇到问题,可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。

在本教程中,我们将使用发布者确认来确保已发布的消息已安全到达代理。我们将介绍几种使用发布者确认的策略,并解释它们的利弊。

原文链接:https://www.rabbitmq.com/tutorials/tutorial-seven-dotnet.html

1、在通道上启用发布者确认

发布者确认是 RabbitMQ 对 AMQP 0.9.1 协议的扩展,所以默认情况下它们是不启用的。使用 ConfirmSelect 方法可以在通道层级启用发布者确认:

var channel = connection.CreateModel();
channel.ConfirmSelect();

您必须在期望启用发布者确认的每个通道上调用该方法。确认只需要启用一次,而不是对每条发布的消息都启用。

策略 #1:单独发布消息

让我们从实现带确认的发布的最简单途径开始吧,那就是,发布一条消息并同步等待它确认

while (ThereAreMessagesToPublish())
{byte[] body = ...;IBasicProperties properties = ...;channel.BasicPublish(exchange, queue, properties, body);// uses a 5 second timeoutchannel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
}

在前面的示例中我们像往常一样发布消息并使用 Channel#WaitForConfirmsOrDie(TimeSpan) 方法等待它确认。该方法在消息确认后立即返回。如果在超时时间内消息未得到确认或者如果消息已 nack(Negative-Acknowledgement) 了(意味着代理由于某些原因无法处理它),方法会抛出一个异常。异常的处理通常包括记录一个错误消息日志 并/或 重新尝试发送消息。

不同的客户端库有不同的方式去同步处理发布者确认,所以确保仔细阅读您正在使用的客户端的文档。

这个技术非常简单但也有一个巨大的缺点:它会显著降低发布速度,因为某条消息的确认会堵塞后续消息的发布。这种方法提供的吞吐量不会超过每秒几百条已发布的消息。不过,这对于某些应用程序来说已经足够好了。

发布者确认是异步的吗

在开头我们提到代理是异步确认已发布的消息的,但在第一个例子中,代码是同步等待直至消息确认的。客户端实际上异步接收确认,并相应地解除对 WaitForConfirmsOrDie 的调用阻塞。可以将 WaitForConfirmsOrDie 看作是一个同步 helper,它依赖于底层的异步通知。

策略 #2:批量发布消息

为了改进上面的例子,我们可以发布一批消息并等待这一批消息全部得到确认。如下是一个使用 100 一批次的示例:

var batchSize = 100;
var outstandingMessageCount = 0;
while (ThereAreMessagesToPublish())
{byte[] body = ...;IBasicProperties properties = ...;channel.BasicPublish(exchange, queue, properties, body);outstandingMessageCount++;if (outstandingMessageCount == batchSize){channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));outstandingMessageCount = 0;}
}
if (outstandingMessageCount > 0)
{channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
}

等待一批消息的确认比等待单个消息的确认大大提高了吞吐量(在远程 RabbitMQ 节点上最多可提高 20-30 倍)。一个缺点是,如果出现故障,我们不知道究竟是哪里出了问题,因此我们可能不得不在内存中保存整个批处理,以记录一些有意义的内容或重新发布消息。这个解决方案仍然是同步的,因此它阻止消息的发布。

策略 #3:异步处理发布者确认

代理异步确认已发布的消息,只需要在客户端上注册一个回调就可以收到这些确认的通知:

var channel = connection.CreateModel();
channel.ConfirmSelect();
channel.BasicAcks += (sender, ea) =>
{// code when message is confirmed
};
channel.BasicNacks += (sender, ea) =>
{//code when message is nack-ed
};

这儿有两个回调:一个用于已确认的消息,一个用于已 nack 的消息(可以认为是代理丢失的消息)。两个回调都有相应的 EventArgs 参数(ea)包含:

delivery tag
标识已确认或已 nack 消息的序列号。我们将很快看到如何将其与发布的消息关联起来。
multiple
这是一个布尔值。如果为 false,则仅有一条消息确认/nack-ed;如果为 true,所有序列号 小于等于该序列号的消息都确认/nack-ed。

在发布前,可以通过 Channel#NextPublishSeqNo 获得消息的序列号:

var sequenceNumber = channel.NextPublishSeqNo;
channel.BasicPublish(exchange, queue, properties, body);

将消息与序列号关联起来的一种简单方法是使用字典。让我们假设我们想要发送字符串,因为它们很容易转换为用于发布的字节数组。下面是一个代码示例,它使用字典将发布序列号与字符串消息体关联起来:

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();
// ... code for confirm callbacks will come later
var body = "...";
outstandingConfirms.TryAdd(channel.NextPublishSeqNo, body);
channel.BasicPublish(exchange, queue, properties, Encoding.UTF8.GetBytes(body));

发布代码现在使用字典跟踪出站消息。我们需要在确认到达时清理字典,并在消息已 nack 时做一些类似于记录警告的事情:

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();void CleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
{if (multiple){var confirmed = outstandingConfirms.Where(k => k.Key <= sequenceNumber);foreach (var entry in confirmed){outstandingConfirms.TryRemove(entry.Key, out _);}}else{outstandingConfirms.TryRemove(sequenceNumber, out _);}
}channel.BasicAcks += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
channel.BasicNacks += (sender, ea) =>
{outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
};// ... publishing code

先前的示例中包含一个在确认到达时清理字典的回调。注意,这个回调处理单次和多次确认。这个回调会在确认到达(Channel#BasicAcks)时被使用。用于已 nack 消息的回调将检索消息体并发出警告。然后,它重用之前的回调来清除字典中未完成的确认(无论消息是已确认还是已 nack,都必须删除字典中对应的条目)。

如何跟踪未完成的确认?

我们的示例使用一个 ConcurrentDictionary 跟踪未完成的确认。由于几种原因,这个数据结构十分方便。它允许我们能够轻易地将序列号与消息关联起来(无论消息数据是什么)并允许我们能够通过一个给出的序列 id 轻易地清理条目(以处理多次确认/nack)。最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中被调用的,该线程应该与发布线程保持不同。

除了使用复杂的字典实现外,还有其他方法可以跟踪未完成的确认,比如使用简单的并发哈希表和变量来跟踪发布序列的下界,但它们通常更复杂,不属于“教程”的范畴。

总而言之,异步处理发布者确认通常需要以下步骤:

  • 提供一个方法去关联发布序列号和消息。
  • 在通道上注册确认侦听器,以便在发布者 acks/nacks 到达时得到通知,并执行适当的操作,例如记录或者重新发布已 nack 的消息。在此步骤中,序列号到消息的关联机制也可能需要进行一些清理。
  • 在发布消息之前跟踪发布序列号。

重新发布已 nack 的消息?

在相应的回调中重新发布已 nack 的消息可能很诱人,但应该避免这样,因为确认回调是在(通道不应该执行操作的)I/O 线程中分配的。更好的方案是在内存队列中对消息进行排队,该队列由发布线程轮询。像 ConcurrentQueue 这样的类可以很好地在确认回调和发布线程之间传递消息。

总结

在某些应用程序中,确保已发布的消息到达代理是必要的。发布者确认(Publisher Confirms)是 RabbitMQ 的一个特性,可以帮忙满足这个需求。发布者确认本质上是异步的,但也可以同步处理它们。没有说只有一个绝对的方法来实现发布者确认,这通常取决于应用程序和整个系统中的约束。典型的技术有:

  • 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量发布消息,同步等待批处理的确认:简单、合理的吞吐量,但在一些东西出现问题时很难推断。
  • 异步处理:最佳性能和资源使用,错误情况下的良好控制,但还需要参与正确实现的过程(无法直接用现成的)。

2、将所有的东西放到一起

PublisherConfirms.cs 类包含了我们所介绍的技术的代码。我们可以编译它,按原样执行它并看看每项技术的表现如何:

dotnet run

输出会看起来像下面这样:

Published 50,000 messages individually in 5,549 ms
Published 50,000 messages in batch in 2,331 ms
Published 50,000 messages and handled confirms asynchronously in 4,054 ms

运行效果:
在这里插入图片描述

如果客户端和服务器位于同一台机器上,那么您的计算机的输出应当与之类似。不出所料单独发布消息表现十分糟糕;但出乎意料的是:与批量发布相比,异步处理的表现有些令人失望。

发布者确认十分依赖于网络,所以我们最好不要在远端节点上尝试,而在生产中,客户端和服务器通常不在同一台机器上却又是更现实的情况。PublisherConfirms.cs 可以很容易地更改为使用非本地节点:

private static IConnection CreateConnection()
{var factory = new ConnectionFactory { HostName = "remote-host", UserName = "remote-host", Password = "remote-password" };return factory.CreateConnection();
}

重新编译类,再次执行并等待结果:

Published 50,000 messages individually in 231,541 ms
Published 50,000 messages in batch in 7,232 ms
Published 50,000 messages and handled confirms asynchronously in 6,332 ms

我们看到单独发布现在的表现仍然非常糟糕。但是有了客户端和服务器之间的网络,批量发布和异步处理现在表现得差不多,同时异步处理在发布者确认方面还有一点小优势。

请记住,批量发布很容易实现,但是在 negative publisher acknowledgement 的情况下,不容易知道哪些消息不能发送到代理。异步处理发布者确认需要更多的参与实现,但提供了更好的粒度和对发布消息已 nack 时执行的操作的更好控制。

5、生产[非]适用性免责声明

请记住,本教程和其他教程都是教程。他们一次展示一个新概念,可能会有意地过度简化一些东西,而忽略其他东西。例如,为了简洁起见,连接管理、错误处理、连接恢复、并发性和指标收集等主题在很大程度上被省略了。这种简化的代码不应该被认为可以用于生产。

在发布您的应用之前,请先查看其他文档。我们特别推荐以下指南:发布者确认和消费者确认,生产清单和监控。


http://www.ppmy.cn/news/945807.html

相关文章

长度最小的子数组

题目链接https://leetcode.cn/problems/minimum-size-subarray-sum/ 给定一个含有 n 个正整数的数组和一个正整数 target 。 找出该数组中满足其和 ≥ target 的长度最小的 连续子数组 [numsl, numsl1, ..., numsr-1, numsr] &#xff0c;并返回其长度。如果不存在符合条件的…

幅度、频率和相位可调的双通道DDS 信号发生器(文末设计了双通道DA硬件模块)

FPGA项目&#xff1a;之DDS信号发生器&#xff08;概括介绍了FPGA设计DDS信号发生器&#xff09; &#xff08;项目中部分参考了网上FPGA程序设计&#xff09; &#xff08;适合课程设计参考&#xff0c;程序板级实测好用&#xff0c;需要程序可在底下留言&#xff09; B站对应…

RK3568—基于GM8775C的MIPI转双通道LVDS屏幕调试

Rockchip RK3568 原生显示接口不支持双通道LVDS屏幕的数据输出&#xff0c;因此需要借助显示转换芯片才能实现双通道LVDS屏幕的驱动。本文介绍使用GM8775C芯片方案&#xff0c;在 RK3568 平台实现单路 MIPI DSI 信号输出来驱动双通道LVDS显示屏。 关于GM8775C GM8775C 型 DSI…

LabView---双通道示波器(内含信号发生器)

😃😃😃关注一下阿酱趴! 前言: 🙆‍♂️ 作者简介:一碗黄豆酱 🙆‍♂️ 给大家带来快乐的就是阿酱我! 🎈 CSDN 勤写标兵!、上兰村编程小能手! 👨🏻 座右铭:成功不是将来才有的,而是从决定去做的那一刻起,持续累积而成。 🧙 由于知识储备有限,如果…

使用Python实现音频双通道分离

某些音频是双方对话&#xff0c;有可能需要对音频作通道的分离。 示例代码如下&#xff1a; """ 音频双通道分离 """ import sys import numpy as np from scipy.io import wavfile from converter import mp3_to_wavdef split_channel(wav_pa…

【MATLAB图像融合】[18]双通道PCNN模型实现图像融合

引言 简单回顾一下以往的单通道PCNN模型&#xff0c;原理与实现步骤&#xff1a; 13、单通道PCNN原理 14、单通道PCNN融合代码实现 一、单通道PCNN 图1 单通道PCNN&#xff1a; 在单通道PCNN中&#xff0c;对于一个神经元的一次迭代过程正如图1描述&#xff1a; ①、F&#…

双通道FPGA数据采集卡

采集卡指标&#xff1a;FPGA&#xff08;altera&#xff09;、AD&#xff08;输入范围正负5V、AD9226、12bit、65MHz&#xff09;、SDRAM&#xff08;16bit数据位、13bit地址线&#xff09;、串口&#xff08;CH340&#xff09; 1. 电路&#xff1a; &#xff08;1&#xff09…

关于IMX双通道LVDS 的深入讲解

其实之前写过LCD/LVDS的一些时序的基本概念《与LCD移植相关的概念》。但后来发现还是不够全面。关于双通道LVDS&#xff0c;可能会有很多人有一些陌生&#xff0c;它是什么原理&#xff1f; 有什么作用&#xff1f; 时序如何设定&#xff1f; 接下来&#xff0c; 就让我们带着这…

Python双通道音频旋转

Python双通道音频旋转 之前没有接触过音频数据扩充&#xff0c;于是在网上找了下&#xff0c;发现很多内容都是一样&#xff0c;然后上面的代码运行后发现我扩充的音频数据全都变成了单通道&#xff0c;而我的原始数据是双通道的。于是我找了下原因&#xff0c;发现是音频数据…

基于Labview双通道频谱滤波器的设计

一、主要功能 本设计主要功能是仿真信号VI产生带噪声的信号&#xff0c;经过带通滤波器滤波&#xff0c;对滤波前后的信号进行双通道频谱测量&#xff0c;通过信号掩区和边界测量VI检测滤波后的信号是否能够在用户设定的信号频率范围内。 设计过程可以采用前面板与程序框图交…

DAC双通道输出电压实验

更多交流欢迎关注作者抖音号&#xff1a;81849645041 目的 了解DAC数模转换工作原理&#xff0c;利用DAC两个通道输出电压&#xff0c;结合ADC读取引脚电压。 原理 DAC 为数字/模拟转换模块&#xff0c;它的作用就是把输入的数字编码&#xff0c;转换成对应的模拟电压输出&am…

Kotlin获取Fragment中的组件

左边和右边分别是两个不同的Fragment&#xff0c;左边的Fragment中右一个Button组件&#xff0c;目标是想要获取这个组件的id&#xff0c;以便进行将右边的Fragment更改成另一个Fragmeent的操作。 left_fragment.xml <?xml version"1.0" encoding"utf-8&qu…

keras实现双通道模型

核心代码 单输入单输出 from keras.applications.vgg19 import VGG19 model_vgg = VGG19(include_top=False,weights=None, input_shape=(96, 96, 3)) from keras.applications.vgg16 import VGG16 model_vgg = VGG16(include_top=False,weights=None, input_shape=(96, 96,…

浅谈一下企业IT运维痛点以及好用的运维软件推荐

随着IT建设的不断深入和完善&#xff0c;IT资产越来越多&#xff0c;IT运维管理越发显得重要。但不少企业不知道如何有效进行IT运维&#xff0c;不知道如何更好进行IT运维&#xff0c;今天我们就来一起浅谈一下企业IT运维痛点&#xff0c;以及给大家推荐一款好用的运维管理软件…

2009-2021网络规划设计师论文题汇总

1、历年论文题目速览 分类 论文题目 网络规划与设计 2009-11&#xff1a;论电子政务专用网络的规划与设计 2010-05&#xff1a;论网络规划与设计中的可扩展性问题 2010-05&#xff1a;论大中型网络的逻辑网络设计 2010-11&#xff1a;论校园网/企业网的网络规划与设计 2010-…

网络规划设计师上午真题及解析(2019)

答案:A、B 解析:由于页面大小为4KB,前面的4位2进制为页面号,16进制5148H转2进制,由于2^4=16所以每个16进制位=4位2进制,5转为二进制位0101,再转为10进制位0*2^3+1*2^2+0*2^1+0*2^0=5,页号5转换表转换后为3,其他位不变,所以3148H 答案:C 解析:索引的作用相当于图…

PCL 二维凸包算法(Quickhull算法)

文章目录 一、简介二、实现代码三、实现效果参考资料一、简介 QuickHull算法是一种典型的分而治之的凸包算法,其计算过程如下所述: 设 S S S为点的集合: 首先我们需要找到 x x

网络工程师--网络规划和设计案例分析(1)

一、案例&#xff1a;某企业网络拓扑结构如下图所示&#xff0c;租用ADSL宽带实现办公上网&#xff0c;配备一台小型路由器&#xff0c;实现ADSL自动拨号和DHCP服务功能&#xff0c;所有内部主机&#xff08;包括台式机和笔记本&#xff09;通过路由器实现Internet资源的访问&a…

网络规划设计师考试总结

没有总结,就没有提高。 下半年选择了 网络规划设计师 来冲刺高级资格,将复习与考试中的经验总结、分享一下。 高级资格目前开设了五门,其中 信息系统项目管理师 是报考人数最多的,网络规划设计师 报考人数较少,一般报考的都是从事网络工程、网络运维、运营商等工作的人。总…

网络工程师--网络规划和设计案例分析(2)

案例描述&#xff1a;某企业网络拓扑如下图所示&#xff1a; 工程师给出了该网络的需求&#xff1a; 1.用防火墙实现内外网地址转换和访问控制策略 2.核心交换机承担数据转发&#xff0c;并且与汇聚层两台交换机实现OSPF功能 3.接入层到汇聚层采用双链路方式组网 4.接入层交…
最新文章