Kafka学习笔记(二)

目录

  • 第3章 Kafka架构深入
    • 3.3 Kafka消费者
      • 3.3.1 消费方式
      • 3.3.2 分区分配策略
      • 3.3.3 offset的维护
    • 3.4 Kafka高效读写数据
    • 3.5 Zookeeper在Kafka中的作用
    • 3.6 Kafka事务
      • 3.6.1 Producer事务
      • 3.6.2 Consumer事务(精准一次性消费)
  • 第4章 Kafka API
    • 4.1 Producer API
      • 4.1.1 消息发送流程
      • 4.1.2 异步发送API
      • 4.1.3 同步发送API
    • 4.2 Consumer API
      • 4.2.1 自动提交offset
      • 4.2.2 手动提交offset


第3章 Kafka架构深入

3.3 Kafka消费者

3.3.1 消费方式

consumer采用pull(拉)模式从broker中读取数据。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。

它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。

3.3.2 分区分配策略

一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。

Kafka有两种分配策略,一是roundrobin,一是range。

  1. roundrobin
    在这里插入图片描述

  2. range
    在这里插入图片描述

3.3.3 offset的维护

由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。

Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets。

3.4 Kafka高效读写数据

  1. 顺序写磁盘
    Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到到600M/s,而随机写只有100k/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
  2. 应用Pagecache
    Kafka数据持久化是直接持久化到Pagecache中,这样会产生以下几个好处:
  • I/O Scheduler 会将连续的小块写组装成大块的物理写从而提高性能
  • I/O Scheduler 会尝试将一些写操作重新按顺序排好,从而减少磁盘头的移动时间
  • 充分利用所有空闲内存(非 JVM 内存)。如果使用应用层 Cache(即 JVM 堆内存),会增加 GC 负担
  • 读操作可直接在 Page Cache 内进行。如果消费和生产速度相当,甚至不需要通过物理磁盘(直接通过 Page Cache)交换数据
  • 如果进程重启,JVM 内的 Cache 会失效,但 Page Cache 仍然可用

尽管持久化到Pagecache上可能会造成宕机丢失数据的情况,但这可以被Kafka的Replication机制解决。如果为了保证这种情况下数据不丢失而强制将 Page Cache 中的数据 Flush 到磁盘,反而会降低性能。

  1. 零复制技术
    在这里插入图片描述

3.5 Zookeeper在Kafka中的作用

Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。

Controller的管理工作都是依赖于Zookeeper的。

以下为partition的leader选举过程:
在这里插入图片描述

3.6 Kafka事务

Kafka从0.11版本开始引入了事务支持。事务可以保证Kafka在Exactly Once语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

3.6.1 Producer事务

为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。

为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

3.6.2 Consumer事务(精准一次性消费)

上述事务机制主要是从Producer方面考虑,对于Consumer而言,事务的保证就会相对较弱,尤其时无法保证Commit的信息被精确消费。这是由于Consumer可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

如果想完成Consumer端的精准一次性消费,那么需要kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将kafka的offset保存到支持事务的自定义介质中(比如mysql)。这部分知识会在后续项目部分涉及。

第4章 Kafka API

4.1 Producer API

4.1.1 消息发送流程

Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。

KafkaProducer 发送消息流程:

在这里插入图片描述

相关参数:

  • batch.size:只有数据积累到batch.size之后,sender才会发送数据。
  • linger.ms:如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。

4.1.2 异步发送API

  1. 导入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version>
</dependency>
  1. 编写代码

需要用到的类:

  • KafkaProducer:需要创建一个生产者对象,用来发送数据
  • ProducerConfig:获取所需的一系列配置参数
  • ProducerRecord:每条数据都要封装成一个ProducerRecord对象

不带回调函数的API:

package com.atguigu.kafka;import org.apache.kafka.clients.producer.*;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "hadoop102:9092");//kafka集群,broker-listprops.put("acks", "all");props.put("retries", 1);//重试次数props.put("batch.size", 16384);//批次大小props.put("linger.ms", 1);//等待时间props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++) {producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)));}producer.close();}
}

带回调函数的API:

回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

package com.atguigu.kafka;import org.apache.kafka.clients.producer.*;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "hadoop102:9092");//kafka集群,broker-listprops.put("acks", "all");props.put("retries", 1);//重试次数props.put("batch.size", 16384);//批次大小props.put("linger.ms", 1);//等待时间props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++) {producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)), new Callback() {//回调函数,该方法会在Producer收到ack时调用,为异步调用@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("success->" + metadata.offset());} else {exception.printStackTrace();}}});}producer.close();}
}

4.1.3 同步发送API

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。

由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。

package com.atguigu.kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "hadoop102:9092");//kafka集群,broker-listprops.put("acks", "all");props.put("retries", 1);//重试次数props.put("batch.size", 16384);//批次大小props.put("linger.ms", 1);//等待时间props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++) {producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i))).get();}producer.close();}
}

