Kafka-代码示例

一、构建开发环境

File > New > Project

选择一个最简单的模板

项目和坐标命名

配置maven路径

添加maven依赖

<dependencies><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.2.1</version></dependency>
</dependencies>

 加载刚刚添加的依赖

此时发现项目还没有包目录,如果遇到这种情况,点击新建目录就会自动提示了

二、创建一个新的topic

kafka-topics --create --topic kafka-study --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
#查看topic详情
kafka-topics --describe --zookeeper cdh1:2181 --topic kafka-study
#查看 topic 指定分区 offset
kafka-run-class kafka.tools.GetOffsetShell --topic kafka-study --time -1 --broker-list cdh1:9092

三、编写生产者

kafka源码中有生产者和消费者的示例,我们简单修改下就直接用了

package org.example.kafkaStudy;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;public class KafkaProducerDemo {public static void main(String[] args) {try{//topic名称String topicName = "kafka-study";//broker列表String bootstrapServers = "cdh1:9092,cdh2:9092,cdh3:9092";//向topic打多少数据int numRecords = 10000;//是否异步推送数据boolean isAsync = true;int key = 0;int sentRecords = 0;//创建生产者KafkaProducer<Integer, String> producer = createKafkaProducer(bootstrapServers,-1,null,false);//判断是否达到生产要求while (sentRecords < numRecords) {if (isAsync) {//异步推送asyncSend(producer,topicName, key, "test" + key,sentRecords);} else {//同步推送syncSend(producer,topicName, key, "test" + key,sentRecords);}key++;sentRecords++;}producer.close();} catch (Throwable e) {e.printStackTrace();}}private static RecordMetadata syncSend(KafkaProducer<Integer, String> producer,String topicName, int key, String value,int sentRecords)throws ExecutionException, InterruptedException {try {// 发送记录,然后调用get,这会阻止等待来自broker的ackRecordMetadata metadata = producer.send(new ProducerRecord<>(topicName, key, value)).get();Utils.maybePrintRecord(sentRecords, key, value, metadata);return metadata;} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException| OutOfOrderSequenceException | SerializationException e) {Utils.printErr(e.getMessage());} catch (KafkaException e) {Utils.printErr(e.getMessage());}return null;}private static void asyncSend(KafkaProducer<Integer, String> producer,String topicName, int key, String value,int sentRecords) {//异步发送记录,设置一个回调以通知结果。//请注意,即使使用linger.ms=0设置了一个batch.size 当缓冲区内存已满或元数据不可用时,发送操作仍将被阻止producer.send(new ProducerRecord<>(topicName, key, value), new ProducerCallback(key, value,sentRecords));}private static KafkaProducer<Integer, String> createKafkaProducer(String bootstrapServers ,int transactionTimeoutMs,String transactionalId,boolean enableIdempotency) {Properties props = new Properties();// 生产者连接到broker需要引导服务器配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);// 不需要客户端id,但通过允许在服务器端请求日志中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是ip/portprops.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());// 设置序列化器props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);if (transactionTimeoutMs > 0) {// 事务协调器主动中止正在进行的事务之前的最长时间props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);}if (transactionalId != null) {// 事务id必须是静态且唯一的,它用于在流程重启过程中标识相同的生产者实例props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);}// 在分区级别启用重复保护props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);return new KafkaProducer<>(props);}static class ProducerCallback implements Callback {private final int key;private final int sentRecords;private final String value;public ProducerCallback(int key, String value,int sentRecords) {this.key = key;this.sentRecords = sentRecords;this.value = value;}/*** 用户可以实现一种回调方法,以提供请求完成的异步处理。当发送到服务器的记录得到确认时,将调用此方法。当回调中的异常不为null时,* 元数据将包含除topicPartition之外的所有字段的特殊-1值,该值将有效。** @param metadata 发送的记录的元数据(即分区和偏移量)。如果发生错误,将返回除topicPartition之外的所有字段的值为-1的空元数据。* @param exception 处理此记录时引发的异常。如果没有发生错误,则为空。*/public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {Utils.printErr(exception.getMessage());if (!(exception instanceof RetriableException)) {// 我们无法从这些异常中恢复过来}} else {Utils.maybePrintRecord(sentRecords, key, value, metadata);}}}
}

四、编写消费者

package org.example.kafkaStudy;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
import static java.util.Collections.singleton;public class KafkaConsumerDemo {public static void main(String[] args) {//topic名称String topicName = "kafka-study";//组名称String groupName = "my-group-1";//broker列表String bootstrapServers =  "cdh1:9092,cdh2:9092,cdh3:9092";//向topci打多少数据int numRecords = 10000;int remainingRecords = 10000;// 消费来自 topic = kafka-study 的数据KafkaConsumer<Integer, String> consumer = createKafkaConsumer(bootstrapServers,groupName,false);//订阅主题列表以获取动态分配的分区此类实现了我们在此处传递的再平衡侦听器,以接收此类事件的通知consumer.subscribe(singleton(topicName));Utils.printOut("Subscribed to %s", topicName);while (remainingRecords > 0) {try {// 如果需要,轮询会更新分区分配并调用配置的重新平衡侦听器,然后尝试使用上次提交的偏移量或auto.offset.reset按顺序获取记录。// 如果有记录或超时返回空记录集,则重置策略会立即返回。下一次轮询必须在session.timeout.ms中调用,以避免组重新平衡ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<Integer, String> record : records) {Utils.maybePrintRecord(numRecords, record);}remainingRecords -= records.count();} catch (AuthorizationException | UnsupportedVersionExceptione) {// 我们无法从这些异常中恢复过来Utils.printErr(e.getMessage());} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {// 在没有auto.reset.policy的情况下,找不到偏移量或偏移量无效Utils.printOut("Invalid or no offset found, using latest");consumer.seekToEnd(e.partitions());consumer.commitSync();} catch (KafkaException e) {// 记录异常并尝试继续Utils.printErr(e.getMessage());}}consumer.close();Utils.printOut("Fetched %d records", numRecords - remainingRecords);}private static KafkaConsumer<Integer, String> createKafkaConsumer(String bootstrapServers,String groupId , boolean readCommitted) {Properties props = new Properties();// 消费者连接到broker需要引导服务器配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);// 不需要客户端id,但通过允许在服务器端请求日志中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是ip/portprops.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());// 当我们使用订阅(topic)进行组管理时,需要消费者groupIdprops.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//设置静态成员资格以提高可用性(例如滚动重启)
//        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));//启用EOS时禁用自动提交,因为偏移量与事务一起提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");//读取数据用到的反序列化器,需要和生产者对应props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);if (readCommitted) {// 跳过正在进行和已中止的事务props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");}// 在偏移无效或没有偏移的情况下设置重置偏移策略props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");return new KafkaConsumer<>(props);}
}

五、运行程序

生产者日志打印

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(0, test0), partition(kafka-study-0), offset(0)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(1, test1), partition(kafka-study-0), offset(1)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(5, test5), partition(kafka-study-0), offset(2)

......

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9940, test9940), partition(kafka-study-0), offset(4979)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9960, test9960), partition(kafka-study-0), offset(4987)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9970, test9970), partition(kafka-study-0), offset(4991)

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(2, test2), partition(kafka-study-1), offset(0)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(3, test3), partition(kafka-study-1), offset(1)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(4, test4), partition(kafka-study-1), offset(2)

