Kafka入门二——SpringBoot连接Kafka示例

实现

1.引入maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.9</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>kafka-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-demo</name><description>kafka-demo</description><properties><java.version>8</java.version></properties><dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><image><builder>paketobuildpacks/builder-jammy-base:latest</builder></image></configuration></plugin></plugins></build></project>

2.修改application.properties配置

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.bootstrap-servers=10.20.37.85:9092
spring.kafka.consumer.group-id=demo

通过spring.kafka.consumer.auto-offset-reset配置可以指定Kafka消费者的行为,当遇到没有初始偏移量或当前偏移量不再服务器上存在的情况时的处理策略。这个配置有以下三个可选值:

  • earliest:当消费者组中没有存储的偏移量,或者偏移量超出了可用范围时,从最早的可用消息开始消费。
  • latest(默认):当消费者组中没有存储的偏移量,或者偏移量超出了可用范围时,从最新的可用消息开始消费。
  • none:当服务器上找不到消费者的偏移量时,抛出异常。

通过spring.kafka.bootstrap-servers配置可以指定Kafka的地址。这个配置的值应该是一个或多个Kafka服务器的地址,用逗号分隔。

通过spring.kafka.consumer.group-id配置可以指定Kafka消费者的组ID。这个配置的值应该是一个字符串,用于标识消费者所属的组。

定义Configuer引入kafka Bean

@Component
public class MyConfiguer {@Beanpublic NewTopic topic() {return TopicBuilder.name("topic1").partitions(10).replicas(1).build();}
}

使用了@Bean注解,表示该方法将返回一个Spring管理的bean对象。在这个方法中,通过调用TopicBuilder类的name方法指定了主题的名称为"topic1",然后使用partitions方法设置了主题的分区数为10,使用replicas方法设置了主题的副本数为1。最后,调用build方法构建并返回了一个NewTopic对象。

定义生产者

@Component
public class KafkaProducer {@Beanpublic ApplicationRunner runner(KafkaTemplate<String, String> template) {return args -> {template.send("topic1", "test");};}
}

ApplicationRunner接口是Spring Boot提供的一个回调接口,用于在应用程序启动后执行一些操作。这段代码的作用是在应用程序启动后自动发送一条消息到Kafka的"topic1"主题,消息内容为"test"。

定义消费者

@Component
public class KafkaConsumer {@KafkaListener(id = "myId", topics = "topic1")public void listen(String in) {System.out.println(in);}
}

@KafkaListener注解用于标记方法作为Kafka消息的消费者。这段代码的作用是创建一个Kafka消费者,监听名为"topic1"的主题,并将接收到的消息内容打印到控制台。

总结

生产者

Kafka生产者发送消息的流程如下:

  • 创建ProducerRecord:首先,生产者需要创建一个ProducerRecord对象,该对象包含目标主题和要发送的内容。如果需要,还可以指定消息的键(key)或分区(partition)。

  • 序列化:生产者将ProducerRecord对象中的键和值进行序列化,转换为字节数组,以便能够在网络上传输。

  • 拦截器链:消息会通过一个由一个或多个拦截器组成的链。这些拦截器可以对消息进行预处理,例如添加头信息或实施自定义的逻辑。

  • 元数据加载:生产者在发送消息之前,需要加载Kafka集群的元数据,以确定各个主题的分区和副本分布情况。

  • 计算分区号:根据ProducerRecord中的键和分区策略(如果有的话),生产者会计算出消息应该发送到哪个分区。

  • 累加器缓存:消息被缓存到RecordAccumulator累加器中,这是一个按照批次处理消息的组件。

  • Sender线程发送:Sender线程负责将符合条件的消息批次发送给Kafka broker。除了发送消息外,Sender线程也负责更新元数据。

  • 等待确认:生产者可以选择等待broker的确认,以确保消息已经被成功接收并写入日志。

  • 处理响应:生产者处理来自broker的响应,包括错误处理和重试逻辑。

  • 完成发送:一旦消息被成功发送并且确认,生产者会继续发送下一批消息或者结束发送过程。
    在这里插入图片描述

消费者

