kafka学习-概念与简单实战

目录

1、核心概念

消息和批次

Topic和Partition

Replicas

Offset

broker和集群

生产者和消费者

2、开发实战

2.1、消息发送

介绍

代码实现

2.2、消息消费

介绍

代码实现

2.3、SpringBoot Kafka

pom

application.yaml

KafkaConfig

producer

consumer


1、核心概念

消息和批次

        kafka的基本数据单元,由字节数组组成。可以理解成数据库的一条数据。

        批次就是一组消息,把同一个主题和分区的消息分批次写入kafka,可以减少网络开销,提高效率;批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。

Topic和Partition

        topic主题,kafka通过主题进行分类。主题可以理解成数据库的表或者文件系统里的文件夹。

        partition分区可以理解成一个FIFO的消息队列。(同一个分区的消息保证顺序消费)

        主题可以被分为若干分区,一个主题通过分区将消息存储在kafka集群中,提供横向扩展的能力。消息以追加的方式写入分区,每个分区保证先入先出的顺序读取。在需要严格保证消息顺序消费的场景下,可以将partition设置为1,即主题只有一个分区。

        主题的分区策略有如下几种:

  1. 直接指定分区;
  2. 根据消息的key散列取模得出分区;
  3. 轮询指定分区。

Replicas

  1. 副本,每个分区都有多个副本。其中包含一个首领副本和多个跟随者副本。
  2. 首领副本用于响应生产者的消息写入请求与消费者的消息读取请求;
  3. 跟随者副本用于同步首领副本的数据,保持与首领副本一致的状态,有数据备份的功能。
  4. 一旦首领副本所在的服务器宕机,就会从跟随者中选出一个升级为首领副本。

Offset

        偏移量。

        生产者offset:每个分区都有一个offset,叫做生产者的offset,可以理解为当前这个分区队列的最大值,下一个消息来的时候,就会将消息写入到offset这个位置。

        消费者offset:每个消费者消费分区中的消息时,会记录消费的位置(offset),下一次消费时就会从这个位置开始消费。

broker和集群

broker为一个独立的kafka服务器;一个kafka集群里有多个broker。

        broker接收来自生产者的消息,为消息设置偏移量,并将消息保存到磁盘。同时,broker为消费者提供服务,对读取分区的请求做出响应,返回已经保存到磁盘上的消息。(单个broker可以轻松处理数千个分区以及每秒百万级的消息量)。

        集群中同一个主题的同一个分区,会在多个broker上存在;其中一个broker上的分区被称为首领分区,用于与生产者和消费者交互,其余broker上的分区叫做副本分区,用于备份分区数据,防止broker宕机导致消息丢失。

        每个集群都有一个broker是集群控制器,作用如下:

  1. 将分区分配给首领分区的broker;
  2. 监控broker,首领分区切换

生产者和消费者

        生产者生产消息,消息被发布到一个特定的主题上。默认情况下,kafka会将消息均匀地分布到主题的所有分区上。分区策略有如下几种:

  1. 直接指定分区;
  2. 根据消息的key散列取模得出分区;
  3. 轮询指定分区。

        消费者通过偏移量来区分已经读过的消息,从而消费消息。消费者是消费组的一部分,消费组可以保证每个分区只能被一个消费者使用,避免重复消费。

2、开发实战

2.1、消息发送

介绍

  • 生产者主要有KafkaProducer和ProducerRecord两个对象:KafkaProducer用于发送消息,ProducerRecord用于封装kafka消息。
  • 生产者生产消息后,需要broker的确认,可以选择同步或者异步确认:同步确认效率低;异步确认效率高,但需要设置回调对象。        

代码实现

public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {Map<String, Object> configs = new HashMap<>();// 设置连接Kafka的初始连接⽤到的服务器地址// 如果是集群,则可以通过此初始连接发现集群中的其他brokerconfigs.put("bootstrap.servers", "node1:9092");// 设置key和value的序列化器configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("acks", "1");KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);// 用于封装Producer的消息ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic_1", // 主题名称0, // 分区编号,现在只有⼀个分区,所以是00, // 数字作为key"message 0" // 字符串作为value);// 发送消息,同步等待消息的确认// producer.send(record).get(3_000, TimeUnit.MILLISECONDS);// 使用回调异步等待消息的确认producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("主题:" + metadata.topic() + "\n"+ "分区:" + metadata.partition() + "\n"+ "偏移量:" + metadata.offset() + "\n"+ "序列化的key字节:" + metadata.serializedKeySize() + "\n"+ "序列化的value字节:" + metadata.serializedValueSize() + "\n"+ "时间戳:" + metadata.timestamp());} else {System.out.println("有异常:" + exception.getMessage());}}});// 关闭连接producer.close();
}

