java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明

本次的记录内容包括:

1.Java调用生产者APi流程

2.Kafka生产者Api的使用及说明

3.Kafka消费者Api的使用及说明

4.Kafka消费者自动提交Offset和手动提交Offset

5.自定义生产者的拦截器,分区器

那么接下来我就带大家熟悉以上Kafka的知识说明

1.Java调用生产者APi流程

首先上一张从网上找的简单的图,来描述一下生产者的生产流程。这里这个的图描述的不是非常精确,稍微有点问题的地方就是省略了拦截器内容,这块的内容在实际场景中也经常使用

420eb1162d5bde5b19e58c796f6cde69.png

那么从图中我们可以看到。生产者通过调用api的Send方法开始进行一些列生产控制操作,首先进入的是一个叫序列化器的处理结构(这里就先按图来讲了--实际第一步会先经过拦截器),那么这一步主要的操作就是序列化相关数据,保证数据传输的稳定准确性,个人理解需要序列化的原因是因为kafka是磁盘文件写消息,序列化后悔经过分区器,主要就是我们上篇讲过的关于如何生产消息分区的策略,主要有三种,1.指定分区,2根据key的hash取余有效分区数分区,3初始化整数,轮训分区。具体细节请参考上一篇文章(https://www.cnblogs.com/hnusthuyanhua/p/12355216.html)。经过分区后消息将会发送到指定的分区供消费者消费。

那么从图中我们还可以看到有一个RecordMetaData的存在,这又是干什么的呢?这里就又设计到另一个知识点了。由于在网上未找打相关描述图,我这里就粗略说明一下

大致的kafka生产者程序一般是有两类线程进行,一个是主线程,另一个是生产消息的线程,他们质检有一个RecoderMetaData作为消息存储缓存,同时也是线程共享变量,当主线程不断生产消息,本质上就是不断累积RecoderMetaData的缓存值,当缓存值达到限定时,生产者线程开始讲数据发送至kafka.。那么kafka生产者的一个流程大概就是这样了

2.Kafka生产者Api的使用及说明

大致流程:配置kafka property信息---构建生产者---构建消息---发送消息---关闭资源

@Slf4j

public class KafkaProduce {

public static void main(String[] args) {

Properties properties = new Properties();

//第一步 初始化化kafka服务配置Properties--具体配置可以抽到实际的Property配置文件

//设备地址

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.24.1.77:9092");

//ack

properties.put(ProducerConfig.ACKS_CONFIG, "all");

//序列化器

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

//构建生产者

Producer producer = new KafkaProducer(properties);

for(int i = 0;i< 100;i++)

{

String msg = "------Message " + i;

//构建生产记录

//第一种方式指定Toppic

ProducerRecord producerRecord=new ProducerRecord("kafkatest",msg);

//send方法分为有返回值和无返回值两种

// 无返回值简单发送消息

//producer.send(producerRecord);

//有返回值的在发送消息确认后返回一个Callback

producer.send(producerRecord, new Callback() {

@Override

public void onCompletion(RecordMetadata recordMetadata, Exception e) {

if (e==null){

//发送数据返回两个东西--一个是返回结果 一个是异常 异常为空时即发送操作正常

if (e==null){

//返回结果中可获取此条消息的相关分区信息

System.out.println(recordMetadata.offset()+recordMetadata.partition()+recordMetadata.topic());

}

}

}

});

log.info("kafka生产者发送消息{}",msg);

}

producer.close();

}

}

kafka分区策略方法说明:

6f1d56fa1ce9bf691345ce64ae5bf3ee.png

3.Kafka消费者Api的使用及说明

大致流程:配置kafka property信息---构建消费者---订阅主题--消费消息

@Slf4j

public class KafkaConsumerTest {

public static void main(String[] args) {

Properties properties = new Properties();

//第一步 初始化化kafka服务配置Properties--具体配置可以抽到实际的Property配置文件

//设备地址

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.24.1.77:9092");

//反序列化器

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");

//offset自动提交

//properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

//重置offset---当团体名发生改变时且消费者保存的初始offset未过期时,消费者会从头消费

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");

//初始化消费者

Consumer consumer=new KafkaConsumer(properties);

//初始化消费者订阅主题

consumer.subscribe(Arrays.asList("kafkatest"));

while (true) {

ConsumerRecords records = consumer.poll(1000);

for (ConsumerRecord record : records) {

//消费完按自动提交时间自动提交消费Offset

log.info("kafka消费者消费分区:{}-消息内容:{}",record.partition(),record.value());

}

//异步提交-即消费某条数据时发送offset更新,但消费继续运行 不等待提交完成 效率较高 但当消费者异常挂掉时容

//易造成消费重复

consumer.commitAsync(new OffsetCommitCallback() {

@Override

public void onComplete(Map map, Exception e) {

//如果失败E不为null 失败的话E为Null

//对于需要绝对保证消息不丢失的 可在此处重新进行消费提交

}

});

//同步提交-即消费一条数据提交一次offset更新,消费必须等待offset更新完才可继续运行。通常来讲此方法可尽可能

//的减少数据丢失 但效率较低

//consumer.commitSync();

}

}

}

4.Kafka消费者自动提交Offset和手动提交Offset

自动提交:即消费者消费后自己提交消费offset标记去kafka更新信息,那么通常是通过时间来控制的,比如每10秒更新一次本地的offset到kafka,  缺点:实际应用场景中难以控制时间,太短容易造成数据丢失(offset已经更新 消费者还没消费完就挂了),太长容易导致数据重复(offset还未更新,消费者挂了重新从kafka拉取之前的offset).

手动提交:消费完成后自行提交offset,根据同步情况分为两种方式,syn提交(提交时相当于阻塞主线程,等offset提交完成后方可继续进行)和asyn提交(异步提交),大致流程:配置kafka property配置文件,将配置文件中的自动提交关闭。--构建消费者订阅主题并消费--消费完成后手动提交offset.   缺点:同样还是会有上面自动提交的数据重复问题。但减少了数据丢失的可能性。

5.自定义生产者的拦截器,分区器

@Slf4j

public class KafkaFilter implements ProducerInterceptor {

public static int i=0;

@Override

public ProducerRecord onSend(ProducerRecord producerRecord) {

/**

* 发送消息的方法 可对消息进行处理 比如加时间戳啥的

*/

log.info("{}:{}",producerRecord.topic(),producerRecord.partition(),producerRecord.value());

return producerRecord;

}

@Override

public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

/**

* ack标记回调方法,有点类型Callback回调的方法

* 可在这统计一下成功发送的条数和失败发送的条数

*/

}

@Override

public void close() {

}

@Override

public void configure(Map map) {

}

}