.......

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9950, test9950), partition(kafka-study-1), offset(4966)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9980, test9980), partition(kafka-study-1), offset(4986)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9990, test9990), partition(kafka-study-1), offset(4991)

我们再次用命令看下每个分区的offset

消费者日志打印

main - Subscribed to kafka-study
main - Sample: record(0, test0), partition(kafka-study-0), offset(0)
main - Sample: record(1000, test1000), partition(kafka-study-0), offset(506)
main - Sample: record(2000, test2000), partition(kafka-study-0), offset(1020)
main - Sample: record(3000, test3000), partition(kafka-study-0), offset(1554)
main - Sample: record(7000, test7000), partition(kafka-study-0), offset(3550)
main - Sample: record(4000, test4000), partition(kafka-study-1), offset(1929)
main - Sample: record(5000, test5000), partition(kafka-study-1), offset(2422)
main - Sample: record(6000, test6000), partition(kafka-study-1), offset(2932)
main - Sample: record(8000, test8000), partition(kafka-study-1), offset(3963)
main - Sample: record(9000, test9000), partition(kafka-study-1), offset(4467)
main - Fetched 10000 records

六、问题说明

从日志中我们可以看到,在异步生产和消费时offset并不是逐个递增上去的,这是为什么呢?

在前面博客中我们提到,生产者在异步的情况下会启用批处理,即:Kafka生产者将尝试在内存中积累数据,并在单个请求中发送更大的批处理。批处理可以配置为积累不超过固定数量的消息,并且等待时间不超过一些固定的延迟限制(例如64k或10毫秒)。这允许积累更多的消息来发送,并且在服务器上几乎没有更大的I/O操作。这种缓冲是可配置的,并提供了一种机制来权衡少量额外的延迟以获得更好的吞吐量。当然如果你选择的是同步推送或者异步中单条消息特别大会导致批处理优化使用不到。