2.2、消息消费

介绍

        消费者主要有KafkaConsumer对象,用于消费消息。Kafka不支持消息的推送,我们可以通过消息拉取(poll)方式实现消息的消费。KafkaConsumer主要参数如下:

代码实现

public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();// 指定bootstrap.servers属性作为初始化连接Kafka的服务器。// 如果是集群,则会基于此初始化连接发现集群中的其他服务器。configs.put("bootstrap.servers", "node1:9092");// key和value的反序列化器configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");configs.put("group.id", "consumer.demo");// 创建消费者对象KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);final Pattern pattern = Pattern.compile("topic_[0-9]");// 消费者订阅主题或分区// consumer.subscribe(pattern);// consumer.subscribe(pattern, new ConsumerRebalanceListener() {final List<String> topics = Arrays.asList("topic_1");consumer.subscribe(topics, new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {partitions.forEach(tp -> {System.out.println("剥夺的分区:" + tp.partition());});	}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {partitions.forEach(tp -> {System.out.println(tp.partition());});}});// 拉取订阅主题的消息final ConsumerRecords<Integer, String> records = consumer.poll(3_000);// 获取topic_1主题的消息final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");// 遍历topic_1主题的消息topic1Iterable.forEach(record -> {System.out.println("========================================");System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));System.out.println("消息的key:" + record.key());System.out.println("消息的值:" + record.value());System.out.println("消息的主题:" + record.topic());System.out.println("消息的分区号:" + record.partition());System.out.println("消息的偏移量:" + record.offset());});// 关闭消费者consumer.close();
}

2.3、SpringBoot Kafka

pom

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
</dependencies>

application.yaml

spring:kafka:bootstrap-servers: node1:9092       # 用于建立初始连接的broker地址producer:key-serializer: org.apache.kafka.common.serialization.IntegerSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerbatch-size: 16384                 # 默认的批处理记录数buffer-memory: 33554432           # 32MB的总发送缓存consumer:key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: spring-kafka-02-consumer    # consumer的消费组idenable-auto-commit: true              # 是否自动提交消费者偏移量auto-commit-interval: 100             # 每隔100ms向broker提交一次偏移量auto-offset-reset: earliest           # 如果该消费者的偏移量不存在,则自动设置为最早的偏移量

KafkaConfig

@Configuration
public class KafkaConfig {@Beanpublic NewTopic topic1() {return new NewTopic("ntp-01", 5, (short) 1);}@Beanpublic NewTopic topic2() {return new NewTopic("ntp-02", 3, (short) 1);}
}

producer