Kafka消费者接收消息的流程涉及到多个关键步骤,确保了消息能够从broker高效地传送到消费者手中。以下是该过程的详细描述:

  • 创建消费者实例:需要创建一个Kafka消费者实例,这通常涉及到指定一些关键参数,如Kafka集群的地址、消费者组ID、反序列化器等。

  • 订阅主题:消费者需要订阅一个或多个感兴趣的主题。这意味着消费者希望从这些主题接收消息。

  • 协调消费者组:在ConsumerCoordinator的poll方法中,会有一个ensureActiveGroup的过程,消费者通过这个过程向Coordinator发送加入消费者组的请求,并等待Coordinator的响应。

  • 拉取消息:一旦加入了消费者组,消费者开始从订阅的主题中拉取(pull)消息。Kafka采用pull模式而不是push模式,这意味着消费者需要主动去服务器拉取消息。

  • 处理消息:消费者获取消息后,会进行相应的处理。这个处理过程可能包括数据转换、存储或者其他业务逻辑。

  • 提交消费位移:消费者在处理完消息后,需要提交消费位移。这是告诉Kafka它已经消费到了哪个位置,以便下次从正确的位置开始消费。位移的提交是由消费者自己管理的,Kafka提供了接口来更新这些位移。

  • 控制消费速率:消费者可以通过各种方式控制消息的消费速率,例如通过限制拉取操作的频率或者调整消费者的线程数。

  • 错误处理和重试:在消费过程中可能会遇到错误,比如网络问题或者消息格式错误。消费者需要有相应的错误处理机制来处理这些情况,并在必要时进行重试。

  • 关闭消费者:完成消息消费后,应当关闭消费者实例,释放资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/706189.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于springboot+vue的学科平台系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

轻量级模型,重量级性能,TinyLlama、LiteLlama小模型火起来了,针对特定领域较小的语言模型是否与较大的模型同样有效?

轻量级模型&#xff0c;重量级性能&#xff0c;TinyLlama、LiteLlama小模型火起来了&#xff0c;针对特定领域较小的语言模型是否与较大的模型同样有效? 当大家都在研究大模型&#xff08;LLM&#xff09;参数规模达到百亿甚至千亿级别的同时&#xff0c;小巧且兼具高性能的小…

Linux环境基础开发工具使用篇(三) git 与 gdb

一、版本控制器-git 1.简单理解: ①git既是服务端&#xff0c;又是客户端 ②git会记录版本的变化 ③git是一个去中心化的分布式软件 git/gitee 是基于git仓库搭建的网站&#xff0c;让版本管理可视化 2.git 三板斧提交代码 查看安装的git版本 git--version 命令行提交代…

FPGA IO命名与Bank划分

文章目录 IO的命名IO物理命名IO功能命名 Bank简介FPGA器件功能命名与Bank划分查找XILINXIntelLATTICE IO的命名 IO物理命名 FPGA的IO物理命名规则&#xff0c;也就是我们做管脚约束时候的命名。芯片通常是长方体或者正方体&#xff0c;所以命名通常采用字母数字组合的方式&am…

FMM 笔记:st-matching(colab上执行)【官方案例解读】

在colab上运行&#xff0c;所以如何在colab上安装fmm&#xff0c;可见FMM 笔记&#xff1a;在colab上执行FMM-CSDN博客 st-matching见论文笔记&#xff1a;Map-Matching for low-sampling-rate GPS trajectories&#xff08;ST-matching&#xff09;-CSDN博客 0 导入库 from…

华为畅享 60X 到底值得入手吗?这4点你必须要知道

作为一款主打千元机市场的机型&#xff0c;华为畅享 60X 到底怎么样&#xff1f;是否值得入手&#xff1f; 可以负责任的说华为畅享 60X 是一款性价比超高的手机&#xff0c;凭借其出色的硬件配置和适中的价格&#xff0c;不仅拥有华为完整的鸿蒙生态&#xff0c;同时它超大屏幕…

【DL】深度学习之语音识别

目录 1 核心概念 2 安装依赖库 3 实践 语音信号处理&#xff08;Speech Signal Processing&#xff09;简称语音处理。 语音识别&#xff08;ASR&#xff09;和自然语言处理&#xff08;NLP&#xff09;&#xff1a;语音识别就是将语音信号转化成文字文本&#xff0c;简单实…

Spring-Cloud-Gateway集成Sentinel限流

1&#xff09;gateway添加sentinel相关依赖 <spring-cloud.version>2021.0.1</spring-cloud.version> <spring-cloud-alibaba.version>2021.0.1.0</spring-cloud-alibaba.version><dependencies><!--gateway--><dependency><gro…

面试redis篇-11Redis集群方案-哨兵

Redis提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复。哨兵的结构和作用如下: 监控:Sentinel 会不断检查您的master和slave是否按预期工作自动故障恢复:如果master故障,Sentinel会将一个slave提升为master。当故障实例恢复后也以新的master为主通知:Sentinel充当…

