【大数据之Kafka】三、Kafka生产者之消息发送流程及同步异步发送API

  将外部传送给过来的数据发送到kafka集群。

1 发送原理

(1)创建main()线程,创建producer对象,调用send方法,经过拦截器(可选)、序列化器、分区器。

(2)分区器将数据发送到分区中,每个分区创建一个队列(分区是在内存中完成的),内存总大小为32M,每个批次的大小为16K。

(3)sender线程将缓冲队列中的数据读取出来发往Kafka集群,根据batch.size和linger.ms拉取数据(即每批次的数据满了之后或者设置的时间到了之后拉取数据)。

(4)sender线程拉取数据,以每个节点为一组,当第一个请求数据发送到broker1中,broker没有及时应答,还是能发送第二个请求,最多有5个请求都没有收到应答就不会再继续发送请求。

(5)selector打通输入流和输出流。

(6)链路接通后发送数据。

(7)Kafka集群收到数据后根据副本机制进行副本同步。

(8)Kafka集群收到数据后根据应答机制进行应答。

(9)selector根据Kafka集群反馈的消息进行判断。

(10)如果成功则删掉该请求同时在缓冲队列里清理掉对应的每一个分区的数据;如果失败则进行重试,重新发送请求,知道成功为止。

在这里插入图片描述

2 生产者重要参数列表

在这里插入图片描述

3 异步发送API

3.1 普通异步发送

(1)需求:创建Kafka 生产者,采用异步的方式发送到 Kafka Broker
(2)分析:异步发送即将外部的数据发送到缓冲队列里(不管缓冲队列中的数据有没有发送到Kafka集群)。

步骤:
(1)创建kafka工程,在pom.xml中导入依赖:

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>

(2)创建类:com.astudy.kafka.producer.CustomProducer

package com.study.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {//0.创建 kafka 生产者的配置对象Properties properties = new Properties();//给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//1.创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//2.调用 send 方法,发送消息for (int i = 0; i < 3; i++) {kafkaProducer.send(new ProducerRecord<>("first","test"+i));}//3.关闭资源kafkaProducer.close();}
}

(3)测试:
在hadoop102上开启Kafka消费者:

 kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first

在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息:
在这里插入图片描述

3.2 带回调函数的异步发送

分析:
  回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果Exception 不为 null,说明消息发送失败。

package com.study.kafka.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//0.创建 kafka 生产者的配置对象Properties properties = new Properties();//给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//1.创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//2.调用 send 方法,发送消息for (int i = 0; i < 3; i++) {kafkaProducer.send(new ProducerRecord<>("first", "test" + i), new Callback() {// 该方法在 Producer 收到 ack 时调用,为异步调用@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {// 没有异常,输出信息到控制台System.out.println("topic:" + recordMetadata.topic() + "  partition:" + recordMetadata.partition());}else {// 出现异常打印e.printStackTrace();}}});// 延迟一会会看到数据发往不同分区Thread.sleep(2);}//3.关闭资源kafkaProducer.close();}
}

测试:
在hadoop102上开启Kafka消费者:

 kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first

在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息:
在这里插入图片描述
在这里插入图片描述

4 同步发送API

分析:只需在异步发送的基础上,再调用一下 get()方法即可。

package com.study.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {//0.创建 kafka 生产者的配置对象Properties properties = new Properties();//给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//1.创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//2.调用 send 方法,发送消息for (int i = 0; i < 3; i++) {kafkaProducer.send(new ProducerRecord<>("first","test"+i)).get();}//3.关闭资源kafkaProducer.close();}
}

测试:
在hadoop102上开启Kafka消费者:

 kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first

在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/38845.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Android Framework (十二) 】- 智能硬件设备开发

文章目录 前言智能硬件的定义与应用智能硬件产品开发流程智能硬件开发所涉及的技术体系概述关于主板选型主板CPU芯片的选择关于串口通信 总结 前言 针对我过往工作经历&#xff0c;曾在一家智能科技任职Android开发工程师&#xff0c;简单介绍下关于任职期间接触和开发过的一些…

