kafka之java客户端实战

1. kafka的客户端

        Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来非常复杂,也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用。我们的重点是HighLeve API 。

2. 基础客户端的使用

Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:

  <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.4.0</version></dependency>

2.1 如何发消息

        现在, 我们使用Kafka提供的Producer类,如何发送消息。

2.1.1 单项发送消息

代码:

public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//单向发送:不关心服务端的应答。producer.send(record);System.out.println("message "+i+" sended");}//消息处理完才停止发送者。producer.close();}
}

执行结果:

2.1.2 同步发送

代码:

public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//同步发送:获取服务端应答消息前,会阻塞当前线程。RecordMetadata recordMetadata = producer.send(record).get();String topic = recordMetadata.topic();int partition = recordMetadata.partition();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);}//消息处理完才停止发送者。producer.close();}
}

执行结果:

 2.1.2 异步发送 

代码:

public class MyProducerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);CountDownLatch latch = new CountDownLatch(5);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(null != e){System.out.println("消息发送失败,"+e.getMessage());e.printStackTrace();}else{String topic = recordMetadata.topic();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);}latch.countDown();}});}//消息处理完才停止发送者。latch.await();//消息处理完才停止发送者。producer.close();}
}

执行结果:

2.1.3 总结 

​ 从上述示例中,我们可以总结出,构建Producer分为三个步骤:

  1. 设置Producer核心属性 :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。
  2. 构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value。
  3. 使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。

2.2 如何消费消息

        接下来可以使用Kafka提供的Consumer类,快速消费消息。

2.2.1 消费消息

代码:

public class MyConsumerTest {private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) {//PART1:设置发送者相关属性Properties props = new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, "test");//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(TOPIC));while (true) {//PART2:拉取消息// 100毫秒超时时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));//PART3:处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());}//提交offset,消息就不会重复推送。consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
//            consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。}}
}

2.2.2 总结

​ 整体来说,Consumer同样是分为三个步骤:

  1. 设置Consumer核心属性 :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。
  2. 拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。
  3. 处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。

3. 从客户端一些属性来认识kafka客户端工作机制

内容更新中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/620946.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Manjora 中使用idm,linux通用

说明 在Mnajora中的idm需要在wine中运行&#xff0c;idm是一款很不错的下载工具&#xff0c;但是在linux不能直接使用&#xff0c;借助wine也无法使用浏览器的集成插件&#xff0c;在网上偶然发现一款第三方插件能够在linux的浏览器中将链接捕捉到idm中&#xff0c;虽然使用起…

git安装教程 Windows 附安装包链接

Git是一款分布式源代码管理工具(版本控制工具) 。 git的作用 当你需要做一个大工程的时候&#xff0c;文件的管理无疑是非常庞大的工作&#xff0c;因为你需要不断的修改更新文件内容&#xff0c;同时可能还要保留旧版本保证可以复原&#xff0c;这样就需要备份多个版本的文件…

计网期末复习(一)

计网期末复习&#xff08;一&#xff09; – WhiteNights Site 标签&#xff1a;计算机网络 诶&#xff0c;期末。诶&#xff0c;复习。 TCP/IP参考模型的网络层提供的是&#xff1f; 区别于传输层&#xff0c;网络层提供不可靠无连接的数据报服务 当时看到TCP/IP就选了可靠有…

RT-Thread学习(一)简介及基础环境配置

系列文章目录 文章目录 系列文章目录前言简要介绍配置环境修改工作时钟更改ROM空间添加FinSH串口命令提示 前言 之前学习了FreeRTOS&#xff0c;但是一直想深入学习&#xff0c;但是没有人指导&#xff0c;又不知道该如何学习&#xff0c;于是再学习一个操作系统看看情况。 简…

Docker安装Odoo17

Docker安装Odoo 前言所需环境安装步骤登录Odoo 配置数据库 前言 Odoo是一个开源的ERP框架&#xff0c;它提供了一套完整的、可定制的、模块化的企业管理软件解决方案。以下是Odoo的主要特点&#xff1a; 模块化设计&#xff1a;Odoo的各个功能都以模块的形式提供&#xff0c;包…

机器视觉系统选型-参数—景深

镜头在垂直方向上&#xff0c;能清晰成像的空间距离(清晰成像范围)&#xff0c;称为景深

【现代密码学】笔记 补充7-- CCA安全与认证加密《introduction to modern cryphtography》

【现代密码学】笔记7-- CCA安全与认证加密《introduction to modern cryphtography》 写在最前面7 CCA安全与认证加密 写在最前面 主要在 哈工大密码学课程 张宇老师课件 的基础上学习记录笔记。 内容补充&#xff1a;骆婷老师的PPT 《introduction to modern cryphtography》…

mysql数据库被黑恢复—应用层面delete删除---惜分飞

客户的mysql被人从应用层面攻击,并且删除了一些数据,导致业务无法正常使用,通过底层分析binlog确认类似恢复操作 确认这类的业务破坏是通过delete操作实现的,客户那边不太幸,客户找了多人进行恢复,现场严重破坏,老库被删除,并且还原了历史的备份文件(非故障第一现场),通过底层…

