【消息队列】聊一下Kafka多线程消费实例

news/2024/4/23 18:59:09/

Kafka Java Consumer设计原理

目前市面上大多数计算机都采用多核CPU来提升系统的处理性能,但是如果在程序开发层面使用单线程的话,那么必定不能完全发挥出系统的真实性能,而kafka Consumer就是单线程的。而这个只是针对于消费消息这个层面来说。
内部包含的是用户主线程和心跳线程,用户主线程说白了就是消费consumer的main线程,而心跳线程是在Consumer API中,会自动和Broker进行心跳检测的线程,两个线程指责不同,进行拆分出来也是非常合理的。可以解耦真实的消息处理和心跳检测管理机制
为什么Kafka不涉及成支持并发Consumer
从语言层面来说,并不是每个语言都可以很好的支持并发机制。并且从Kakfa推广层面来说,想要更好的打造上下游生态,那么必须要具备较好的移植性。

多线程方案

KakfaConsumer不是线程安全的。
1.消费者启动多个线程,每个线程维护专属的KakfaConsumer实例,进行消息的获取、消费流程
在这里插入图片描述
2.消费者使用单或者多线程获取消息,然后启动多个线程消费消息。获取消息可以是单个线程,或者是多个线程,但是每个线程都有专属的KafkaConsumer,将消息的获取和消息处理进行解耦合。
在这里插入图片描述
好了,我们来对比一个上述的两个方案。
如果说我们获取消息和处理消息分为1,2,3,4个过程。方案1会为创建多个线程,每个线程完整的执行完1,2,3,4过程,整个任务并不会被进行拆分,而方案2,用单线程处理1,2,然后多个线程处理3,4两个过程。
在这里插入图片描述

Code