4.2 Consumer API

Consumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。

由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。

所以offset的维护是Consumer消费数据是必须考虑的问题。

4.2.1 自动提交offset

  1. 导入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version>
</dependency>
  1. 编写代码

需要用到的类:

  • KafkaConsumer:需要创建一个消费者对象,用来消费数据
  • ConsumerConfig:获取所需的一系列配置参数
  • ConsuemrRecord:每条数据都要封装成一个ConsumerRecord对象

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。

自动提交offset的相关参数:

enable.auto.commit:是否开启自动提交offset功能
auto.commit.interval.ms:自动提交offset的时间间隔

以下为自动提交offset的代码:

package com.atguigu.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "hadoop102:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("first"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}
}

4.2.2 手动提交offset

虽然自动提交offset十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。

手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。

  1. 同步提交offset

由于同步提交offset有失败重试机制,故更加可靠,以下为同步提交offset的示例。

package com.atguigu.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;/*** @author liubo*/
public class CustomComsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "hadoop102:9092");//Kafka集群props.put("group.id", "test");//消费者组,只要group.id相同,就属于同一个消费者组props.put("enable.auto.commit", "false");//关闭自动提交offsetprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("first"));//消费者订阅主题while (true) {ConsumerRecords<String, String> records = consumer.poll(100);//消费者拉取数据for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}consumer.commitSync();//同步提交,当前线程会阻塞知道offset提交成功}}
}
  1. 异步提交offset

虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交offset的方式。

以下为异步提交offset的示例:

package com.atguigu.kafka.consumer;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;import java.util.Arrays;
import java.util.Map;
import java.util.Properties;/*** @author liubo*/
public class CustomConsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "hadoop102:9092");//Kafka集群props.put("group.id", "test");//消费者组,只要group.id相同,就属于同一个消费者组props.put("enable.auto.commit", "false");//关闭自动提交offsetprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("first"));//消费者订阅主题while (true) {ConsumerRecords<String, String> records = consumer.poll(100);//消费者拉取数据for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.err.println("Commit failed for" + offsets);}}});//异步提交}}
}
  1. 数据漏消费和重复消费分析

无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。先提交offset后消费,有可能造成数据的漏消费;而先消费后提交offset,有可能会造成数据的重复消费。

数据重复消费问题:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/150669.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

异常控制流——(中断、陷阱、故障、终止、进程等操作系统干货)

异常 异常控制流 控制流&#xff1a; 假设从处理机上电运行一直到断电关机的这段时间内&#xff0c;程序计数器的值是下图序列&#xff0c;其中ak表示某一条指令Ik的地址。 **控制转移&#xff1a;**每一次从ak到ak1的过渡 **平滑&#xff1a;**Ik和Ik1在内存中是相邻的&am…

怎样备份电脑文件比较安全

域智盾软件是一款功能强大的电脑监控软件&#xff0c;它不仅具备实时屏幕监控、行为审计等功能&#xff0c;还能够对电脑文件进行备份和管理。下面将介绍域智盾软件如何备份电脑文件&#xff0c;以确保数据安全。 1、开启文档备份功能 部署后台&#xff0c;然后点击文档安全&a…

李宏毅2023机器学习作业HW05解析和代码分享

ML2023Spring - HW5 相关信息&#xff1a; 课程主页 课程视频 Sample code HW05 视频 HW05 PDF 个人完整代码分享: GitHub | Gitee | GitCode 运行日志记录: wandb P.S. HW05/06 是在 Judgeboi 上提交的&#xff0c;完全遵循 hint 就可以达到预期效果。 因为无法在 Judgeboi 上…

Vue3 源码解读系列(八)——生命周期

生命周期 正常的生命周期 // 注册钩子函数 const onBeforeMount createHook(bm/* BEFORE_MOUNT */) const onMounted createHook(m/* MOUNTED */) const onBeforeUpdate createHook(bu/* BEFORE_UPDATE */) const onUpdated createHook(u/* UPDATED */) const onBeforeUnm…

商城免费搭建之java商城 java电子商务Spring Cloud+Spring Boot+mybatis+MQ+VR全景+b2b2c

1. 涉及平台 平台管理、商家端&#xff08;PC端、手机端&#xff09;、买家平台&#xff08;H5/公众号、小程序、APP端&#xff08;IOS/Android&#xff09;、微服务平台&#xff08;业务服务&#xff09; 2. 核心架构 Spring Cloud、Spring Boot、Mybatis、Redis 3. 前端框架…

java桌面程序

目标之一是把打印导出的功能最终用java实现一套&#xff0c;首先选定javafx&#xff0c;因为idea默认创建工程就带的javafx&#xff0c;没找到swing。 创建工程&#xff0c;这里要选1.8&#xff0c;高版本jdk默认不带fx 实现主界面的代码 package sample;import javafx.app…

将word中的表格无变形的弄进excel中

