(五)kafka从入门到精通之topic介绍

news/2024/4/15 4:58:38

1、kafka简介

Kafka是一个流行的分布式消息系统,它的核心是一个由多个节点组成的分布式集群。在Kafka中,数据被分割成多个小块,并通过一些复杂的算法在节点之间传递。这些小块被称为Kafka Topic。

2、topic知识

一个Topic是一组具有相同主题的消息。可以将Topic看作是一个数据仓库,在这个仓库中存储着具有相同主题的数据。比如,一个Topic可以存储所有关于“股票”的数据,另一个Topic可以存储所有关于“天气”的数据。

Kafka Topic的设计非常简单,但是它的功能却非常强大。Kafka Topics可以实现数据的发布、订阅和消费。在发布数据时,可以将数据放到一个Topic中,其他节点可以订阅这个Topic,并且获取其中的数据。在订阅数据时,可以将一个Topic的地址放到消费者的地址中,这样消费者就可以获取到该Topic中的数据。

Kafka Topis的数据结构非常特殊,它是一个由多个分区组成的集合。每个分区都是一个独立的数据流,并且可以使用不同的策略来处理数据的分配和复制。这种数据结构可以提高数据的可靠性和安全性,并且可以支持大规模的数据传输。

Kafka Topic的分区结构非常重要,它可以将数据分成多个部分,并且可以使用不同的策略来处理数据的分配和复制。每个分区都有一个唯一的标识符,叫做分区ID。可以使用不同的分区ID来创建多个分区,每个分区可以存储不同的数据。

3、简单使用

在使用Kafka Topics时,需要注意一些事项。首先,要创建一个Topic,并且指定该Topic的主题和相关参数。其次,要创建一些消费者,并且将它们添加到该Topic的订阅列表中。最后,当数据被发布到Topic中时,消费者会自动订阅这个Topic,并且获取其中的数据。

首先,您需要在项目中添加 Kafka 依赖项:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version>
</dependency>

然后,您需要编写一个生产者,以将消息发布到指定的主题中:

在创建Topic时,可以指定该Topic的分区数和每个分区的大小。分区数表示要将数据分成多少个部分,每个部分可以使用不同的策略来处理数据的分配和复制。每个分区的大小表示每个部分可以存储多少数据。