public class KafkaPartion implements Partitioner {

@Override

public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {

/**

* 自定义分区 可通过该接口的默认分区器进行参考 默认为根据订阅的主题来分区方式

*/

return 0;

}

@Override

public void close() {

}

@Override

public void configure(Map map) {

}

}

6.消费者如何消费历史数据

大致流程:配置kafka property信息,开启AutoOffset配置---构建消费者---订阅主题--消费消息

那么每次开启消费者如果想从头开始消费,需要满足以下条件之一:1.消费者的组名改变 2.消费者的初始offset未过期

相关参考文章:

kafka消费者监听方式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/336479.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java如何模拟请求_单元测试如何模拟用户请求

python web自动化测试设计构工具书40.9元包邮(需用券)去购买 >错误正当我高高兴兴写完后台c层的测试代码准备提交时&#xff0c;测试机器人报了很多401错误&#xff0c;把代码拉下来一看&#xff0c;原来当我写代码时&#xff0c;我的伙伴已经写好后台的拦截器了&#xff0c…

LeetCode 83. 删除排序链表中的重复元素

原题链接 解法&#xff1a;通过一个指针从头到尾进行扫描 class Solution { public:ListNode* deleteDuplicates(ListNode* head) {if(!head)return nullptr;auto p1 head;while(p1->next){if(p1->next->val p1->val)p1->nextp1->next->next;else p1 …

