kafka之java客户端实战

1. kafka的客户端

        Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来非常复杂,也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用。我们的重点是HighLeve API 。

2. 基础客户端的使用

Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:

  <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.4.0</version></dependency>

2.1 如何发消息

        现在, 我们使用Kafka提供的Producer类,如何发送消息。

2.1.1 单项发送消息

代码:

public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//单向发送:不关心服务端的应答。producer.send(record);System.out.println("message "+i+" sended");}//消息处理完才停止发送者。producer.close();}
}

执行结果:

2.1.2 同步发送

代码:

public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//同步发送:获取服务端应答消息前,会阻塞当前线程。RecordMetadata recordMetadata = producer.send(record).get();String topic = recordMetadata.topic();int partition = recordMetadata.partition();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);}//消息处理完才停止发送者。producer.close();}
}

执行结果:

 2.1.2 异步发送 

代码:

public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);CountDownLatch latch = new CountDownLatch(5);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(null != e){System.out.println("消息发送失败,"+e.getMessage());e.printStackTrace();}else{String topic = recordMetadata.topic();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);}latch.countDown();}});}//消息处理完才停止发送者。latch.await();//消息处理完才停止发送者。producer.close();}
}

执行结果:

2.1.3 总结 

​ 从上述示例中,我们可以总结出,构建Producer分为三个步骤:

  1. 设置Producer核心属性 :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。
  2. 构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value。
  3. 使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。

2.2 如何消费消息

        接下来可以使用Kafka提供的Consumer类,快速消费消息。

2.2.1 消费消息

代码:

public class MyConsumerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) {//PART1:设置发送者相关属性Properties props = new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, "test");//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(TOPIC));while (true) {//PART2:拉取消息// 100毫秒超时时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));//PART3:处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());}//提交offset,消息就不会重复推送。consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
//            consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。}}
}

2.2.2 总结

​ 整体来说,Consumer同样是分为三个步骤:

  1. 设置Consumer核心属性 :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。
  2. 拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。
  3. 处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。

3. 从客户端一些属性来认识kafka客户端工作机制

内容更新中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/620946.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C#中委托的理解

C#中的委托类似于C中的函数指针&#xff0c;是一种引用类型&#xff0c;表示对具有特定参数列表和返回类型的方法的引用。 委托包含两部分&#xff0c;委托的声明和委托的实例化。 委托的声明示例如下&#xff1a; public delegate string printString(string str); 委托的实…

Manjora 中使用idm,linux通用

说明 在Mnajora中的idm需要在wine中运行&#xff0c;idm是一款很不错的下载工具&#xff0c;但是在linux不能直接使用&#xff0c;借助wine也无法使用浏览器的集成插件&#xff0c;在网上偶然发现一款第三方插件能够在linux的浏览器中将链接捕捉到idm中&#xff0c;虽然使用起…

js 什么是外边距重叠怎么解决

外边距重叠&#xff08;Margin Collapsing&#xff09;是指在特定情况下&#xff0c;相邻的块级元素的上下外边距会合并为一个外边距的现象。这种行为可能导致页面布局不符合预期。 外边距重叠通常发生在以下情况下&#xff1a; 相邻兄弟元素&#xff1a;当相邻的两个兄弟元素…

git安装教程 Windows 附安装包链接

Git是一款分布式源代码管理工具(版本控制工具) 。 git的作用 当你需要做一个大工程的时候&#xff0c;文件的管理无疑是非常庞大的工作&#xff0c;因为你需要不断的修改更新文件内容&#xff0c;同时可能还要保留旧版本保证可以复原&#xff0c;这样就需要备份多个版本的文件…

计网期末复习(一)

计网期末复习&#xff08;一&#xff09; – WhiteNights Site 标签&#xff1a;计算机网络 诶&#xff0c;期末。诶&#xff0c;复习。 TCP/IP参考模型的网络层提供的是&#xff1f; 区别于传输层&#xff0c;网络层提供不可靠无连接的数据报服务 当时看到TCP/IP就选了可靠有…

RT-Thread学习(一)简介及基础环境配置

系列文章目录 文章目录 系列文章目录前言简要介绍配置环境修改工作时钟更改ROM空间添加FinSH串口命令提示 前言 之前学习了FreeRTOS&#xff0c;但是一直想深入学习&#xff0c;但是没有人指导&#xff0c;又不知道该如何学习&#xff0c;于是再学习一个操作系统看看情况。 简…

QEMU源码全解析 —— PCI设备模拟(10)

接前一篇文章&#xff1a; 上一回讲到&#xff0c;在SeaBIOS的调用链dopost->maininit->platform_hardware_setup->qemu_platform_setup->pci_setup->pci_bios_map_devices过程中&#xff0c;最后这个函数负责完成PCI设备BAR的设置。 其中包括I/O、MEM以及PREF…

Docker安装Odoo17