package com.yinfeng.test.demo.kafka;import lombok.SneakyThrows;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** @author admin* @date 2023/7/2 19:02* @description*/
public class KafkaProducerDemo {@SneakyThrowspublic static void main(String[] args) {Properties props = new Properties();// Kafka 集群地址props.put("bootstrap.servers", "localhost:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);// 发送3条消息for (int i = 0; i < 3; i++) {ProducerRecord<String, String> record1 = new ProducerRecord<>("test", "key"+i, "hello"+i);producer.send(record1, (metadata, exception) -> {System.out.println("消息发送成功 topic="+metadata.topic()+", msg=>" + record1.value());});}// kafka异步发送,延时等待执行完成Thread.sleep(5000);}
}

在这里插入图片描述

当数据被发布到Topic中时,可以将数据放到一个Topic中,其他节点可以订阅这个Topic,并且获取其中的数据。订阅一个Topic的过程可以用以下代码表示:

在消费Topic中的数据时,需要指定要消费的主题名称和消费者的地址。消费者的地址包括一个主机名和一个端口号,以及一个唯一的标识符,叫做消费者ID。消费者ID可以使用环境变量来设置,也可以在消费者的地址中直接指定。

package com.yinfeng.test.demo.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** @author admin* @date 2023/7/2 19:02* @description*/
public class KafkaConsumerDemo {public static void main(String[] args) {Properties props = new Properties();// Kafka 集群地址props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "my_group");props.put("auto.offset.reset", "earliest");props.put("enable.auto.commit", "true");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());consumer.subscribe(Collections.singleton("test"));// 循环拉取消息while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: " + record.value());}}}
}

在这里插入图片描述

在上面的代码中,我们首先创建了一个Kafka集群,然后创建了一个Topic,并且指定了该Topic的分区ID。接着,我们创建了一个Kafka集群,并且指定了该Topic的分区ID。接着,我们创建了一个消费者,并且将该消费者添加到该Topic的订阅列表中。最后,我们使用该消费者来消费该Topic中的数据。

在消费数据时,我们使用了Kafka提供的ConsumerRecords类来获取数据。我们首先使用该类的poll方法来获取一个消费者的数据,然后使用该类的其他方法来对数据进行处理。

在设置消费者的偏移量时,我们使用了Kafka提供的OffsetRequest类来向Kafka集群中提交消费者的偏移量。我们首先创建了一个OffsetRequest对象,然后使用该类的setOffset方法来将该对象设置为要求的偏移量。最后,我们调用该类的commitSync方法来提交该偏移量。不过由于我们设置自动提交,所以这步可以不用操作。

4、注意事项

在使用Kafka Topics时,还需要注意一些其他的事项。

例如,在创建Topic时,可以指定该Topic的备份策略,以确保数据的可靠性和安全性。备份策略包括多种不同的方法,如备份到本地文件、备份到数据库、备份到其他Kafka集群等。

另外,在使用Kafka Topics时,还可以使用Kafka提供的一些API和工具来对Topic进行操作和管理。例如,可以使用Kafka提供的AdminClient来管理Kafka集群中的所有Topic,可以使用Kafka提供的ConsumerGroupClient来管理Kafka集群中的所有ConsumerGroup,可以使用Kafka提供的KafkaConsumer来消费Kafka集群中的数据等。

总之,Kafka Topics是Kafka中非常重要的一个概念,它可以实现数据的发布、订阅和消费。在使用Kafka Topics时,需要注意一些事项,以确保数据的可靠性和安全性。


http://www.ppmy.cn/news/693739.html

相关文章

穷举法解锁华为手机bootloader

理论 理论一次解锁0.025s&#xff0c;穷举要9999999999999999*0.025/60/60/24/365190258751年&#xff0c;建议大家还是用dc-unlocker解锁 shell 脚本&#xff0c;运气好的可以试试 #!/bin/bash> unlock.log succeedfalse code1000000000000000 #adb reboot bootloader w…

双胞胎能互相人脸识别解锁支付软件?那是你不了解点面科技的虹膜识别技术

双胞胎能互相人脸识别解锁支付软件&#xff1f;那是你不了解点面科技的虹膜识别技术 记得有一档综艺节目上&#xff0c;一对双胞胎姐妹花曾经说过一个段子。姐姐吐槽说想要体会独立的感觉&#xff0c;特别是经济独立&#xff0c;因为妹妹天天花她的钱。而妹妹则很无奈的说&…

华为解锁BL

华为手机要怎么查看手机是否需要解锁呢?相信许多机油都不懂自己入手的手机是否需要解锁。而华为手机自华为C8812之后的高通手机均需要先解锁才能够尽兴刷机或获取Root权限的。那么下面我给大家分享一下华为手机查看是否需要解锁的查看教程。 方法/步骤 第一种方法&#xff1a;…

华为解锁码申请

我们刚拿到华为手机时&#xff0c;手机默认是对Bootloader进行锁定的&#xff0c;也就是说我们不能对手机进行ROOT&#xff0c;或者刷第三方Recovery等一系列操作。如需进行以上操作&#xff0c;必须对bootloader进行解锁。而目前各论坛&#xff0c;各网站针对华为手机解锁的介…

虹膜识别与虹膜定位(续)算法实现

在http://blog.csdn.net/piaoxuezhong/article/details/77966132中大致总结的虹膜识别与虹膜定位的原理&#xff0c;本篇结合现有的方法&#xff0c;实例测试实现&#xff0c;做下记录&#xff1a; 数据库&#xff1a; 中科院虹膜识别数据库 下载及说明请参见&#xff1a;htt…

华为解锁码

前言 华为手机ROOT偶需解锁码。 获取 进入按提示输入即可。

华为mate手机从解锁到root成功全步骤

警告 请保持电量充足&#xff0c;不然小心变砖。 解锁手机会恢复出厂设置&#xff0c;原因未知&#xff08;伤心&#xff0c;不想查了&#xff09;&#xff0c;请需要解锁的diy爱好者&#xff0c;自行备份数据。 一、安装adb驱动 下载安装adb驱动&#xff08;已分享&#xff…

什么是虹膜识别,虹膜识别有哪些优缺点?

虹膜识别&#xff1a;古老的识别方法几无可能复制修改 即使对于生长在二十世纪末和二十一世纪初的年轻人来说&#xff0c;关于虹膜识别也是一个新鲜名词&#xff0c;很多人是在近两年才听说&#xff0c;智能手机在普及虹膜识别概念上功不可没&#xff0c;富士通设计制造的ARRO…

动动嘴就可以解锁?来看下华为最新的技术专利

近些年来&#xff0c;随着智能手机技术的不断进步&#xff0c;手机的解锁方式也是越来越五花八门&#xff0c;从最早的密码解锁、画图解锁到后来的指纹解锁、人脸识别解锁、到最先进的虹膜解锁&#xff0c;随着解锁方式的增多&#xff0c;智能手机的安全性也大幅提升。 但&…

人体密码学之未来——虹膜识别

密码技术 无论是早期文字或者数码式的密码&#xff0c;还是随着信息技术的发展形成了现在语音、图像、数据式的密码&#xff0c;密码技术一直以来都是被频繁使用的。 伴随着保密场所对安全、精准、便捷的要求越来越高&#xff0c;从IC&#xff08;或ID&#xff09;卡开锁到指纹…

【虹膜识别】虹膜识别技术概述

虹膜识别技术概述 本文主要参考&#xff1a; 刘晓敏. 虹膜识别中预处理算法的研究与实现[D]. 2009. 杨光磊. 非理想场景下的虹膜识别方法研究[D]. 2016. 1.什么是虹膜 人的眼睛主要由巩膜、虹膜和瞳孔三个部分组成&#xff0c;如下图所示&#xff1a; 虹膜是瞳孔和巩膜之间的…

华为 emui 刷机解锁及回锁教程

华为 emui 刷机解锁及回锁教程 刷第三方ROM&#xff0c;必须解锁bootloader获取权限 一&#xff0c;申请解锁码 &#xff08;1&#xff09;申请解锁码链接 http://www.emui.com/plugin.php?idunlock &#xff08;2&#xff09;申请解锁码办法 同意协议&#xff0c;选择智能…

虹膜识别与虹膜定位

这篇文章 http://www.eeworld.com.cn/qrs/article_2016092030381.html 对虹膜识别的发展和原理进行了介绍&#xff0c;这里进行简要概述&#xff1a; 虹膜识别的优势&#xff1a; 1. 唯一性&#xff1a;虹膜的形成主要是由胚胎发育环境的随机因素所决定&#xff0c;虹膜纹理中…

眼睛中的密码-虹膜识别

什么是虹膜识别&#xff1f; 首先&#xff0c;最明显的缺陷当属眼盲患者、尤其是眼球外伤患者是不能适用虹膜识别技术的。与此类似的是佩戴眼镜、佩戴美瞳等隐性眼镜的情况下&#xff0c;同样也不能进行虹膜信息的录入。在录入虹膜信息以及使用虹膜识别时还会受到环境光的影响…

获取华为解锁码的思路

前言 华为手机在刷入第三方rom需要首先解锁手机&#xff0c;而获取解锁码必须要在华为官网进行&#xff0c;并且有14天限制&#xff0c;这无疑给某些特殊需求的用户和rom定制厂商设置了一个很大难题&#xff0c;也为华为保护自身定制软件提供了有利因素&#xff0c;因此这里给…

C++primer(第五版)第八章(IO库)

8.1 IO库 上表中以w开头的类型和函数是C标准库为了支持使用宽字符的语言而定义的一组类型和对象来操纵wchar_t类型的数据.(然而我没有遇到过) 8.1.1 IO对象无拷贝或赋值 IO对象不能拷贝或赋值,通常用引用方式传递和返回流,由于读写一个IO对象回改变其状态,因此传递和返回的引…

剑指YOLOv8改进VariFocalNet系列03:即插即用|最新改进VariFocal损失函数,全面提升密集场景下的目标检测,提升YOLOv8检测精度

剑指YOLOv8改进Loss系列:最新改进VariFocalNet损失函数,全面提升密集场景下的目标检测,提升YOLOv8检测精度 💡CSDN芒果汁没有芒果🥭:YOLOv8 最新首发创新点改进源代码!! 💡🚀🚀🚀本博客 改进源代码改进 适用于 YOLOv8 按步骤操作改进代码即可 💡论文地址…

判断一个人有没有领导力,就看这4点

作者| Mr.K 编辑| Emma 来源| 技术领导力(ID&#xff1a;jishulingdaoli) 最近K哥下面的团队有一个主管的空缺&#xff0c;我准备提拔小L。这让团队中很多人感到很意外&#xff0c;因为小L刚进团队一年&#xff0c;年纪也只有26岁&#xff0c;团队中有很多人资历比他老&#…

恢复出厂设置后itms注册失败_电信光纤故障OLT注册正常ITMS注册失败是怎么个情况...

2020-03-04阅读(244) 忘记opoa9m锁频数字密码后,解锁方案主要分为以下两种情况:1.若oppoa9m手机已打开查找手机功能:方案一:首先进行恢复原厂设置,长按手机关机键10秒进行强制关机。手机关机后同时按下关机键与音量下键,在所打开的界面中选择语言“简体中文”键后, 2020…

Flutter TextField 输入框 简单使用

创建方式一&#xff1a; ///用于文本输入框 TextEditingController controller new TextEditingController();/// 设置TextField中显示的内容void setEditeInputTextFunction(String flagText) {controller .text flagText;}/// 清除TextField中显示的内容void clearEditeIn…
最新文章