DDPM: Denoising Diffusion Probabilistic Models

DDPM: Denoising Diffusion Probabilistic Models 去噪扩散模型前向过程-加噪声反向过程-去噪声 去噪扩散模型 论文题目&#xff1a;Denoising Diffusion Probabilistic Models (DDPM) 论文来源&#xff1a;NIPS, 2020 论文地址&#xff1a;https://arxiv.org/abs/2006.11239 论…

13.3 目标检测和边界框

锚框的计算公式 假设原图的高为H,宽为W 详细公式推导 以同一个像素点为锚框&#xff0c;可以生成 (n个缩放 m个宽高比 -1 )个锚框 锚框的作用&#xff1a; 不用直接去预测真实框的四个坐标&#xff0c;而是&#xff1a; 1.先生成多个锚框。 2.预测每个锚框里是否含有要预测…

【Linux】【驱动】杂项设备驱动

【Linux】【驱动】杂项设备驱动 Linux三大设备驱动1. 我们这节课要讲的杂项设备驱动是属于我们这三大设备驱动里面的哪个呢?2.杂项设备除了比字符设备代码简单&#xff0c;还有别的区别吗?3.主设备号和次设备号是什么? 挂载驱动 杂项设备驱动是字符设备驱动的一种&#xff0…

小程序制作教程:从零开始搭建企业小程序

在如今的数字化时代&#xff0c;企业介绍小程序成为了企业展示与推广的重要工具。通过企业介绍小程序&#xff0c;企业可以向用户展示自己的品牌形象、产品服务以及企业文化等内容&#xff0c;进而提高用户对企业的认知度和信任度。本文将介绍如何从零开始搭建一个企业介绍小程…

域名配置HTTPS

一、注册域名 这个可以在各大平台注册&#xff0c;具体看一下就会注册了&#xff0c;自己挑选一个自己喜欢的域名。 步骤一般也就是先实名&#xff0c;实名成功了才能注册域名。 二、办理SSL证书 这里使用的是阿里云的SSL免费证书 1、申请证书 二、填写申请 三、域名绑定生…

公司电脑三维图纸加密、机械图挡加密软件

机械图纸加密软件的问世&#xff0c;让很多的网络公司都大受其带来的工作中的便利。在安装了机械图纸加密软件后&#xff0c;不仅可以很好的管理员工在工作时的上网娱乐&#xff0c;在对整个公司员工的工作效率上也有着明显的提高&#xff0c;那么对于机械图纸加密软件的具体特…

【C#】静默安装、SQL SERVER静默安装等

可以通过cmd命令行来执行&#xff0c;也可以通过代码来执行&#xff0c;一般都需要管理员权限运行 代码 /// <summary>/// 静默安装/// </summary>/// <param name"fileName">安装文件路径</param>/// <param name"arguments"…

word 应用 打不开 显示一直是正在启动中

word打开来显示一直正在启动中&#xff0c;其他调用word的应用也打不开&#xff0c;网上查了下以后进程关闭spoolsv.exe,就可以正常打开word了

机器学习:特征工程之特征预处理

目录 特征预处理 1、简述 2、内容 3、归一化 3.1、鲁棒性 3.2、存在的问题 4、标准化 ⭐所属专栏&#xff1a;人工智能 文中提到的代码如有需要可以私信我发给你&#x1f60a; 特征预处理 1、简述 什么是特征预处理&#xff1a;scikit-learn的解释&#xff1a; provide…

linux系统服务学习(六)FTP服务学习

文章目录 FTP、NFS、SAMBA系统服务一、FTP服务概述1、FTP服务介绍2、FTP服务的客户端工具3、FTP的两种运行模式&#xff08;了解&#xff09;☆ 主动模式☆ 被动模式 4、搭建FTP服务&#xff08;重要&#xff09;5、FTP的配置文件详解&#xff08;重要&#xff09; 二、FTP任务…