方案1

   public class KafkaConsumerRunner implements Runnable {private final AtomicBoolean closed = new AtomicBoolean(false);private final KafkaConsumer consumer;public void run() {try {consumer.subscribe(Arrays.asList("topic"));while (!closed.get()) {ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));//  执行消息处理逻辑}} catch (WakeupException e) {// Ignore exception if closingif (!closed.get()) throw e;} finally {consumer.close();}}// Shutdown hook which can be called from a separate threadpublic void shutdown() {closed.set(true);consumer.wakeup();}

方案2

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...private int workerNum = ...;
executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());...
while (true)  {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (final ConsumerRecord record : records) {executors.submit(new Worker(record));}
}
..

http://www.ppmy.cn/news/47914.html

相关文章

焦虑症会出现哪些问题 什么因素导致的焦虑症

当说起焦虑症&#xff0c;大多数人想到的就是植物神经紊乱&#xff0c;确实&#xff0c;这两种疾病是非常容易混淆的&#xff0c;甚至很多时候植物神经紊乱都会当做焦虑症进行治疗&#xff0c;虽然这种疾病大多效果不会太理想。 你们知道什么是焦虑症吗&#xff1f; 很多人当出…

【机会约束、鲁棒优化】机会约束和鲁棒优化研究优化【ccDCOPF】研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

c++算法——枚举法

枚举概念 枚举法是通过计算机速度快的特点&#xff0c;对问题所有可能性进行枚举&#xff0c;从中找到答案&#xff0c;需要利用循环。 例题 1&#xff0c;简单数字谜 题目描述 在□内填上一个合适的相同的数字&#xff0c;使等式“□365283□8256”成立。 输入格式 无 输出…

创客匠人视频号全域增长落地班成功举办

以专业赋能好老师&#xff0c;打造知识付费商业IP&#xff0c;借助视频号布局商业增长第二曲线。 4月15日-16日&#xff0c;泛知识场景数字化服务商创客匠人在厦门举办「视频号全域增长落地班」。 本次大会邀请到创客匠人CEO、中欧EMBA蒋洪波&#xff0c;福布斯环球联盟创新企业…

08 dubbo源码学习_LoadBalance

1. loadBalance的作用2. loadBalance的入口3. loadBalance源码3.1 RandomLoadBalance 加权随机算法3.2 LeastActiveLoadBalance 最小活跃数负载均衡3.3 ConsistentHashLoadBalance 一致性 hash 算法3.4 RoundRobinLoadBalance 加权轮询负载均衡4. loadBalance使用1. loadBalanc…

OpenAI-ChatGPT最新官方接口《安全最佳实例》全网最详细中英文实用指南和教程,助你零基础快速轻松掌握全新技术(十)(附源码)

Safety best practices 安全最佳实践 前言Use our free Moderation API 使用我们的免费审核APIAdversarial testing 对抗性测试Human in the loop (HITL) 人在回路Prompt engineering 快速工程“Know your customer” (KYC) “了解你的客户”Constrain user input and limit ou…

Mysql问题

1. B树和B树区别&#xff0c;mysql为什么不使用B树 区别&#xff1a; 非叶子节点和叶子节点的存储方式不同。B树的非叶子节点既存储键值信息&#xff0c;也存储数据信息&#xff0c;而B树的非叶子节点只存储键值信息。因此&#xff0c;B树的每个节点都包含数据信息&#xff0…

智慧城市基础设施监测解决方案——5G Debian边缘计算机

随着城市化进程的不断加速&#xff0c;智慧城市建设愈发成为未来城市发展的重要方向。智慧城市需要依靠高效、精准的数据采集和传输技术支持城市基础设施的稳定运行和管理。而5G Debian边缘计算机 作为一种高可靠性、高稳定性的计算设备&#xff0c;正逐渐受到智慧城市基础设施…

PMP项目管理|敏捷实施过程的难点痛点及解决方法

1、团队目标或任务不明确 敏捷章程中关于目标的部分--愿景、使命和使命测试 2、团队工作协议不明确 敏捷章程中关于一致性的部分--价值观、原则和工作协议 3、团队环境不明确 敏捷章程中关于环境的部分-边界 承诺资产和前瞻性分析 4、需求不明确 帮助发起人和相关方制定…

1007、1009:与进制问题、输出问题

1007 题目&#xff1a;本题要求计算A/B&#xff0c;其中A是不超过1000位的正整数&#xff0c;B是1位正整数。你需要输出商数Q和余数R&#xff0c;使得A B * Q R成立 思路&#xff1a;对于数字元素拆分&#xff0c;除法的计算方法用代码实现&#xff08;唯一一点就是在输出的…

在外Windows远程连接MongoDB数据库【无公网IP】

文章目录 前言1. 安装数据库2. 内网穿透2.1 安装cpolar内网穿透2.2 创建隧道映射2.3 测试随机公网地址远程连接 3. 配置固定TCP端口地址3.1 保留一个固定的公网TCP端口地址3.2 配置固定公网TCP端口地址3.3 测试固定地址公网远程访问 转载自远程内网穿透的文章&#xff1a;公网远…

禁用表单元素:Layui框架下的实践与技巧

引言 在日常的网页开发过程中&#xff0c;有时我们需要禁用表单元素&#xff0c;以防止用户在某些情况下进行输入或更改。在本文中&#xff0c;我们将介绍如何在Layui框架下使用JavaScript禁用表单元素&#xff0c;例如单选按钮&#xff08;radio&#xff09;、下拉列表&#…

Go的ORM框架XORM实现增删改查

XORM XORM 是一个简单而强大的 Go 语言 ORM 框架。官网XORM 更多细节请移步官网&#xff0c;本篇文章主要阐述快速入门&#xff0c;快速实现增删改查的&#xff08;curd的操作来熟悉xorm。 xorm优势&#xff1a; 支持struct和数据库表的灵活映射&#xff0c;并自动同步支持…

超详细的ubuntu安装opencv2.0//test ok

目录 1. 首先确保在Ubuntu上已经安装了cmake和make 1.1 安装make 1.2 安装cmake 2 安装依赖环境 3 下载opencv源码 4 编译源码并安装 4.1 进入opencv源码目录中&#xff0c;新建build文件夹 4.2 进入build文件夹&#xff0c;打开终端使用cmake生成makefile 4.3 安装ope…

15.网络爬虫—selenium验证码破解

网络爬虫—selenium验证码破解 一selenium验证码破解二破解平台打码平台超级鹰文识别基于人工智能的定制化识别平台 —图灵 三英文数字验证码破解selenium破解验证码快捷登录古诗文网 四滑动验证码破解selenium滑动验证码破解网易网盾测试案例 五总结六后记 前言&#xff1a; &…

​影响广告收益的关键因素及其优化方法

​摘要&#xff1a;广告收益是许多企业、网站和应用程序达到一定阶段后&#xff0c;不得不重视的收入来源。然而&#xff0c;广告收益的影响因素却是多种多样的&#xff0c;包括广告受众群体、广告位、广告类型以及广告预算的波动等等。 针对这些影响因素&#xff0c;企业和个人…

Day948.组件化成熟度评估,你的目的地在哪里呢 -系统重构实战

组件化成熟度评估&#xff0c;你的目的地在哪里呢 Hi&#xff0c;我是阿昌&#xff0c;今天学习记录的是关于组件化成熟度评估&#xff0c;你的目的地在哪里呢的内容。 一、组件化成熟度模型 组件化成熟度模型可以帮助咱全局去思考当前的现状&#xff0c;并制定更有针对性的…

运算符重载(二):重载赋值

重载赋值运算符 在一个类里面&#xff0c;编译器默认给我们添加了一个赋值运算符的重载&#xff0c;这样我们就能给两个对象之间进行赋值运算 class Role { public:int hp;int mp; } int main() {Role x,y;x.hp100;x.mp200;yx;std::cout<<y.hp<<y.mp; } 像这样&…

SOAP or REST APIs的区别

SOAP和REST是两种不同的Web服务API。 它们之间的主要区别如下: 数据格式 SOAP只支持XML格式,REST支持多种格式,如XML、JSON等。服务定义 SOAP基于WS-*标准,服务定义通过WSDL文件完成。REST不依赖WS-*标准,服务定义主要通过URI完成。通讯协议 SOAP只支持HTTP协议,REST通常支持H…

Spring底层核心原理

文章目录 Spring底层核心原理Bean的生命周期推断构造方法AOP原理Spring事务 Spring底层核心原理 下面这几行代码是一个Spring的入门代码&#xff0c;第一行是通过java配置类 注解的方式创建一个Spring容器&#xff0c;第二行是通过XML配置文件的方式创建一个Spring容器 Annot…