后端 java ee_刷新器-Java EE 7后端十大功能

后端 java ee这是我的小型Java EE 7复习系列的第二部分。 在进行了简要概述的第一篇介绍之后&#xff0c;我决定请Arjan Tijms撰写有关Java EE 7中他最喜欢的后端新功能的信息。如果您关注Java EE领域&#xff0c;您将会知道Arjan。 他是Java EE开发人员&#xff0c;JSF和Secur…

java cucumber_为Java + STANDARD值引入Cucumber

java cucumber作为软件开发人员&#xff0c;我们都有最喜欢的工具来使我们成功。 许多人在开始工作时就很适合这份工作&#xff0c;但很快就不见了。 其他人则需要太多的设置和培训才能“将脚趾浸入水中”&#xff0c;只是为了简单地确定它们是否是正确的工具即可。 Cucumber …

文章id 文章标题点击量php,WordPress如何通过文章ID获取文章标题等信息

如果我们想要在某一个主题的php文件中调用文章的标题&#xff0c;内容等信息&#xff0c;而在WordPress中唯一一直不会改变的就是文章发布时生成的ID&#xff0c;我们只需要获取文章的ID&#xff0c;即可通过文章ID来获取我们想要的文章信息。调用方法php$id // 文章的 id$tit…

javaone_JavaOne 2015:高级模块化开发

javaoneJavaOne 2015看到了Project Jigsaw团队关于Java 9中的模块化的一系列讨论 。它们都是非常有趣的&#xff0c;并且充满了宝贵的信息&#xff0c;我敦促每个Java开发人员都注意它们。 除此之外&#xff0c;我想给社区一种搜索和引用它们的方法&#xff0c;因此我在这里总…

spark rest_Spark简介,您的下一个REST Java框架

spark rest我希望您今年Java来了&#xff01; 今天&#xff0c;我们将研究一个清新&#xff0c;简单&#xff0c;美观且实用的框架&#xff0c;以Java编写REST应用程序。 它将非常简单&#xff0c;甚至根本不会看起来像Java。 我们将研究Spark Web框架。 不&#xff0c;它与Ap…

oracle 授权 增删改查权限_Oracle增删改查与函数

SQL -- 结构化查询语言 关系型数据库分类&#xff1a; DDL DML DCL DQL TCL Oracle 的数据类型&#xff1a;字符 char() varchar2()数字 number(p,s)时间 date timestamp 文件 clob blob 二维表 table 创建表 CREATE create table 表名 ( 列名 数据类型 [约束], 列名 类型 ... …

_用WSL,MobaXterm,Cmder配置linux开发环境

离不开Windows的理由很多,作为后端开发需要使用linux的情况也很多,双系统总归是不方便,而且linux下的GUI体验也没用Win 10好. 如果使用虚拟机,那么文件交换和网络等各种问题也需要解决,对系统的内存要求也更高一些.微软为了让更多的开发人员留在Win10上面,开发了WSL功能.目前的…

php中上传图片怎么显示出来,PHP上传图片类显示缩略图功能

有缩略图功能 但是 感觉不全面&#xff0c;而且有点问题&#xff0c;继续学习&#xff0c;将来以后修改下/*** Created by PhpStorm.* User: Administrator* Date: 2016/6/28* Time: 21:04*/class upload{protected $fileMine;//文件上传类型protected $filepath;//文件上传路径…

javaparser_JavaParser入门:以编程方式分析Java代码

javaparser我最喜欢的事情之一是解析代码并对其执行自动操作。 因此&#xff0c;我开始为JavaParser做出贡献&#xff0c;并创建了两个相关项目&#xff1a; java-symbol-solver和Effectivejava 。 作为JavaParser的贡献者&#xff0c;我反复阅读了一些有关从Java源代码提取信…

wps xml转换表格_这功能WPS卖近百元?教你免费将PDF转成Word