Error: error:0308010C:digital envelope routines::unsupported的解决方案

因为最近安装了pnpm对node版本有要求&#xff0c;升级了node版本是18以后&#xff0c;在运行之前的项目&#xff0c;就跑不起来了&#xff0c;报错如下&#xff1a; Error: error:0308010C:digital envelope routines::unsupported解决方案一&#xff1a; node版本切换到16版…

KEI5许可证没到期,编译却出现Error: C9555E: Failed to check out a license.问题解决

一、编译出现如下报错 二、检查一下许可证 三、许可证在许可日期内&#xff0c;故应该不是许可证的问题 四、检查一下编译器&#xff0c;我用的是这个&#xff0c;这几个编译器的区别其实我不太明白&#xff0c;但我把问题解决是选的这个 五、找到编译器的路径&#xff0c;去复…

Dockerfile的COPY --link

文章目录 总结环境概述“ --link” 是什么引入“ --link”使用“COPY --link”示例什么情况不适用总结参考 注&#xff1a;我做了很多测试&#xff0c;发现不管是否使用 --link &#xff0c;结果貌似都一样。我在网上搜了半天&#xff0c;最后发现&#xff0c;该功能貌似目前被…

SpringCloud:Ribbon

文章目录 Ribbon快速入门Ribbon负载均衡算法常见的负载均衡算法更改算法规则修改配置 饥饿加载 Ribbon ribbon是一个客户端负载均衡器&#xff0c;会从注册中心拉取可用服务&#xff0c;当客户端需要获取服务请求时&#xff0c;ribbon能够解析服务地址并实现负载均衡 快速入门 …

理解TCP/IP协议

一、协议 在计算机网络与信息通讯领域里&#xff0c;人们经常提及 “协议” 一词。互联网中常用的协议有HTTP、TCP、IP等。 协议的必要性 简单来说&#xff0c;协议就是计算机与计算机之间通过网络通信时&#xff0c;事先达成的一种 “约定”。这种“约定”使不同厂商的设备…

Linux工具-搭建文件服务器

当我们使用linux系统作为开发环境时&#xff0c;经常需要在Linux系统之间、Linux和Windows之间传输文件。 对少量文件进行传输时&#xff0c;可以使用scp工具在两台主机之间实现文件传输&#xff1a; rootubuntu:~$ ssh --help unknown option -- - usage: ssh [-46AaCfGgKkMN…

EndNote快速上手

前言&#xff1a;用EndNote主要就是为了方便管理文章引用的文献&#xff0c;所以本篇就是针对EndNote在文章中引用文献需要的技巧&#xff0c;然后本文用的是EndNoteX9。 EndNote快速上手 创建文献资料库创建文献分组导入文献手动输入文件导入在线搜索 修改文献信息去重文献删除…

详解Java之Spring框架中事务管理的艺术

第1章&#xff1a;引言 大家好&#xff0c;我是小黑&#xff0c;咱们今天聊聊Spring框架中的事务管理。不管是开发小型应用还是大型企业级应用&#xff0c;事务管理都是个不可避免的话题。那么&#xff0c;为什么事务管理这么重要呢&#xff1f;假设在银行系统中转账时&#x…

06-微服务OpenFeigh和Sentinel持久化

一、OpenFeign基础应用 1.1 概念 OpenFeign是一种声明式、模板化的HTTP客户端。在Spring Cloud中使用OpenFeign&#xff0c;可以做到使用HTTP请求访问远程服务&#xff0c;就像调用本地方法一样的&#xff0c;开发者完全感知不到这是在调用远程方法&#xff0c;更感知不到在访…

VITS(Conditional Variational Autoencoder with Adversarial Learning)论文解读及实现(一)

此篇为VITS论文解读第一部份 论文地址Conditional Variational Autoencoder with Adversarial Learning for End-to-End Text-to-Speech模型使用了VAE,GAN,FLOW以及transorflomer(文本处理有用到)&#xff0c;即除了未diffusion模型&#xff0c;将生成式模型都融入进来了&#…

064:vue中一维数组的全选、全不选、反选(图文示例)

第061个 查看专栏目录: VUE ------ element UI 专栏目标 在vue和element UI联合技术栈的操控下&#xff0c;本专栏提供行之有效的源代码示例和信息点介绍&#xff0c;做到灵活运用。 &#xff08;1&#xff09;提供vue2的一些基本操作&#xff1a;安装、引用&#xff0c;模板使…

DHCP中继【新华三】

理论【DHCP服务器可以对其直连的网段中的pc&#xff0c;分配其IP地址等服务&#xff0c;但是&#xff0c;对于跨网段进行分配IP地址&#xff0c;需要中间有DHCP中继进行传达&#xff0c;由DHCP中继指定DHCP服务器的位置&#xff0c;可以很好的对其跨网段分配IP地址起到指引的作…