【大数据学习 | kafka】producer的参数与结构

1. producer的结构

producer:生产者

它由三个部分组成

interceptor:拦截器能拦截到数据,处理完毕以后发送给下游,它和过滤器不同并不是丢弃数据,而是将数据处理完毕再次发送出去,这个默认是不存在的

serialiazer:序列化器kafka中存储的数据是二进制的,所以数据必须经过序列化器进行处理,这个是必须要有的,将用户的数据转换为byte[]的工具类,其中k和v要分别指定

partitioner: 分区器主要是控制发送的数据到topic的哪个分区中,这个默认也是存在的

record accumulator

本地缓冲累加器 默认32M

producer的数据不能直接发送到kafka集群中,因为producer和kafka集群并不在一起,远程发送的数据不是一次发送一条这样太影响发送的速度和性能,所以我们发送都是攒一批数据发一次,record accumulator就是一个本地缓冲区,producer将发送的数据放入到缓冲区中,另外一个线程会去拉取其中的数据,远程发送给kafka集群,这个异步线程会根据linger.msbatch-size进行拉取数据。如果本地累加器中的数据达到batch-size或者是linger.ms的大小阈值就会拉取数据到kafka集群中,这个本地缓冲区不仅仅可以适配两端的效率,还可以批次形式执行任务,增加效率

batch-size 默认16KB

linger.ms 默认为0

生产者部分的整体流程

首先producer将发送的数据准备好

经过interceptor的拦截器进行处理,如果有的话

然后经过序列化器进行转换为相应的byte[]

经过partitioner分区器分类在本地的record accumulator中缓冲

sender线程会自动根据linger.ms和batch-size双指标进行管控,复制数据到kafka

2. producer的简单代码

2.1 准备:

引入maven依赖:

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.3.2</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency>
</dependencies>

在resources文件中创建log4j.properties

log4j.rootLogger=info,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c %M(): %m%n

2.2 生产者中的设定参数

参数含义
bootstrap.serverskafka集群的地址
key.serializerkey的序列化器,这个序列化器必须和key的类型匹配
value.serializervalue的序列化器,这个序列化器必须和value的类型匹配
batch.size批次拉取大小默认是16KB
linger.ms拉取的间隔时间默认为0,没有延迟
partitioner分区器存在默认值
interceptor拦截器选的

2.3 全部代码

public class producer_test {public static void main(String[] args) {Properties pro = new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");//设定集群地址pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//设定两个序列化器,其中StringSerializer是系统自带的序列化器,要和数据的类型完全一致pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);//batch-size默认是16KB,参数的单位是bytepro.put(ProducerConfig.LINGER_MS_CONFIG, 0);//默认等待批次时长是0KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");//发送数据的时候有kv两个部分,但是一般k我们什么都不放,只放value的值producer.send(record);producer.close();}
}

在x-shell中观察消费的数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/58921.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【论文速读】Optimization-based Prompt Injection Attack to LLM-as-a-Judge

基于优化的提示词注入攻击 摘要引言问题描述LLM-as-a-judge威胁模型攻击者知道什么 JUDGEDECEIVER 细节概述生成影子候选回复公式化为优化问题Target-aligned generation lossTarget-enhancement lossAdversarial perplexity loss优化问题 求解优化问题 摘要 LLM-as-a-Judge 利…

人工智能证书合集

本文将对目前市面上主流官方机构颁发的人工智能证书进行整理和介绍&#xff0c;由于整理的证书较多&#xff0c;本文共一万八千多字&#xff0c;请根据自己的考证需求阅读对应部分的内容&#xff0c;希望本文对人工智能行业的从业人员和计划从事人工智能相关岗位工作的人员有所…

Java入门8——二维数组

今天的内容算是数组的收尾~~ 从下次开始就要开始学习类和对象了&#xff0c;冲冲冲&#xff01; 首先二维数组&#xff0c;也很好理解&#xff0c;就是把几个一维数组拼在一起了&#xff0c;我们用代码来熟悉一下~ public class javaSchool {public static void main(String[…

自动售饮料机控制电路的设计

自动售饮料机控制电路的设计 1 设计目的 &#xff08;1&#xff09;熟悉数字电路的应用。 &#xff08;2&#xff09;掌握常常利用逻辑运算器及D触发器的逻辑功能及利用方式。 &#xff08;3&#xff09;熟悉电路仿真软件Multisim 利用。 &#xff08;4&#xff09;了解自动售饮…

高速高精运动控制解决方案亮相2024 NEPCON亚洲电子展!

■展会名称&#xff1a; NEPCON ASIA 2024 亚洲电子生产设备暨微电子工业展览会&#xff08;以下简称“亚洲电子展”&#xff09; ■展会日期 2024年11月6 -8日 ■展馆地点 中国深圳国际会展中心(宝安) ■展位号 11号馆-11A24 11月6日至8日&#xff0c;亚洲电子展将在中…

Flask轻松上手:从零开始搭建属于你的Web应用

目录 一、准备工作 二、安装Flask 三、创建你的第一个Flask应用 创建一个新的Python文件 编写Flask应用代码 运行Flask应用 四、创建一个简单的博客系统 定义路由和文章列表 创建模板文件 运行并测试博客系统 五、使用数据库存储用户信息 安装Flask-SQLAlchemy 修…

STM32开发 —— 新工程创建思路终于清晰了

目 录 工程创建三步法一、工程文件夹创建二、管理工程项三、配置工程参数 工程创建三步法 从ST官网下载好stm32标准库或HAL库&#xff0c;HAL库目录如下。 在Keil开发环境中创建STM32工程&#xff0c;分三大步即可完成工程的创建&#xff1a; 一步&#xff1a;在本地磁盘创建…