Python基础语法入门(第二十天)——文件操作

一、基础内容 在Python中&#xff0c;路径可以以不同的表现形式进行表示。以下是一些常用的路径表现形式&#xff1a; 1. 绝对路径&#xff1a;它是完整的路径&#xff0c;从根目录开始直到要操作的文件或文件夹。在Windows系统中&#xff0c;绝对路径以盘符开始&#xff0c;…

【学会动态规划】环形子数组的最大和(20)

目录 动态规划怎么学&#xff1f; 1. 题目解析 2. 算法原理 1. 状态表示 2. 状态转移方程 3. 初始化 4. 填表顺序 5. 返回值 3. 代码编写 写在最后&#xff1a; 动态规划怎么学&#xff1f; 学习一个算法没有捷径&#xff0c;更何况是学习动态规划&#xff0c; 跟我…

高层建筑全景vr火灾隐患排查模拟培训软件助力群众防范火灾伤害

随着城市化进程的加快&#xff0c;楼宇建筑的数量也在不断增加。然而&#xff0c;楼宇消防安全问题也日益突出。为了提高楼宇员工和居民的消防安全意识&#xff0c;楼宇VR消防安全教育培训应运而生。VR安全培训公司深圳华锐视点制作的楼宇vr消防安全教育培训&#xff0c;包括消…

谷粒商城第十一天-完善商品分组(主要添上关联属性)

目录 一、总述 二、前端部分 2.1 改良前端获取分组列表接口及其调用 2.2 添加关联的一整套逻辑 三、后端部分 四、总结 一、总述 前端部分和之前的商品品牌添加分类差不多。 也是修改一下前端的分页获取列表的接口&#xff0c;还有就是加上关联的那一套逻辑&#xff0c;…

nginx负载均衡与反向代理与正向代理

负载均衡&#xff1a;通过反向代理来实现 正向代理的配置方法。 正向代理&#xff1a; 工作原理&#xff1a;用户端直接访问不了&#xff0c;需要通过代理服务器来访问web服务器&#xff0c;用户端先访问代理服务器&#xff0c;再访问web服务器。web服务器响应给代理服务器&a…

【C语言】调试技巧

目录 一、什么是bug? 二、调试 1.一般调试的步骤 2.Debug 和 Release 三、调试环境准备 四、调试时要查看的信息 1.查看临时变量的值 2.查看内存信息 3.查看调用堆栈 4.查看反汇编信息 5.查看寄存器 五、练习 六、常见的coding技巧 七、const的作用 八、编程常见…

如何应用项目管理软件进行敏捷开发管理

敏捷开发&#xff08;Agile Development&#xff09;是一种软件开发方法论&#xff0c;强调在不断变化的需求和环境下&#xff0c;通过迭代、协作和自适应的方式来开发软件。敏捷方法的目标是提供更快、更灵活、更高质量的软件交付&#xff0c;以满足客户需求并实现项目成功。 …

服务器数据恢复-EqualLogic存储RAID5数据恢复案例

服务器数据恢复环境&#xff1a; 一台DELL EqualLogic存储中有一组由16块SAS硬盘组建的RAID5阵列。存储存放虚拟机文件&#xff0c;采用VMFS文件系统&#xff0c;划分了4个lun。 服务器故障&检测&分析&#xff1a; 存储设备上有两个硬盘指示灯显示黄色&#xff0c;存储…

1022.从根到叶的二进制之和

目录 一、题目 二、代码 一、题目 二、代码 /*** Definition for a binary tree node.* struct TreeNode {* int val;* TreeNode *left;* TreeNode *right;* TreeNode() : val(0), left(nullptr), right(nullptr) {}* TreeNode(int x) : val(x), left(nu…