Kafka核心参数(带完善)

客户端

api

Kafka提供了以下两套客户端API

  • HighLevel(重点)
  • LowLevel 

HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来非常复杂,也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用

生产者发送消息

发送流程:

  1. 组装生产者核心配置参数
  2. 初始化生产者
  3. 组装消息
  4. 发送消息, 三种模式
    1. 单向发送, 不等待broker返回结果
    2. 同步发送
    3. 异步发送
  5. 关闭生产者

代码:

package com.kk.kafka.demo;import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;public class ProducerTest {public static final String KAFKA_URL = "192.168.6.128:9092";public static final String TOPIC = "oneTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {// 组装生产者配置Properties ps = new Properties();ps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);ps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");ps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 初始化生产者Producer<String, String> producer = new KafkaProducer<>(ps);for (int i = 0; i < 5; i++) {ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC, "key" + i, "message" + i);// 同步发送producer.send(producerRecord);// 同步发送RecordMetadata metadata = producer.send(producerRecord).get();//异步发送producer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {if (metadata != null) {System.out.println("Message sent successfully! Topic: " + metadata.topic() +", Partition: " + metadata.partition() +", Offset: " + metadata.offset() +", message: " + producerRecord.value());} else {System.err.println("Error sending message: " + e.getMessage());}}});}producer.close();}
}

消费者消费消息

消费流程:

  1. 组装消费者核心配置参数
  2. 初始化消费者
  3. 订阅topic, 可订阅多个
  4. 拉取消息, 可配置超时时间
  5. 提交offset, 分为同步和异步两种方式, 服务端维护offset消费进度

代码:

