Kafka入门二——SpringBoot连接Kafka示例

实现

1.引入maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.9</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>kafka-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-demo</name><description>kafka-demo</description><properties><java.version>8</java.version></properties><dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><image><builder>paketobuildpacks/builder-jammy-base:latest</builder></image></configuration></plugin></plugins></build></project>

2.修改application.properties配置

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.bootstrap-servers=10.20.37.85:9092
spring.kafka.consumer.group-id=demo

通过spring.kafka.consumer.auto-offset-reset配置可以指定Kafka消费者的行为,当遇到没有初始偏移量或当前偏移量不再服务器上存在的情况时的处理策略。这个配置有以下三个可选值:

  • earliest:当消费者组中没有存储的偏移量,或者偏移量超出了可用范围时,从最早的可用消息开始消费。
  • latest(默认):当消费者组中没有存储的偏移量,或者偏移量超出了可用范围时,从最新的可用消息开始消费。
  • none:当服务器上找不到消费者的偏移量时,抛出异常。

通过spring.kafka.bootstrap-servers配置可以指定Kafka的地址。这个配置的值应该是一个或多个Kafka服务器的地址,用逗号分隔。

通过spring.kafka.consumer.group-id配置可以指定Kafka消费者的组ID。这个配置的值应该是一个字符串,用于标识消费者所属的组。

定义Configuer引入kafka Bean

@Component
public class MyConfiguer {@Beanpublic NewTopic topic() {return TopicBuilder.name("topic1").partitions(10).replicas(1).build();}
}

使用了@Bean注解,表示该方法将返回一个Spring管理的bean对象。在这个方法中,通过调用TopicBuilder类的name方法指定了主题的名称为"topic1",然后使用partitions方法设置了主题的分区数为10,使用replicas方法设置了主题的副本数为1。最后,调用build方法构建并返回了一个NewTopic对象。

定义生产者

@Component
public class KafkaProducer {@Beanpublic ApplicationRunner runner(KafkaTemplate<String, String> template) {return args -> {template.send("topic1", "test");};}
}

ApplicationRunner接口是Spring Boot提供的一个回调接口,用于在应用程序启动后执行一些操作。这段代码的作用是在应用程序启动后自动发送一条消息到Kafka的"topic1"主题,消息内容为"test"。

定义消费者

@Component
public class KafkaConsumer {@KafkaListener(id = "myId", topics = "topic1")public void listen(String in) {System.out.println(in);}
}

@KafkaListener注解用于标记方法作为Kafka消息的消费者。这段代码的作用是创建一个Kafka消费者,监听名为"topic1"的主题,并将接收到的消息内容打印到控制台。

总结

生产者

Kafka生产者发送消息的流程如下:

  • 创建ProducerRecord:首先,生产者需要创建一个ProducerRecord对象,该对象包含目标主题和要发送的内容。如果需要,还可以指定消息的键(key)或分区(partition)。

  • 序列化:生产者将ProducerRecord对象中的键和值进行序列化,转换为字节数组,以便能够在网络上传输。

  • 拦截器链:消息会通过一个由一个或多个拦截器组成的链。这些拦截器可以对消息进行预处理,例如添加头信息或实施自定义的逻辑。

  • 元数据加载:生产者在发送消息之前,需要加载Kafka集群的元数据,以确定各个主题的分区和副本分布情况。

  • 计算分区号:根据ProducerRecord中的键和分区策略(如果有的话),生产者会计算出消息应该发送到哪个分区。

  • 累加器缓存:消息被缓存到RecordAccumulator累加器中,这是一个按照批次处理消息的组件。

  • Sender线程发送:Sender线程负责将符合条件的消息批次发送给Kafka broker。除了发送消息外,Sender线程也负责更新元数据。

  • 等待确认:生产者可以选择等待broker的确认,以确保消息已经被成功接收并写入日志。

  • 处理响应:生产者处理来自broker的响应,包括错误处理和重试逻辑。

  • 完成发送:一旦消息被成功发送并且确认,生产者会继续发送下一批消息或者结束发送过程。
    在这里插入图片描述

消费者