Java SpringBoot调用大模型AI构建AI应用

本文是一个用springboot 结合spring mvc 和spring ai alibaba 调用国产大模型通义千问的具体例子&#xff0c;按照这个做能够快速的搞定Java应用的调用。 然后就可以把这类应用泛化到所有的涉及到非结构化数据结构化的场景中。 Spring AI&#xff1a;简化Java中大模型调用的框…

【办公类-04-04】华为助手导出照片视频分类(根据图片、视频的文件名日期导入“年-月-日”文件夹中,并转移到“年-月”文件中整理、转移到“年”文件夹中整理)

背景需求 最近带班&#xff0c;没有时间整理照片&#xff0c;偶尔导一次&#xff0c;几个月的照片。发现用电脑版“华为手机助手“中的WLAN连接”与华为手机的“华为手机助手”连接&#xff0c;速度更快、更稳定&#xff0c;不会出现数据线连接时碰碰就断网的问题 1、先打开电…

电脑没有下载声卡驱动怎么办?电脑声卡驱动安装方法

在日常使用电脑的过程中&#xff0c;我们可能会遇到电脑没有声音的问题&#xff0c;这往往与声卡驱动缺失或损坏有关。声卡驱动是连接电脑硬件&#xff08;声卡&#xff09;与操作系统之间的桥梁&#xff0c;确保音频信号能够正常输入输出。那么&#xff0c;当电脑没有声卡驱动…

MYSQL死锁真实案例

​最近例行巡检时候发现一个死锁,阿里云RDS FOR MYSQL 8.0.X! 虽然阿里云的死锁页面看起来比较友好,不过跟社区版一样只是显示事务最后一条死锁SQL和相关的信息.一不小心对初级MYSQL DBA来说,深深地误导,浪费大量时间研究这两个SQL怎么发生了死锁! 阿里云RDS默认情况下审计没有…

【Spring】Spring Boot 日志(8)

本系列共涉及4个框架&#xff1a;Sping,SpringBoot,Spring MVC,Mybatis。 博客涉及框架的重要知识点&#xff0c;根据序号学习即可。 目录 本系列共涉及4个框架&#xff1a;Sping,SpringBoot,Spring MVC,Mybatis。 博客涉及框架的重要知识点&#xff0c;根据序号学习即可。 …

饿了么数据库表设计

有商家表、商品表、商品规格表、购物车表&#xff0c;不难分析出表是不够全面的。 (1)首先分析需要补充的表 1.对于购物车而言肯定有对应的用户&#xff0c;因此要添加一个用户表。 2.商品规格是冷&#xff0c;热&#xff0c;半分糖、全糖&#xff0c;对于冷热和半分糖是可以分…

C++模拟实现list

C教学总目录 C模拟实现list 1、成员变量2、迭代器3、insert函数4、erase函数5、pop_back、push_front、pop_front函数6、size和clear函数7、析构函数8、拷贝构造函数9、赋值运算符重载完整代码&#xff08;包含测试代码&#xff09; 1、成员变量 先来看看SGI版本STL中list的实…

【STM32】SD卡

(一)常用卡的认识 在学习这个内容之前&#xff0c;作为生活小白的我对于SD卡、TF卡、SIM卡毫无了解&#xff0c;晕头转向。 SD卡&#xff1a;Secure Digital Card的英文缩写&#xff0c;直译就是“安全数字卡”。一般用于大一些的电子设备比如&#xff1a;电脑、数码相机、AV…

品牌怎么找到用户发的优质内容,进行加热、复制?

在&#xff0c;相对传统媒体来说&#xff0c;社交媒体营销具有更高的成本效益。品牌可以通过相对较低的成本达到大量潜在客户&#xff0c;尤其是通过口碑营销和内容分享&#xff0c;可以实现倍增的传播效果。在社媒营销的过程中&#xff0c;去找到与品牌有关的优质、正向内容&a…

物联网设备如何助力实现高效远程老人监护

在发达国家&#xff0c;老龄化进程加速&#xff0c;老年人常需医疗、行动辅助、安全保障及个人卫生护理&#xff0c;费用高昂。传统老人监护依赖护士或助理现场照料&#xff0c;而物联网远程监控方案能有效改进此模式。它通过运用传感器等技术&#xff0c;实现全天候低成本实时…

如何使用和打开jconsole

配置: spring.jmx.enabledtrue spring.jmx.default-domainmybatiesdemo management.endpoints.jmx.exposure.include* 启动参数: -Dcom.sun.management.jmxremote.port9000 -Dcom.sun.management.jmxremote.authenticatefalse -Dcom.sun.management.jmxremote.sslfalse 启动项…

残差块(Residual Block)

1. **残差块的定义与作用**&#xff1a; 残差块通过引入跳跃连接&#xff08;skip-connection&#xff09;或称为快捷连接&#xff08;shortcut connection&#xff09;&#xff0c;允许网络学习输入与输出之间的残差映射&#xff0c;即学习函数&#xff0c;其中 是期望的底层映…

Sigrity Power SI VR noise Metrics check模式如何进行电源噪声耦合分析操作指导

SSigrity Power SI VR noise Metrics check模式如何进行电源噪声耦合分析操作指导 Sigrity Power SI的VR noise Metrics check模式本质上是用来评估和观测器件的电源网络的耦合对于信号的影响,输出S参数以及列出具体的贡献值。 以下图为例