package com.kk.kafka.demo;import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class ConsumerTest {public static final String KAFKA_URL = "192.168.6.128:9092";public static final String TOPIC = "oneTopic";public static void main(String[] args) {// 组装消费者配置参数Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-consumer-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 初始化消费者Consumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅topicconsumer.subscribe(Arrays.asList(TOPIC));while (true) {// 拉取消息,  100毫秒超时时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));//处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("start Consumer offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());}//提交offset,消息就不会重复推送。//同步提交,表示必须等到offset提交完毕,再去消费下一批数据。consumer.commitSync();//异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。
//            consumer.commitAsync();}}
}

客户端整体流程

拦截器

序列化器

发送到Dequeue

Dequeue满了或者批次满了或者阈值时间 推到InflightRequest

send线程将InflightRequest推到服务端Partition, 满足一定阈值

缓存机制

broker给生产者ack

消费者分组策略

相关消费者配置

  • GROUP_ID_CONFIG: 群组唯一id
  • GROUP_INSTANCE_ID_CONFIG: 消费实例id, 可以减少不必要rebalance

生产者给Topic投递消息, 消息会均匀的存到partition. 消息会向所有订阅该Topic的消费实例推送, 推送时, 一个消费群组只会推送一份. 也就是, 同一个群组里面, 只会有一个消费实例能消费; 不同消费群组可以重复消费消息.Offset就是记录每个消费群组在partition的处理消息进度

offset丢失怎么解决

1. offset丢失:

  1. 初始化consumer group时, 设置offset失败
  2. offset对应的数据文件被删除

解决: 服务端有个兜底方案, 可以配置消费者配置

ConsumerConfig.AUTO_OFFSET_RESEWT_CONFIG :当Server端没有对应的Offset时,要如何处理。

可选项:

  • earliest: 自动设置为当前最早的offset
  • latest:自动设置为当前最晚的offset
  • none: 如果消费者组对应的offset找不到,就向Consumer抛异常。
  • 其他选项: 向Consumer抛异常。

2. offset不一致问题

消费者可以选择同步提交或者异步提交

  • 同步提交: 消息处理完, 提交. 消息处理失败, 选择不提交, 等重试. 如果消费过慢, 服务端不会无限等, 会认为本次消费失败, 会给同组的其他消费实例投递消费, 可能存在重复消费问题 
  • 异步提交: 先提交, 后处理消息. 如果消息处理失败, offset又被提交, 就存在客户端与服务端offset不一致问题

解决: 可以交由客户端管理offset, 存到redis或者mysql等中间件.客户端可以控制消息处理进度, 实时推进offset

生产者拦截器

生产者配置: INTERCEPTOR_CLASSES_CONFIG

        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.kk.kafka.demo.MyProducerInterceptor");

public class MyProducerInterceptor implements ProducerInterceptor {// 发送消息触发@Overridepublic ProducerRecord onSend(ProducerRecord producerRecord) {System.out.println("onSend producerRecord:" + producerRecord.toString());return producerRecord;}// 收到服务端相应触发@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {System.out.println("acknowledgement recordMetadata:" + recordMetadata.toString());}// 生产者连接关闭触发@Overridepublic void close() {System.out.println("producer close");}// 整理配置项@Overridepublic void configure(Map<String, ?> map) {}
}

消息序列化

相关配置:

  • KEY_SERIALIZER_CLASS_CONFIG: key序列号
  • VALUE_SERIALIZER_CLASS_CONFIG: 消息value序列号
  • KEY_DESERIALIZER_CLASS_CONFIG: key反序列化
  • VALUE_DESERIALIZER_CLASS_CONFIG: 消息value反序列化

1. 生产者:

ps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
ps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

2. 消费者

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

消息分区路由机制

生产者缓存机制

其他重要核心参数 

幂等性原理

事务消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/240516.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux-Keepalived(VRRP协议)高可用集群搭建

Linux-Keepalived&#xff08;VRRP协议&#xff09;高可用集群搭建 一、VRRP简介1.1 什么是VRRP&#xff1f;1.2 keepalived是什么&#xff1f;1.3 keepalived工作原理 二、实操配置过程2.1 试验模型2.2. Keepalived监控和维护VRRP集群的步骤2.2.1 安装keepalived2.2.2 配置kee…

sql_lab之sqli中的head头注入,less18

报错注入中的head注入&#xff08;less-18&#xff09; 1.输入用户名和密码123 123显示登录错误 2.输入用户名和密码123’ 123显示登录错误 则证明不是普通报错注入&#xff0c;因为有用户名和密码框&#xff0c;如果不是普通报错注入则尝试head注入 3.用burp进行爆破&#x…

【已解决】在使用frp内网穿透访问VUE项目提示:Invalid Host/Origin header 解决方案

项目配置 在使用frp作为内网穿透的时候&#xff0c;配置了多端口穿透(也是第一次配置frp多端口)&#xff0c;端口配置如下&#xff1a; 8079&#xff1a;vue项目的管理系统 8080&#xff1a;vue项目的前台系统 8082&#xff1a;普通的web项目 更高frp相关问题 &#x1f4…

4.3 C++对象模型和this指针

4.3 C对象模型和this指针 4.3.1 成员变量和成员函数分开存储 在C中&#xff0c;类内的成员变量和成员函数分开存储 只有非静态成员变量才属于类的对象上 #include <iostream>class Person { public:Person() {mA 0;} //非静态成员变量占对象空间int mA;//静态成员变量…

数据预处理:多重共线性_检测和解决办法

文章目录 1.多重共线性简介&#xff08;Collinearity and Multicollinearity&#xff09;1.1 多重共线性的后果1.2 处理多重共线性问题的方法 2. 设置2.1 导入库2.2 数据集特征波士顿房价BMI 数据集 2.3 导入数据 3. 相关矩阵3.1 聚类图 4. 方差膨胀因子4.1 两种多重共线性4.2 …

外卖托管运营专家邦火策划怎么样,为您的餐厅带来了什么不同?

在当今激烈竞争的餐饮市场&#xff0c;外卖托管运营正逐渐成为许多餐厅提升业绩的有效手段。邦火策划以其专业的服务和独特的策略&#xff0c;成为外卖托管运营领域的专家。让我们一同探究&#xff0c;选择邦火策划为您的餐厅带来了怎样的不同。 在邦火策划的引领下&#xff0…

【UML】第10篇 类图(属性、操作和接口)(2/3)

目录 3.3 类的属性&#xff08;Attribute&#xff09; 3.3.1 可见性&#xff08;Visibility&#xff09; 3.3.2 属性的名称 3.3.3 数据类型 3.3.4 初始值 3.3.5 属性字符串 3.4 类的操作&#xff08;Operations&#xff09; 3.4.1 参数表 3.4.2 返回类型 3.5 类的职责…

基于JavaWeb的个人健康信息管理系统论文

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本个人健康信息管理系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据…

data数据响应式

data数据响应式 所有在实例上挂载的属性&#xff0c;都可以在视图中直接使用 data中的数据&#xff0c;是经过“数据劫持”的&#xff0c;是“响应式数据” 响应式&#xff1a;修改数据&#xff0c;视图会自动更新 MV原理&#xff1a;其中一条线的原理&#xff0c;data响应式的…

Nessus详细安装-windows (保姆级教程)

Nessus描述 Nessus 是一款广泛使用的网络漏洞扫描工具。它由 Tenable Network Security 公司开发&#xff0c;旨在帮助组织评估其计算机系统和网络的安全性。 Nessus 可以执行自动化的漏洞扫描&#xff0c;通过扫描目标系统、识别和评估可能存在的安全漏洞和弱点。它可以检测…

DRF从入门到精通三(反序列化数据校验源码分析、断言Assert、DRF之请求、响应)

文章目录 一、反序列化数据校验源码分析二、断言Assert三、DRF之请求、响应Request类和Response类请求中的Request 能够解析前端传入的编码格式响应中的Response能够响应的编码格式 一、反序列化数据校验源码分析 反序列化数据校验&#xff0c;校验顺序为&#xff1a;先校验字段…

Go后端开发 -- Golang的语言特性

Go后端开发 – Golang的语言特性 文章目录 Go后端开发 -- Golang的语言特性一、Golang的优势1.部署极其简单&#xff1a;2.静态语言3.语言层面的并发4.强大的标准库5.简单易学6.运行效率对比 二、Golang的适用领域1.应用领域2.明星产品 三、Golang的不足 一、Golang的优势 1.部…

共享购:消费前沿的领导者

在当今这个信息化、互联网高速发展的时代&#xff0c;商业模式也在不断地创新和变革。共享购模式作为一种新型的商业模式&#xff0c;正逐渐受到广泛的关注和追捧。本文将深入探讨共享购模式的核心理念、优势以及如何应用在实际商业场景中&#xff0c;为读者揭示这一模式的巨大…

计算机组成原理综合5

A 按照题意&#xff0c;程序P执行的时钟周期数为1000080%11000020%1028000&#xff0c;程序P的平均CPI为28000/100002.8&#xff0c;计算机主频为1GHz&#xff0c;CPU执行时间为28000/(1G/s)28μs。 B A 符号位为1 首先将0.4375转化为二进制&#xff0c;0.250.1250.06250.43…

nodejs+vue+ElementUi资源互助共享平台的设计

后台&#xff1a;管理员功能有个人中心&#xff0c;用户管理&#xff0c;卖家管理&#xff0c;咨询师管理&#xff0c;萌宝信息管理&#xff0c;幼儿知识管理&#xff0c;保姆推荐管理&#xff0c;音频资源管理&#xff0c;二手商品管理&#xff0c;商品分类管理&#xff0c;资…

推荐给前端开发的 5 款 Chrome 扩展

工欲善其事&#xff0c;必先利其器。Chrome 可能是前端开发中使用最多的浏览器。在日常开发中&#xff0c;下列几款 Chrome 扩展也许能让你的开发工作事半功倍 &#x1f680; Vue.js devtools ⚙️ vue 官方专为 vue 应用开发的调试工具。 通过使用它&#xff0c;你可以快速查看…

力扣单调栈算法专题训练

目录 1 专题说明2 训练 1 专题说明 本博客用来计算力扣上的单调栈题目、解题思路和代码。 单调栈题目记录&#xff1a; 2232866美丽塔II 2 训练 题目1&#xff1a;2866美丽塔II。 解题思路&#xff1a;先计算出prefix[i]&#xff0c;表示0~i满足递增情况下&#xff0c;0~i…

NC65 查询单据所处的流程状态以及流程平台客户端工具类

1、查询单据所处的流程状态 nc.bs.wfengine.engine.EngineService的queryFlowStatus()方法 /*** 查询单据所处的流程状态* * param billId* param billType* param result* return* throws DbException*/public int queryFlowStatus(String billId, String billType, int flo…

【UML】第8篇 用例图(3/3)

目录 一、用例的关系 1.1 泛化&#xff08;Generalization&#xff09;关系 1.2 包含&#xff08;include&#xff09;关系 1.3 扩展关系 二、用例表示例 不是非要把电影改成连续剧&#xff0c;给大家播&#xff0c;确实是时间和精力有限。 用例图&#xff0c;虽然简单&…

荣誉 | 第七在线(7thonline)荣获STIF2023年度数智化创新典范奖

12月15日&#xff0c;STIF2023 第四届国际科创节暨 DSC2023 国际数字服务大会&#xff08;数服会&#xff09;在北京隆重举行。 在本届科创节暨数服会活动评选中&#xff0c;经企业申报、评委会审议&#xff0c;第七在线&#xff08;7thonline&#xff09;AI智能零售商品计划库…