POST参数里加号+变成空格的问题处理

今天遇到个这样的问题&#xff0c;从前端传到后端的加密报文&#xff0c;里面包含了号&#xff0c;但在后端日志输出看出&#xff0c;变成空格。这个是由于经过RSA加密后引起的 解决办法&#xff1a; 1.前端转码&#xff1a;使用encodeURIComponent对参数进行转码 2.后端解码…

【初始RabbitMQ】高级发布确认的实现

在生产环境中由于一些不明原因&#xff0c;导致 rabbitmq 重启&#xff0c;在 RabbitMQ 重启期间生产者消息投递失败&#xff0c; 导致消息丢失&#xff0c;需要手动处理和恢复。于是&#xff0c;我们开始思考&#xff0c;如何才能进行 RabbitMQ 的消息可靠投递呢&#xff1f; …

python Matplotlib Tkinter-->tab切换2

环境 python:python-3.12.0-amd64 包: matplotlib 3.8.2 pillow 10.1.0 import matplotlib.pyplot as plt from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk import tkinter as tk import tkinter.ttk as ttk# 创建自定义工具栏类 c…

面试redis篇-12Redis集群方案-分片集群

原理 主从和哨兵可以解决高可用、高并发读的问题。但是依然有两个问题没有解决&#xff1a; 海量数据存储问题高并发写的问题 使用分片集群可以解决上述问题&#xff0c;分片集群特征&#xff1a; 集群中有多个master&#xff0c;每个master保存不同数据每个master都可以有…

10 Redis之SB整合Redis

7. SB整合Redis Spring Boot 中可以直接使用 Jedis 实现对 Redis 的操作&#xff0c;但一般不这样用&#xff0c;而是使用 Redis操作模板 RedisTemplate 类的实例来操作 Redis。 RedisTemplate 类是一个对 Redis 进行操作的模板类。该模板类中具有很多方法&#xff0c;这些方…

【手机端测试】adb基础命令

一、什么是adb adb&#xff08;Android Debug Bridge&#xff09;是android sdk的一个工具 adb是用来连接安卓手机和PC端的桥梁&#xff0c;要有adb作为二者之间的维系&#xff0c;才能让用户在电脑上对手机进行全面的操作。 Android的初衷是用adb这样的一个工具来协助开发人…

el-submenu is-opened 展开/闭合;el-submenu is-opened保持一个子菜单的展开控制

写了个mes系统目录 点击子菜单展开后&#xff0c;上一级菜单没有默认关闭。主流后台管理系统大部分都是保持一个子菜单关闭状态、 问度娘无果后&#xff0c;查询官网&#xff0c;一个属性搞定。 unique-opened 是否只保持一个子菜单的展开 加在 <el-menu 组件上即可 完整代…

LeetCode_Java_动态规划系列(1)(题目+思路+代码)

目录 斐波那契类型 746.使用最小花费爬楼梯 矩阵 120. 三角形最小路径和 斐波那契类型 746.使用最小花费爬楼梯 给你一个整数数组 cost &#xff0c;其中 cost[i] 是从楼梯第 i 个台阶向上爬需要支付的费用。一旦你支付此费用&#xff0c;即可选择向上爬一个或者两个台阶。…

Leetcode202. 快乐数中为什么一定会循环?

他提示中是2的31次方&#xff1a; 相当于我就当它10个9&#xff0c;那么通过一次变换变成了81*10 810&#xff0c;所以他以后最后的范围也只能在[0,810]之间&#xff0c;810是最大的&#xff0c;然后任意一个数x&#xff0c;通过810次变换都不重复&#xff0c;那么811次变换那…

GIS之深度学习01:检测电脑是否包含英伟达GPU

GPU&#xff08;Graphics processing unit&#xff09;&#xff0c;中文全称图形处理器&#xff0c;我们听说的更多的CPU全称是central processing unit&#xff0c;中央处理器。研究深度学习和神经网络大都离不开GPU&#xff0c;在GPU的加持下&#xff0c;我们可以更快的获得模…

【QT+QGIS跨平台编译】之五十二:【QGIS_CORE跨平台编译】—【qgsexpressionlexer.cpp生成】

文章目录 一、Flex二、生成来源三、构建过程一、Flex Flex (fast lexical analyser generator) 是 Lex 的另一个替代品。它经常和自由软件 Bison 语法分析器生成器 一起使用。Flex 最初由 Vern Paxson 于 1987 年用 C 语言写成。 “flex 是一个生成扫描器的工具,能够识别文本中…