消费者也是从brokers一批一批的拉取数据来消费的

我们也可以看下broker的日志中数据的索引情况

kafka-run-class kafka.tools.DumpLogSegments --files /var/local/kafka/data/kafka-study-0/00000000000000000000.log | head -10

kafka-run-class kafka.tools.DumpLogSegments --files /var/local/kafka/data/kafka-study-0/00000000000000000000.index | head -10

从这里我们可以看到,生产者是一批一批往broker推送的,broker以更大的批次往磁盘写,从而降低推送的频次,也降低与磁盘交互的频次。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/56730.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习 基本函数01

np.dot 是 NumPy 库中的一个函数&#xff0c;用于计算两个数组的点积&#xff08;也称为内积或数量积&#xff09;。点积是两个向量的对应元素乘积之和。 np.random.normal 是 NumPy 库中的一个函数&#xff0c;用于生成符合正态分布&#xff08;也称为高斯分布&#xff09;的…

项目管理软件中这6个小技巧帮助项目经理同时管理多个项目

在网上看到一个数据&#xff0c;只有15%的项目经理一次只需要负责一个项目&#xff0c;其他的项目经理都需要同时负责多个项目&#xff0c;甚至有15%的项目经理一次需要负责10个以上的项目。 我在工作中&#xff0c;也只有很少很少的时间里&#xff0c;是一次性只负责一个项目…

目标检测——yolov5-3.1的环境搭建和运行

第一步&#xff1a;安装anaconda环境&#xff0c;并且配置好cuda&#xff0c;安装需要的基本包 查看对应cuda版本&#xff0c;后续下载cudatoolkit需要对应版本 nvcc -V 第二步&#xff1a;创建虚拟环境&#xff0c;激活环境&#xff0c;安装所需的包 conda create -n yolo…

Visual studio 下载安装

1&#xff0c;Visual stutdio 网址 下载 Visual Studio Tools - 免费安装 Windows、Mac、Linux 2&#xff0c;下划页面&#xff0c;点击 较早的下载 3&#xff0c;选择对应的版本进行下载

《深度学习》YOLO v1网络架构 、损失值、NMS极大值抑制

目录 一、Yolo系列v1 1、核心思想 2、示例 3、流程图解析 二、YOLO系列v1损失函数 1、位置误差 2、置信度误差 3、类别概率损失 三、NMS非极大值抑制 1、概念 2、步骤 四、YOLO v1优缺点 1、优点 1&#xff09;速度快 2&#xff09;端到端 3&#xff09;多尺度…

docker 可用镜像服务地址(2024.10.25亲测可用)

1.错误 Error response from daemon: Get “https://registry-1.docker.io/v2/” 原因&#xff1a;镜像服务器地址不可用。 2.可用地址 编辑daemon.json&#xff1a; vi /etc/docker/daemon.json内容修改如下&#xff1a; {"registry-mirrors": ["https://…

光伏业务管理软件:提升企业管理效率的利器

一、优化业务流程 光伏业务管理软件能够对企业的各项业务流程进行全面梳理和优化。从项目前期的规划设计、设备采购&#xff0c;到项目建设中的施工管理、质量控制&#xff0c;再到项目后期的运维服务&#xff0c;软件都可以进行有效的跟踪和管理。 通过规范业务流程&#xf…

银行客户贷款行为数据挖掘与分析

#1024程序员节 | 征文# 在新时代下&#xff0c;消费者的需求结构、内容与方式发生巨大改变&#xff0c;企业要想获取更多竞争优势&#xff0c;需要借助大数据技术持续创新。本文分析了传统商业银行面临的挑战&#xff0c;并基于knn、逻辑回归、人工神经网络三种算法&#xff0…

Spring Boot框架下中小企业设备管理系统开发

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理中小企业设备管理系统的相关信息成为必然。…