Docker安装Odoo 前言所需环境安装步骤登录Odoo 配置数据库 前言 Odoo是一个开源的ERP框架&#xff0c;它提供了一套完整的、可定制的、模块化的企业管理软件解决方案。以下是Odoo的主要特点&#xff1a; 模块化设计&#xff1a;Odoo的各个功能都以模块的形式提供&#xff0c;包…

linux主机的免密登录

实现linux主机之间的相互免密登录 在进行远程登录的时&#xff0c;服务器和主机间进行认证阶段分为&#xff1a; 基于口令认证&#xff08;不安全&#xff0c;易被抓包拦截获取&#xff09; 客户机连接服务器时&#xff0c;服务器将自己的公钥返回给客户机 客户机会将服务器的…

Vue中避免滥用this去读取data中数据

template模板中如何避免 提前处理v-for循环所用的数据&#xff0c;不要在v-for循环中去读取数组、对象类型的数据。在上述template模板中滥用this的例子中可以这样优化。 假设list、arr、obj皆是服务端返回来的数据&#xff0c;且arr和obj没有用到任何模块渲染中&#xff0c;…

机器视觉系统选型-参数—景深

镜头在垂直方向上&#xff0c;能清晰成像的空间距离(清晰成像范围)&#xff0c;称为景深

【java八股文】之分布式系列篇

1、什么是CAP BASE理论 1.1 CAP 一致性: 在分布式环境下&#xff0c;一致性是指数据在多个副本之间是否能够保持一致性的特性&#xff0c;等同于所有的节点访问同一份最新数据的副本。在一致性的需求下&#xff0c;当一个系统在数据一致的状态下执行更新操作后&#xff0c;应该…

CSAPP - 反编译 initialize_bomb()

CSAPP - 保持好奇&#xff0c;反汇编 initialize_bomb() 相比于直接看 bomblab phase_1 的答案&#xff0c;我更想搞懂答案之外涉及的每个函数的反汇编 - 反正是一个实验&#xff0c;代码能复杂到哪里去&#xff1f; 而搞懂这些函数&#xff0c; 无疑对于实际工程中的各种 deb…

nrm使用

为了更方便的切换下包的镜像源&#xff0c;我们可以安装 nrm 这个小工具&#xff0c;利用 nrm 提供的终端命令&#xff0c;可以快速查看和切换下 包的镜像源。 //通过 npm 包管理器&#xff0c;将 nrm 安装为全局可用的工具 npm i nrm -g//查看所有可用的镜像源 nrm ls//将下载…

【现代密码学】笔记 补充7-- CCA安全与认证加密《introduction to modern cryphtography》

【现代密码学】笔记7-- CCA安全与认证加密《introduction to modern cryphtography》 写在最前面7 CCA安全与认证加密 写在最前面 主要在 哈工大密码学课程 张宇老师课件 的基础上学习记录笔记。 内容补充&#xff1a;骆婷老师的PPT 《introduction to modern cryphtography》…

mysql数据库被黑恢复—应用层面delete删除---惜分飞

客户的mysql被人从应用层面攻击,并且删除了一些数据,导致业务无法正常使用,通过底层分析binlog确认类似恢复操作 确认这类的业务破坏是通过delete操作实现的,客户那边不太幸,客户找了多人进行恢复,现场严重破坏,老库被删除,并且还原了历史的备份文件(非故障第一现场),通过底层…

Error: error:0308010C:digital envelope routines::unsupported的解决方案

因为最近安装了pnpm对node版本有要求&#xff0c;升级了node版本是18以后&#xff0c;在运行之前的项目&#xff0c;就跑不起来了&#xff0c;报错如下&#xff1a; Error: error:0308010C:digital envelope routines::unsupported解决方案一&#xff1a; node版本切换到16版…

KEI5许可证没到期,编译却出现Error: C9555E: Failed to check out a license.问题解决

一、编译出现如下报错 二、检查一下许可证 三、许可证在许可日期内&#xff0c;故应该不是许可证的问题 四、检查一下编译器&#xff0c;我用的是这个&#xff0c;这几个编译器的区别其实我不太明白&#xff0c;但我把问题解决是选的这个 五、找到编译器的路径&#xff0c;去复…

Dockerfile的COPY --link

文章目录 总结环境概述“ --link” 是什么引入“ --link”使用“COPY --link”示例什么情况不适用总结参考 注&#xff1a;我做了很多测试&#xff0c;发现不管是否使用 --link &#xff0c;结果貌似都一样。我在网上搜了半天&#xff0c;最后发现&#xff0c;该功能貌似目前被…

计算机网络的常用的网络通信命令(Windows)

ping&#xff1a;它是用来检查网络是否通畅或者网络连接速度的命令。ping命令利用的原理是&#xff1a;网络上的机器都有唯一确定的IP地址&#xff0c;我们给目标IP地址发送一个数据包&#xff0c;对方就要返回一个同样大小的数据包&#xff0c;根据返回的数据包我们可以确定目…