@RestController
public class KafkaSyncProducerController {@Autowiredprivate KafkaTemplate template;@RequestMapping("send/sync/{message}")public String sendSync(@PathVariable String message) {ListenableFuture future = template.send(new ProducerRecord<Integer, String>("topic-spring-02", 0, 1, message));try {// 同步等待broker的响应Object o = future.get();SendResult<Integer, String> result = (SendResult<Integer, String>) o;System.out.println(result.getRecordMetadata().topic() + result.getRecordMetadata().partition() + result.getRecordMetadata().offset());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}return "success";}
}@RestController
public class KafkaAsyncProducerController {@Autowiredprivate KafkaTemplate<Integer, String> template;@RequestMapping("send/async/{message}")public String asyncSend(@PathVariable String message) {ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic-spring-02", 0, 3, message);ListenableFuture<SendResult<Integer, String>> future = template.send(record);// 添加回调,异步等待响应future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>(){@Overridepublic void onFailure(Throwable throwable) {System.out.println("发送失败: " + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<Integer, String> result) {System.out.println("发送成功:" + result.getRecordMetadata().topic() + "\t" + result.getRecordMetadata().partition() + "\t" + result.getRecordMetadata().offset());}});return "success";}
}

consumer

@Component
public class MyConsumer {@KafkaListener(topics = "topic-spring-02")public void onMessage(ConsumerRecord<Integer, String> record) {Optional<ConsumerRecord<Integer, String>> optional = Optional.ofNullable(record);if (optional.isPresent()) {System.out.println(record.topic() + "\t" + record.partition() + "\t" + record.offset() + "\t" + record.key() + "\t" + record.value());}}
}

以上内容为个人学习理解,如有问题,欢迎在评论区指出。

部分内容截取自网络,如有侵权,联系作者删除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/69314.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Spring MVC】统一功能处理

一、登录验证 登录验证通过拦截器实现&#xff0c;拦截器就是在用户访问服务器时&#xff0c;预先拦截检查一下用户的访问请求。 没有拦截器时&#xff0c;用户访问服务器的流程是&#xff1a;用户–>controller–>service–>Mapper。有拦截器时&#xff0c;用户访问…

RouterOS-配置PPPoEv4v6 Server

1 接口 ether3 出接口 ether4 内网接口 2 出接口 出接口采用PPPoE拨号SLAAC获取前缀&#xff0c;手动配置后缀 2.1 选择出接口interface&#xff0c;配置PPPoE client模式 2.2 配置PPPoE client用户名和密码 2.3 从PPPoE client获取前缀地址池 2.4 给出接口选择前缀并配置…

第10章_索引优化与查询优化(覆盖索引, 索引下推等)

4. 子查询优化 MySQL 从 4.1 版本开始支持子查询&#xff0c;使用子查询可以进行 SELECT 语句的嵌套查询&#xff0c;即一个 SELECT 查询的结果作为另一个SELECT 语句的条件。 子查询可以一次性完成很多逻辑上需要多个步骤才能完成的 SQL 操作 。 子查询是 MySQL 的一项重…

Vue + Element UI 前端篇(七):功能组件封装

组件封装 为了避免组件代码的臃肿&#xff0c;这里对主要的功能部件进行封装&#xff0c;保证代码的模块化和简洁度。 组件结构 组件封装重构后&#xff0c;试图组件结构如下图所示 代码一览 Home组件被简化&#xff0c;包含导航、头部和主内容三个组件。 Home.vue <te…

Qt 常用函数

设置编码 #if (QT_VERSION < QT_VERSION_CHECK(5,0,0)) #if _MSC_VERQTextCodec *codec QTextCodec::codecForName("gbk"); #elseQTextCodec *codec QTextCodec::codecForName("utf-8"); #endifQTextCodec::setCodecForLocale(codec);QTextCodec::se…

vue优化首屏加载时间优化-cdn引入第三方包

前言 为什么要进行首屏加载优化&#xff0c;因为随着我们静态资源和第三方包和代码增加&#xff0c;压缩之后包会越来越大 随着网络的影响&#xff0c;在我们第一输入url请求资源时候&#xff0c;网络阻塞&#xff0c;加载时间长&#xff0c;用户体验不好 仔细观察后就会发现…

YOLOV8实例分割——详细记录环境配置、自定义数据处理到模型训练与部署

前言 Ultralytics YOLOv8是一种前沿的、最先进的&#xff08;SOTA&#xff09;模型&#xff0c;它在前代YOLO版本的成功基础上进行了进一步的创新&#xff0c;引入了全新的特性和改进&#xff0c;以进一步提升性能和灵活性。作为一个高速、精准且易于操作的设计&#xff0c;YO…

用Jmeter压测问题解决

最近做一个基于duboo服务的接口&#xff0c;需要进行稳定性测试。但是用Jmeter GUI 方式跑只能持续2个小时左右&#xff0c;Jmeter就崩溃了&#xff0c;日志报错&#xff1a;out of memory 解决方法如下&#xff1a; 直接运行jmeter的java包试试&#xff1a; 1、打开jmeter.…

【计算机网络】http协议

目录 前言 认识URL URLEncode和URLDecode http协议格式 http方法 GET POST GET与POST的区别 http状态码 http常见header 简易的http服务器 前言 我们在序列化和反序列化这一章中&#xff0c;实现了一个网络版的计算器。这个里面设计到了对协议的分析与处…

JVM监控和调优常用命令jps|jstat|jinfo|jmap|jhat|jstack实战

1.JVM监控和调优的主要目的 性能优化:通过JVM调优,可以提高Java应用程序的性能,减少响应时间,提高吞吐量,以更好地满足用户需求。性能优化可以加快应用程序的执行速度,减少延迟,提高用户体验。 内存管理:JVM负责管理Java应用程序的内存。正确的内存管理可以避免内存泄漏…

CKEditor5定制及文件上传

CKEditor4已从2023年6月开始停止支持&#xff0c;所以最好还是升级到CKEditor5。CKEditor5在使用上与CKEditor4在使用层面上还是有很大的不同&#xff0c;首先&#xff0c;CKEditor4完全可以下载包含了所有功能的full包&#xff0c;通过配置选择需要的功能&#xff0c;只有希望…

人们对区块链的认识开始变得深入和完善,另一条新路径开始衍生

当区块链行业的发展进入到深水区&#xff0c;特别是当有关区块链的狂热与躁动开始退场&#xff0c;仅仅只是主打区块链的概念&#xff0c;而没有找到区块链与现实商业联通的方式和方法&#xff0c;依然成为困扰区块链发展的一大症结。   事实上&#xff0c;从区块链被人们认识…

STL常用容器 (C++核心基础教程之STL容器详解)String的API

在C的标准模板库&#xff08;STL&#xff09;中&#xff0c;有多种容器可供使用。以下是一些常见的容器类型&#xff1a; 序列容器&#xff08;Sequential Containers&#xff09;&#xff1a; std::vector&#xff1a;动态数组&#xff0c;支持快速随机访问。 std::list&…

宠物电商Chewy第二季度销售额28亿美元,同比增长14.3%

美国宠物电商Chewy公布2023年第二季度财报。报告显示&#xff0c;其Q2季度销售额同比增长14.3%至28亿美元&#xff0c;超出市场预期。 以下为Chewy期内业绩概要&#xff1a; 1.毛利率28.3%&#xff0c;同比增长20个基点 2.净利润有所收窄&#xff0c;同比下降15.2%至1890万美…

9.2.tensorRT高级(4)封装系列-自动驾驶案例项目self-driving-深度估计

目录 前言1. 深度估计总结 前言 杜老师推出的 tensorRT从零起步高性能部署 课程&#xff0c;之前有看过一遍&#xff0c;但是没有做笔记&#xff0c;很多东西也忘了。这次重新撸一遍&#xff0c;顺便记记笔记。 本次课程学习 tensorRT 高级-自动驾驶案例项目self-driving-深度估…

备战面试每日一题

1.如何理解this&#xff1f; this表示的是函数运行时自动生成的一个内部对象&#xff0c;只能在函数内部使用&#xff0c;总是指向调用它的对象。 this是在运行时进行绑定的&#xff0c;并不是在编写的时候绑定&#xff0c;它的上下文取决于函数调用时的各种条件。this的绑定…

Mybatis学习|Mybatis缓存:一级缓存、二级缓存

Mybatis缓存 MyBatis包含一个非常强大的查询缓存特性&#xff0c;它可以非常方便地定制和配置缓存。缓存可以极大的提升查询效率。 MyBatis系统中默认定义了两级缓存:一级缓存和二级缓存 默认情况下&#xff0c;只有一级缓存开启。(SqlSession级别的缓存&#xff0c;也称为本地…

Python自动检测西北政法大学研究生院网站硕士招生新闻并发送邮件

因为近期需要时刻查看该网页的最新公布消息&#xff0c;所以使用python自动访问网页消息并通过邮件通知。 官网链接&#xff1a;硕士招生 - 西北政法大学研究生院 工具&#xff1a;python&#xff0c;官网下载python安装即可 插件安装&#xff1a; pip install BeautifulSou…

nginx实现负载均衡load balance

目录 nginx实现负载均衡load balance相关算法负载均衡https的访问后端的real server是否知道真正访问的用户的IP地址健康检查提升负载均衡的并发数量七层负载均衡和四层负载均衡七层负载均衡四层负载均衡四层和七层的区别502错误 nginx实现负载均衡load balance 准备&#xff…

Leetcode 1572.矩阵对角线元素之和

给你一个正方形矩阵 mat&#xff0c;请你返回矩阵对角线元素的和。 请你返回在矩阵主对角线上的元素和副对角线上且不在主对角线上元素的和。 示例 1&#xff1a; 输入&#xff1a;mat [[1,2,3],[4,5,6],[7,8,9]] 输出&#xff1a;25 解释&#xff1a;对角线的和为&#xff…