@KafkaListener指定kafka集群

news/2024/2/20 16:00:12

基于@KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群,这对于需要访问多套kafka集群的程序来说,是有效的解决方案。这里需要注意的是,此时的消费者配置信息需使用原生kafka的配置信息格式(如:ConsumerConfig.MAX_POLL_RECORDS_CONFIG = “max.poll.records”),与自动装载KafkaConsumer时的配置信息格式不同。详情如下:

依赖项(其实spring-kafka包含了kafka-clients)

<!-- spring-kafka --> 
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.6.0</version>
</dependency>
<!-- kafka-clients --> 
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.6.0</version>
</dependency>

配置文件
配置参数的格式和含义,参见《spring-kafka的配置使用》

生产代码

@Component
@Slf4j
public class KafKaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, Object object) {/** 这里的 ListenableFuture 类是 spring 对 java 原生 Future 的扩展增强,是一个泛型接口,用于监听异步方法的回调 而对于* kafka send 方法返回值而言,这里的泛型所代表的实际类型就是 SendResult<K, V>,而这里 K,V 的泛型实际上 被用于* ProducerRecord<K, V> producerRecord,即生产者发送消息的 key,value 类型*/ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {log.error("发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> sendResult){// log.info("发送消息成功:" + sendResult.toString());}});}
}

消费者配置类,其中可配置多个kafka集群,每个kafka集群生成一个KafkaListenerContainerFactory实例

@Data
@Slf4j
@Configuration
public class KafkaConfig {@ResourceEnvironment environment;@Beanpublic KafkaListenerContainerFactory<?> containerFactory() {Integer concurrency = environment.getProperty("kafka.concurrency", Integer.class, 1);Integer pollTimeout = environment.getProperty("kafka.poll.timeout", Integer.class, 3000);ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(this.consumerConfigs()));containerFactory.setConcurrency(concurrency); // 消费并发数量containerFactory.setBatchListener(true);      // 批量监听消息containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 批量提交偏移containerFactory.getContainerProperties().setPollTimeout(pollTimeout); // 消息拉取时限return containerFactory;}@Beanpublic Map<String, Object> consumerConfigs() {String servers          = environment.getProperty("kafka.servers", "127.0.0.1:9092");String groupId          = environment.getProperty("kafka.groupId", "consumer-group");String sessionTimeout   = environment.getProperty("kafka.session.timeout.ms", "60000");String maxPollRecords   = environment.getProperty("kafka.max.poll.records", "100");String maxPollInterval  = environment.getProperty("kafka.max.poll.interval", "600000");String jaasConfig       = environment.getProperty("kafka.sasl.jaas.config");Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);/// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "SCRAM-SHA-256");props.put("sasl.jaas.config", jaasConfig);return props;}
}

消费代码 @KafkaListener注解的containerFactory参数指定了KafkaListenerContainerFactory实例,也就指定了kafka集群

@Slf4j
@Component
public class KafkaConsumerListen implements BatchMessageListener<String, String> {@Autowiredprivate Environment environment;@Autowiredprivate KafkaMsgHandleService msgHandleService;@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;/*************************      接收消息************************/@Override@KafkaListener( containerFactory = "containerFactory", groupId = "${kafka.groupId}", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "${kafka.concurrency}")public void onMessage(List<ConsumerRecord<String, String>> records) {try {final List<String> msgs = records.stream().map(ConsumerRecord::value).collect(Collectors.toList());log.info("收到消息体:size={} content:{}", msgs.size(), JSON.toJSONString(msgs));/// 处理消息msgs.forEach(this::processRecord);} catch (Exception e) {log.error("KafkaListener_kafka_consume_error.", e);}}/*************************      处理消息************************/private void processRecord(String msg) {taskExecutor.submit(() -> {if (!environment.getProperty("kafka1.switch", Boolean.class,true)) {log.warn("KafkaListener_turn_off_drop_message.");return;}msgHandleService.handle(msg);});}
}

http://www.ppmy.cn/news/1302061.html

相关文章

Qt QWidget窗口基类

文章目录 1 QWidget介绍2 如何显示 QWidget窗口2.1 新建基于QWidget的窗口类2.2 再添加一个QWidget窗口类2.3 显示新添加的 QWidget窗口 3 常用的属性和方法3.1 窗口位置3.2 窗口大小3.3 窗口标题3.4 窗口图标3.5 资源文件 4 实例 1 QWidget介绍 Qt 中的常用控件&#xff0c;比…

Linux中断 -- 中断应答、嵌套、

接上文&#xff0c;本文继续介绍Linux软件部分逻辑。 参考内核版本&#xff1a;kernel-4.19 目录 1.中断信号在各级中断控制器中的应答 2.supports_deactivate_key意义 3.中断嵌套 1.中断信号在各级中断控制器中的应答 本章主要从内核软件层面来看各中断控制器对中断信号处…

国际版WPS Office 18.6.1

【应用名称】&#xff1a;WPS Office 【适用平台】&#xff1a;#Android 【软件标签】&#xff1a;#WPS 【应用版本】&#xff1a;18.6.1 【应用大小】&#xff1a;160MB 【软件说明】&#xff1a;软件日常更新。WPS Office是使用人数最多的移动办公软件。独有手机阅读模式…

Mysql的in与exits

Mysql的in与exits IN和EXISTS是MySQL中用于子查询的两种不同的条件操作符。它们在使用和实现上有一些区别。 IN 操作符&#xff1a; IN操作符用于判断一个值是否在一个集合内。它可以用于子查询中&#xff0c;检查主查询的某一列是否在子查询返回的结果集中。 SELECT colum…

【扩散模型】10、ControlNet | 用图像控制图像的生成(ICCV2023)

论文&#xff1a;Adding Conditional Control to Text-to-Image Diffusion Models 代码&#xff1a;https://github.com/lllyasviel/ControlNet 出处&#xff1a;ICCV2023 Best Paper | 斯坦福 时间&#xff1a;2023.02 一、背景 文本到图像的生成尽管已经有很好的效果&…

【liunx】线程池+单例模式+STL,智能指针和线程安全+其他常见的各种锁+读者写者问题

线程池单例模式STL,智能指针和线程安全其他常见的各种锁读者写者问题 1.线程池2.线程安全的单例模式3.STL,智能指针和线程安全4.其他常见的各种锁4.读者写者问题 喜欢的点赞&#xff0c;收藏&#xff0c;关注一下把&#xff01; 1.线程池 目前我们学了挂起等待锁、条件变量、信…

(每日持续更新)信息系统项目管理(第四版)(高级项目管理)考试重点整理第5章 信息系统工程(三)

博主2023年11月通过了信息系统项目管理的考试&#xff0c;考试过程中发现考试的内容全部是教材中的内容&#xff0c;非常符合我学习的思路&#xff0c;因此博主想通过该平台把自己学习过程中的经验和教材博主认为重要的知识点分享给大家&#xff0c;希望更多的人能够通过考试&a…

数字化时代,VR全景展示如何让用户一窥全貌?

数字化时代&#xff0c;VR全景展示为各行各业提供了无限的可能性。随着VR全景技术的逐步普及&#xff0c;VR全景展示以其独特的呈现方式和新颖十足的交互体验&#xff0c;正在不断改变着人们对于展示宣传的理解。 传统的展示方式&#xff0c;通常需要将产品、图文、品牌等元素集…

前端开发中需要注意的CSS命名规则以及书写顺序

1、CSS的命名——BEM规则&#xff1a; CSS命名一般是用 BEM 规则命名的。它背后的想法是将用户界面划分为独立的块。 BEM的意思就是B模块(block)、E元素(element)、M修饰符(modifier)&#xff0c; 即&#xff1a;[block]__[element]--[modifier]。 模块和子元素之间用两个下划…

问答机器人prompt

def build_prompt(prompt_template, **kwargs): ‘’‘将 Prompt 模板赋值’‘’ prompt prompt_template for k, v in kwargs.items(): if isinstance(v, str): val v elif isinstance(v, list) and all(isinstance(elem, str) for elem in v): val ‘\n’.join(v) else: v…

缓存学习实战篇

缓存练习题&#xff08;用户查询操作&#xff09; public List<ShopType> queryAllType() throws JsonProcessingException {//从缓存中查数据String shopTypeJson stringRedisTemplate.opsForValue().get("cache:shopType");//如果缓存命中&#xff0c;if (S…

Python字符串的编码和解码

不同计算机之间进行数据传输&#xff0c;实际上传输的是二进制数据。 一.字符串的编码 将str类型转换成bytes类型&#xff0c;需要用到字符串的encode&#xff08;&#xff09;方法 语法格式&#xff1a; Str.encode(encoding’utf-8’, Errors’strict/ignore/replace’) …

零基础也可以探索 PyTorch 中的上采样与下采样技术

目录 torch.nn子模块Vision Layers详解 nn.PixelShuffle 用法与用途 使用技巧 注意事项 参数 示例代码 nn.PixelUnshuffle 用法与用途 使用技巧 注意事项 参数 示例代码 nn.Upsample 用法与用途 使用技巧 注意事项 参数 示例代码 nn.UpsamplingNearest2d …

SpringMVC-获取请求参数

简介 用户输入信息后&#xff0c;如果想要得到用户输入的内容 , springMVC 应该如何做呢? 本次课讲解下再springmvc中获取请求参数及中文乱码问题 通过servletAPI获取 讲HttpServletRequest作为控制器方法的形参,此时HttpServletRequest类型的参数表示封装了当前请求的请求报…

sqlite3 jdbc 只读模式

sqlite3 jdbc 只读 无效方法: “jdbc:sqlite:/bal/work_home/fn.db?readonlytrue” 导致 fn.db?readonlytrue 会被当成是数据库文件名 “jdbc:sqlite:/bal/work_home/fn.db?jdbc.explicit_readonlytrue” 参考了pragmaReadOnly sqlite3 jdbc 只读 有效方法 举例 package…

网络编程之Socket

网络编程之Socket 目录 什么是Socket TCP服务端流程 TCP客户端流程 UDP服务端流程 UDP客户端流程 什么是Socket Socket是位于应用层与传输层之间的一个抽象层&#xff0c;可用于不同的网络协议&#xff08;如TCP、UDP等&#xff09;它允许应用程序创建一个与网络上的其…

【第二课课后作业】书生·浦语大模型实战营-轻松玩转书生·浦语大模型趣味Demo

目录 轻松玩转书生浦语大模型趣味Demo课后作业1. 基础作业1.1 使用 InternLM-Chat-7B 模型生成 300 字的小故事&#xff1a;1.2 熟悉 hugging face 下载功能&#xff0c;使用 huggingface_hub python 包&#xff0c;下载 InternLM-20B 的 config.json 文件到本地 2. 进阶作业2.…

第二次面试总结 - 宏汉科技 - Java后端开发

&#x1f9f8;欢迎来到dream_ready的博客&#xff0c;&#x1f4dc;相信您对专栏 “本人真实面经” 很感兴趣o (ˉ▽ˉ&#xff1b;) 专栏 —— 本人真实面经&#xff0c;更多真实面试经验&#xff0c;中大厂面试总结等您挖掘 目录 总结 (非详细) 面试内容(提问内容) - 带答案…

2024年第十届控制、自动化与机器人国际会议(ICCAR 2024)即将召开!

2024年4月27~29日 新加披 会议官网&#xff1a;10th-ICCAR 2024https://iccar.org/index.html 第十届控制、自动化和机器人国际会议将于2024年4月27-29日在新加坡举办。本次会议由新加坡电子学会&#xff0c;IEEE机器人和自动控制协会和IEEE联合主办&#xff0c;并得到北京航空…

准备好迎接新兴的汽车雷达卫星架构了吗?(TI文档)

引言 随着全球新车评估计划的安全等级和法规对主动安全功能的要求越来越严格&#xff0c;安全是当今车辆的一个不容置疑的特征。全球汽车制造商正在满足这些安全要求&#xff0c;并通过不断增强车辆内的高级驾驶辅助系统(ADAS)功能&#xff0c;包括自动紧急制动(AEB)、自适应巡…
最新文章