Kafka消费者接收消息的流程涉及到多个关键步骤,确保了消息能够从broker高效地传送到消费者手中。以下是该过程的详细描述:

  • 创建消费者实例:需要创建一个Kafka消费者实例,这通常涉及到指定一些关键参数,如Kafka集群的地址、消费者组ID、反序列化器等。

  • 订阅主题:消费者需要订阅一个或多个感兴趣的主题。这意味着消费者希望从这些主题接收消息。

  • 协调消费者组:在ConsumerCoordinator的poll方法中,会有一个ensureActiveGroup的过程,消费者通过这个过程向Coordinator发送加入消费者组的请求,并等待Coordinator的响应。

  • 拉取消息:一旦加入了消费者组,消费者开始从订阅的主题中拉取(pull)消息。Kafka采用pull模式而不是push模式,这意味着消费者需要主动去服务器拉取消息。

  • 处理消息:消费者获取消息后,会进行相应的处理。这个处理过程可能包括数据转换、存储或者其他业务逻辑。

  • 提交消费位移:消费者在处理完消息后,需要提交消费位移。这是告诉Kafka它已经消费到了哪个位置,以便下次从正确的位置开始消费。位移的提交是由消费者自己管理的,Kafka提供了接口来更新这些位移。

  • 控制消费速率:消费者可以通过各种方式控制消息的消费速率,例如通过限制拉取操作的频率或者调整消费者的线程数。

  • 错误处理和重试:在消费过程中可能会遇到错误,比如网络问题或者消息格式错误。消费者需要有相应的错误处理机制来处理这些情况,并在必要时进行重试。

  • 关闭消费者:完成消息消费后,应当关闭消费者实例,释放资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/706189.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2-25算法习题总结

贪心问题 P1803 凌乱的yyy / 线段覆盖 凌乱的yyy / 线段覆盖 题目背景 快 noip 了&#xff0c;yyy 很紧张&#xff01; 题目描述 现在各大 oj 上有 n n n 个比赛&#xff0c;每个比赛的开始、结束的时间点是知道的。 yyy 认为&#xff0c;参加越多的比赛&#xff0c;no…

基于springboot+vue的学科平台系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

代码随想录三刷day13

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、力扣151. 反转字符串中的单词二、力扣55. 右旋字符串&#xff08;第八期模拟笔试&#xff09;三、力扣28. 找出字符串中第一个匹配项的下标 前言 KMP主要应…

轻量级模型,重量级性能,TinyLlama、LiteLlama小模型火起来了,针对特定领域较小的语言模型是否与较大的模型同样有效?

轻量级模型&#xff0c;重量级性能&#xff0c;TinyLlama、LiteLlama小模型火起来了&#xff0c;针对特定领域较小的语言模型是否与较大的模型同样有效? 当大家都在研究大模型&#xff08;LLM&#xff09;参数规模达到百亿甚至千亿级别的同时&#xff0c;小巧且兼具高性能的小…

Mysql8.0 字符集

在8.0版本之前&#xff0c;MySQL默认的字符集为latin1&#xff0c;而8.0版本默认的字符集为utf8mb4。 latin1是ISO-8859-1的别名&#xff0c;有些环境下写作latin-1。ISO-8859-1编码是单字节编码&#xff0c;不支持中文等多字节字符&#xff0c;但向下兼容ASCII&#xff0c;其编…

学生信息的那些操作:(3)按姓名,查个人

有一学生成绩表&#xff0c;包括学号、姓名、3门课程成绩。请实现如下查找功能&#xff1a;输入一个学生的姓名&#xff0c;输出该学生学号、姓名、3门课程成绩 输入格式: 首先输入一个整数n(1<n<100)&#xff0c;表示学生人数&#xff1b; 然后输入n行&#xff0c;每…

关于CSS 盒子模型的基础教程

什么是CSS盒子模型&#xff1f; 在学习CSS布局时&#xff0c;一个非常重要的概念就是盒子模型。CSS盒子模型描述了网页中元素的布局方式&#xff0c;每个元素都被看作一个矩形的盒子&#xff0c;这个盒子包含了内容、内边距、边框和外边距四个部分。 盒子模型的组成部分 盒子…

Linux环境基础开发工具使用篇(三) git 与 gdb

一、版本控制器-git 1.简单理解: ①git既是服务端&#xff0c;又是客户端 ②git会记录版本的变化 ③git是一个去中心化的分布式软件 git/gitee 是基于git仓库搭建的网站&#xff0c;让版本管理可视化 2.git 三板斧提交代码 查看安装的git版本 git--version 命令行提交代…

FPGA IO命名与Bank划分