在上篇文章中记录了将excel表拷贝到word中来&#xff1a; 记录将excel表无变形的弄进word里面来-CSDN博客 本篇记录&#xff1a;将word中的表格无变形的弄进excel中。 1.按F12&#xff0c;“另存为...”&#xff0c;保存类型&#xff1a;“单个文件页面”&#xff0c;保存。…

numpy报错:AttributeError: module ‘numpy‘ has no attribute ‘float‘

报错&#xff1a;AttributeError: module numpy has no attribute float numpy官网&#xff1a;NumPy 报错原因&#xff1a;从numpy1.24起删除了numpy.bool、numpy.int、numpy.float、numpy.complex、numpy.object、numpy.str、numpy.long、numpy.unicode类型的支持。 解决办法…

Java-方法的重写

【1】重写&#xff1a; 发生在子类和父类中&#xff0c;当子类对父类提供的方法不满意的时候&#xff0c;要对父类的方法进行重写。 【2】重写有严格的格式要求&#xff1a; 子类的方法名字和父类必须一致&#xff0c;参数列表&#xff08;个数&#xff0c;类型&#xff0c…

Django 入门学习总结4

视图是Django应用程序在Python语言中提供特定的方法并对应于有特定的模板的网页。网页的页面通过视图的方式进行跳转。 在投票系统中&#xff0c;有四个视图&#xff1a; 首页视图&#xff0c;显示最新的问题列表。细节视图&#xff0c;显示问题文本&#xff0c;通过表单可以…

阿里国际站(直通车)

1.国际站流量 2.直通车即P4P&#xff08;pay for performance点击付费&#xff09; 2.1直通的含义&#xff1a;按点击付费&#xff0c;通过自助设置多维度展示产品信息&#xff0c;获得大量曝光吸引潜在买家。 注意&#xff1a;中国大陆和尼日利尼地区点击不扣费。 2.2扣费规…

Apache Doris (五十四): Doris Join类型 - Bucket Shuffle Join

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 目录

Python的机器学习库:Sklearn库

随着机器学习的发展&#xff0c;各种强大的机器学习库涌现出来&#xff0c;使开发人员能够更轻松地构建和应用机器学习模型。其中&#xff0c;Scikit-learn&#xff08;简称Sklearn&#xff09;作为Python中最受欢迎和广泛使用的机器学习库之一&#xff0c;提供了丰富的功能和算…

vue+element实现多级表头加树结构

标题两种展示方式 方式一 完整代码: <template><div class"box"><el-tableref"areaPointTable":data"tableData"border:span-method"objectSpanMethod":header-cell-style"tableHeaderMerge"><el-ta…

mac中安装Homebrew

1、Homebrew是什么&#xff1f; 软件安装管理工具 2、先检查电脑中是否已经安装了Homebrew 打开终端输入&#xff1a;brew 提示命令没有找到&#xff0c;说明电脑没有安装Homebrew 如果提示上述图片说明Homebrew已经安装成功 3、安装Homebrew 进入https://brew.sh/ 复制的命…

Android开发APP显示头部Bar

Android开发显示头部Bar 需求&#xff1a; 显示如下图&#xff1a; 显示头部Bar&#xff0c;颜色也能自定义。 解决方案 这个修改是在如下三个文件里进行修改&#xff1a; 按顺序修改&#xff1a; themes.xml(night): <resources xmlns:tools"http://schemas.andr…

持续集成交付CICD:Jenkins Sharedlibrary 共享库

目录 一、理论 1.共享库 2.共享库配置 3.使用共享库 4.共享库扩展 二、实验 1.连接共享库 2.使用共享库 三、问题 1.路径报错 2.readJSON 报错 一、理论 1.共享库 &#xff08;1&#xff09;概念 1&#xff09;共享库这并不是一个全新的概念&#xff0c;其实在编…

央国企数字化转型难在哪?为什么要数字化转型?

科技在发展&#xff0c;技术在升级&#xff0c;全球信息化、数字化的步伐在加快&#xff0c;企业想要在未来的发展中抓住机会&#xff0c;更好地发展壮大&#xff0c;就需要加快企业数字化转型的速度&#xff0c;才能立足于信息化、数字化时代&#xff0c;央国企作为企业中的一…

Vue3 项目修改index.html的 title

实现思路 通过插件 vite-plugins-html 进行参数配置&#xff0c;html 中使用参数&#xff0c;实现配置安装插件 $ npm install vite-plugins-html --save-devvite.config.js 中的配置 // 可以动态处理html文件内容的 import { createHtmlPlugin } from vite-plugin-htmlexpo…

Echarts 实现两两柱图重叠(背景和实际值柱图)

Echarts实现两两重叠柱状图_echarts 重叠柱状图_Web_阿凯的博客-CSDN博客 引用启发的博客 先来效果&#xff1a; option {backgroundColor: #03213D,animation: true, // 控制动画是否开启animationDuration: 1000, // 动画的时长, 它是以毫秒为单位animationDuration: func…