[PConline 应用]PDF文件如何转换成为Word&#xff1f;很多朋友研究这个问题已经很久了&#xff0c;PDF更利于统一格式传播&#xff0c;Word更便于编辑&#xff0c;因此收到PDF文件后、想要修改时要如何将PDF转换成Word可谓是一个刚需。当然&#xff0c;不少办公软件提供了这样的…

睡眠 应该用 a加权 c加权_在神经网络中提取知识:学习用较小的模型学得更好...

在传统的机器学习中&#xff0c;为了获得最先进的(SOTA)性能&#xff0c;我们经常训练一系列整合模型来克服单个模型的弱点。 但是&#xff0c;要获得SOTA性能&#xff0c;通常需要使用具有数百万个参数的大型模型进行大量计算。 SOTA模型(例如VGG16 / 19&#xff0c;ResNet50)…

gpu编程如何一步步学习_如何学习贴片机编程

学习贴片机编程首选要对贴片机有所熟悉了解&#xff0c;另外对常用的电脑编辑软件要会使用。目前通常学习贴片机编程有专门的培训学校&#xff0c;或者跟着生产线上现有的贴片机编程师傅学习熟练后再进行编程操作。下面深圳智驰科技就来分享一下如何学习贴片机编程。对贴片机编…

plotcylinder matlab,Matlab在任意两点之间绘制三维圆柱

Matlab在任意两点之间绘制三维圆柱Matlab在任意两点之间绘制三维圆柱此函数可能存在一些不足&#xff0c;请多多指教&#xff01;function plotcylinder(u1,u2,color_a,r)Lnorm(u1-u2);RODu2-u1;[X,Y,Z]cylinder(r,100);x1X*0;y1Y*0;z1Z*0;ZL*Z-L/2;ROD_midpoint(u1u2)/2;xROD_…

jdk8和hotspot_HotSpot的-XshowSettings标志的简单性和价值

jdk8和hotspot一个方便的HotSpot JVM标志 &#xff08; 选项为Java启动 java &#xff09;是-XshowSettings选项。 Oracle Java启动器描述页面中对此选项进行了如下描述 &#xff1a; -XshowSettings &#xff1a; category显示设置并继续。 该选项的可能类别参数包括&#xf…

matlab信号分割与比对,matlab测量计算信号的相似度

本示例说明如何测量信号相似度。将回答以下问题&#xff1a;如何比较具有不同长度或不同采样率的信号&#xff1f;如何确定测量中是否存在信号或仅有噪声&#xff1f;有两个信号相关吗&#xff1f;如何测量两个信号之间的延迟&#xff1f;比较具有不同采样率的信号考虑一个音频…

Spring Bootstrap中具有配置元数据的高级配置

在简要介绍了配置元数据并涵盖了我之前的文章《 在Spring Boot中使用配置元数据Pimp您的配置》中的基础知识之后&#xff0c;现在该看看如何进一步执行此步骤并进一步自定义配置。 在这篇文章中&#xff0c;我计划提出对配置属性的弃用&#xff0c;并讨论各种值提供程序&#x…

ssh 与 telnet 有何不同?_采用创新面料Nike Infinalon的全新瑜珈系列究竟有何不同?...

采用创新面料Nike Infinalon的全新瑜珈系列究竟有何不同&#xff1f;无拘无束自由运动——这是耐克瑜伽系列新品的核心设计理念。全新系列为你提供垫上瑜伽时毫无束缚的舒适感&#xff0c;采用了耐克创新型面料&#xff1a;Nike Infinalon。Nike Infinalon应用于耐克最新瑜伽系…

matlab中的导函数驻点,Matlab用导数作定性分析

Matlab用导数作定性分析5.1知识要点&#xff1a;函数作图 —用导数定性描述函数【 clf,xlinspace(-8,8,30);f(x-3).^2./(4*(x-1)); plot(x,f) 】【 fplot((x-3)^2/(4*(x-1)),[-8,8])) 】【 clf,xsym(x); f(x-3)^2/(4*(x-1)); ezplot(f,[-8,8]) ,ti…