文章目录 IO的命名IO物理命名IO功能命名 Bank简介FPGA器件功能命名与Bank划分查找XILINXIntelLATTICE IO的命名 IO物理命名 FPGA的IO物理命名规则&#xff0c;也就是我们做管脚约束时候的命名。芯片通常是长方体或者正方体&#xff0c;所以命名通常采用字母数字组合的方式&am…

FMM 笔记:st-matching(colab上执行)【官方案例解读】

在colab上运行&#xff0c;所以如何在colab上安装fmm&#xff0c;可见FMM 笔记&#xff1a;在colab上执行FMM-CSDN博客 st-matching见论文笔记&#xff1a;Map-Matching for low-sampling-rate GPS trajectories&#xff08;ST-matching&#xff09;-CSDN博客 0 导入库 from…

华为畅享 60X 到底值得入手吗?这4点你必须要知道

作为一款主打千元机市场的机型&#xff0c;华为畅享 60X 到底怎么样&#xff1f;是否值得入手&#xff1f; 可以负责任的说华为畅享 60X 是一款性价比超高的手机&#xff0c;凭借其出色的硬件配置和适中的价格&#xff0c;不仅拥有华为完整的鸿蒙生态&#xff0c;同时它超大屏幕…

电源轨概念讲解

目录 1、电源轨定义2、模拟运放中电源轨概念3、芯片中电源轨概念 在电子设计中&#xff0c;我们经常会听到电源轨的概念&#xff0c;下面就针对他的定义和模电中的习惯叫法做一个简单的讲解&#xff1a; 1、电源轨定义 电源轨是指电路板上传输电力的线路&#xff0c;只要是连接…

【DL】深度学习之语音识别

目录 1 核心概念 2 安装依赖库 3 实践 语音信号处理&#xff08;Speech Signal Processing&#xff09;简称语音处理。 语音识别&#xff08;ASR&#xff09;和自然语言处理&#xff08;NLP&#xff09;&#xff1a;语音识别就是将语音信号转化成文字文本&#xff0c;简单实…

go 解压和压缩包

將压缩包放在zippath"D:/xx/xx/xx"中,解压到pathto"D:/xx/xx1/xx"中 type UploaddeployLogic struct {logx.Loggerctx context.ContextsvcCtx *svc.ServiceContextr *http.Request }func NewUploaddeployLogic(r *http.Request, svcCtx *svc.Serv…

Spring-Cloud-Gateway集成Sentinel限流

1&#xff09;gateway添加sentinel相关依赖 <spring-cloud.version>2021.0.1</spring-cloud.version> <spring-cloud-alibaba.version>2021.0.1.0</spring-cloud-alibaba.version><dependencies><!--gateway--><dependency><gro…

Linux yum与rpm区别

yum和rpm都是Linux系统中用于安装、升级和管理软件包的工具&#xff0c;但它们有一些区别。以下是yum和rpm的主要区别&#xff1a; 1. 功能&#xff1a;rpm是一种软件包管理工具&#xff0c;用于安装、升级和卸载软件包。它可以直接操作软件包文件&#xff0c;但不提供依赖关系…

JQUERY简介与分析

在现代的前端开发中&#xff0c;jQuery无疑是一个非常重要且广泛使用的工具库。它不仅简化了JavaScript的编写&#xff0c;还提供了丰富的功能和强大的选择器&#xff0c;使开发者能够更轻松地操作DOM元素、处理事件和实现动态效果。 简单来说&#xff0c;jQuery是一个快速、简…

【主流开发语言和开发环境介绍】

主流开发语言和开发环境介绍 1. 介绍2. 开发语言3. 开发环境 1. 介绍 下面是一些广泛使用的主流开发语言及其相关的开发环境。 2. 开发语言 Python 用途&#xff1a;通用编程、科学计算、数据分析、机器学习、Web开发等。流行库&#xff1a;NumPy, Pandas, TensorFlow, PyTor…

JavaWeb开发初体验

1.动态网站 动态网站可根据不同情况动态变更的网站&#xff0c;动态网站的网页文件包含程序代码&#xff0c;通过后台数据库与web服务器信息交互&#xff0c;由后台数据库提供实时数据更新和数据查询服务。 2.动态网站的功能特点 动态网站可以实现交互功能&#xff0c;…

面试redis篇-11Redis集群方案-哨兵

Redis提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复。哨兵的结构和作用如下: 监控:Sentinel 会不断检查您的master和slave是否按预期工作自动故障恢复:如果master故障,Sentinel会将一个slave提升为master。当故障实例恢复后也以新的master为主通知:Sentinel充当…