新手入门之高级maven

文章目录 前言一、分模块设计与开发Maven 分模块设计的优势Maven 分模块设计的基本结构Maven 分模块项目的构建 二、继承与聚合三种打包方式&#xff1a;Maven 父模块和子模块的关系Maven 中的版本锁定1.<dependencyManagement> 标签主要特点&#xff1a; 2.使用 <pro…

刷题 - 图论

1 | bfs/dfs | 网格染色 200. 岛屿数量 访问到马上就染色&#xff08;将visited标为 true)auto [cur_x, cur_y] que.front(); 结构化绑定&#xff08;C17&#xff09;也可以不使用 visited数组&#xff0c;直接修改原始数组时间复杂度: O(n * m)&#xff0c;最多将 visited 数…

基于GPT的智能客服落地实践

&#x1f4cd;前言 在日常生活中&#xff0c;「客服」这个角色几乎贯穿着我们生活的方方面面。比如&#xff0c;淘宝买东西时&#xff0c;需要客服帮你解答疑惑。快递丢失时&#xff0c;需要客服帮忙找回。报名参加培训课程时&#xff0c;需要客服帮忙解答更适合的课程…… 基…

重构商业生态:DApp创新玩法与盈利模式的深度剖析

随着区块链技术的发展&#xff0c;DApp&#xff08;去中心化应用&#xff09;正在从实验走向成熟。DApp以去中心化、透明性和不可篡改性为基础&#xff0c;结合智能合约&#xff0c;逐步改变传统商业运作模式&#xff0c;创造新的市场生态。本文将从DApp的独特优势、创新玩法和…

找不到包的老版本???scikit-learn,numpy,scipy等等!!

废话不多说 直接上链接了&#xff1a; https://pypi.tuna.tsinghua.edu.cn/simple/https://pypi.tuna.tsinghua.edu.cn/simple/https://pypi.tuna.tsinghua.edu.cn/simple/xxx/ 后面的这个xxx就是包的名字 大家需要什么包的版本&#xff0c;直接输进去就可以啦 举个栗子&#…

【汇编语言】第一个程序(一)—— 一个源程序从写出到执行的过程

文章目录 前言1. 第一步&#xff1a;编写汇编源程序2. 第二步&#xff1a;对源程序进行编译连接3. 第三步&#xff1a;执行可执行文件中的程序结语 前言 &#x1f4cc; 汇编语言是很多相关课程&#xff08;如数据结构、操作系统、微机原理&#xff09;的重要基础。但仅仅从课程…

9. JSON RPC 服务

① JSON RPC 是一种基于 JSON 格式的轻量级的 RPC 协议标准,易于使用和阅读。 ② 在 Hyperf 里由 hyperf/json-rpc 组件来实现,可自定义基于 HTTP 协议来传输,或直接基于 TCP 协议来传输。 一、服务中心 目前 Hyperf 仅支持两种服务中心的组件支持: consul、nacosconsul 安…

了解 .NET 8 中的定时任务或后台服务:IHostedService 和 BackgroundService

IHostedService.NET 8 引入了使用和管理后台任务的强大功能BackgroundService。这些服务使长时间运行的操作&#xff08;例如计划任务、后台处理和定期维护任务&#xff09;可以无缝集成到您的应用程序中。本文探讨了这些新功能&#xff0c;并提供了实际示例来帮助您入门。您可…

Visual Studio配置tinyfiledialogs

下载地址&#xff1a;github下载链接 将下载的文件解压后&#xff0c;打开VS添加现有项 将.c文件添加进去 然后将tinyfiledialogs.h文件路径添加到包含目录 使用时包含头文件即可&#xff1a; #include <tinyfiledialogs.h>

上海亚商投顾:沪指缩量震荡 风电、传媒股集体走强

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 市场全天缩量震荡&#xff0c;三大指数集体收涨&#xff0c;北证50则跌超7%&#xff0c;超80只北交所个股跌逾…

一文搞定图

图 图 常见类型与术语 图的表示 邻接矩阵 邻接表 基础操作 基于邻接矩阵的实现 基于邻接表的实现 遍历 广度优先 深度优先 图 图 是一种非线性数据结构&#xff0c;由 顶点 和 边 组成。 相较于线性关系的链表和分治关系的树&#xff0c;网络关系的图自由度更高 常见…