kafka-消费者-指定offset消费(SpringBoot整合Kafka)

文章目录

  • 1、指定offset消费
    • 1.1、创建消费者监听器‘
    • 1.2、application.yml配置
    • 1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本
    • 1.4、创建生产者发送消息
      • 1.4.1、分区0中的数据
    • 1.5、创建SpringBoot启动类
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、消费者控制台:

1、指定offset消费

1.1、创建消费者监听器‘

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaPartitionListener {//初始化偏移量指定后,每次重启都会从该位置消费一轮,所以一般是调式解决问题时才使用@KafkaListener(topicPartitions = {@TopicPartition(topic = "my_topic1",partitionOffsets = {@PartitionOffset(partition = "0",initialOffset = "2")})}, groupId = "my_group1")public void onMessage1(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}}

1.2、application.yml配置

server:port: 8120# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量enable-auto-commit: true # 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始auto-offset-reset: earliest  #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本

package com.atguigu.spring.kafka.consumer.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class MyKafkaConfig {@Beanpublic NewTopic springTestPartitionTopic() {return TopicBuilder.name("my_topic1") //主题名称.partitions(3) //分区数量.replicas(3) //副本数量.build();}
}

在这里插入图片描述
在这里插入图片描述

1.4、创建生产者发送消息

package com.atguigu.spring.kafka.consumer;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootTest
class SpringKafkaConsumerApplicationTests {@ResourceKafkaTemplate kafkaTemplate;@Testvoid contextLoads() {for (int i = 0; i < 10; i++) {kafkaTemplate.send("my_topic1",i%3,"", "指定分区消费"+i);}}}

在这里插入图片描述
在这里插入图片描述

1.4.1、分区0中的数据

[[{"partition": 0,"offset": 0,"msg": "指定offset消费0","timespan": 1717660785962,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 1,"msg": "指定offset消费3","timespan": 1717660785974,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 2,"msg": "指定offset消费6","timespan": 1717660785975,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 3,"msg": "指定offset消费9","timespan": 1717660785975,"date": "2024-06-06 07:59:45"}]
]

1.5、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}

1.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.8、消费者控制台:

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9

此时如果重新启动 SpringKafkaConsumerApplication 消费者还是会消费数据,重复消费

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/23417.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nginx动静分离和反向代理

一、动静分离 动静分离指的是将动态内容和静态内容分开处理。动态内容通常由后端应用程序生成&#xff0c;例如PHP、Python或Node.js&#xff0c;静态内容则包括图片、CSS、JavaScript等文件。 例子&#xff1a; #代理服务器一 server{listen 80;server_name www.dj.com;r…

夏季高温来袭|危化品如何安全储存?

《危险化学品安全管理条例》第三条 本条例所称危险化学品&#xff0c;是指具有毒害、腐蚀、爆炸、燃烧、助燃等性质&#xff0c;对人体、设施、环境具有危害的剧毒化学品和其他化学品。 随着夏天高温的来袭&#xff0c;炎热的天气对危化品储存威胁巨大&#xff0c;危化品事故也…

【C++课程学习】:C++入门(输入输出,缺省参数)

&#x1f381;个人主页&#xff1a;我们的五年 &#x1f50d;系列专栏&#xff1a;C课程学习 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 目录 &#x1f369;1.关于C输入输出&#xff1a; &#x1f369;2.缺省参数函数&#xff1a; 缺省参数的概…

聊聊大模型微调训练全流程的思考

前言 参考现有的中文医疗模型&#xff1a;MedicalGPT、CareGPT等领域模型的训练流程&#xff0c;结合ChatGPT的训练流程&#xff0c;总结如下&#xff1a; 在预训练阶段&#xff0c;模型会从大量无标注文本数据集中学习领域/通用知识&#xff1b;其次使用{有监督微调}(SFT)优化…

TensorBoard在pytorch训练过程中如何使用,及数据读取问题解决方法

TensorBoard 模块导入日志记录文件的创建训练中如何写入数据如何提取保存的数据调用TensorBoard面板可能会遇到的问题 模块导入 首先从torch中导入tensorboard的SummaryWriter日志记录模块 from torch.utils.tensorboard import SummaryWriter然后导入要用到的os库&#xff0…

方案设计|汽车轮胎数显胎压计方案

一、引言 数显轮胎胎压计是一个专门测量车辆轮胎气压的工具&#xff0c;它具有高精度测量的功能&#xff0c;能够帮助快速准确获取轮胎气压正确数值&#xff0c;保证轮胎使用安全。本文将对数显轮胎胎压计的方案技术进行分析&#xff0c;包括其基本原理、硬件构成、软件设计等方…

架构学习:什么是业务架构图?如何画业务架构图?

6.1~6.18&#xff0c;艾威618年中大促&#xff0c;钜惠来袭&#xff01;想报课但还没下手的小伙伴&#xff0c;都可以行动起来啦&#xff01;活动规则还是一如既往的简单、粗暴——直接立减、返现、抽奖以及送礼品&#xff01;了解活动详情&#xff0c;请点击这里》》 业务架构…

实验9 浮动静态路由配置

--名称-- 一、 原理描述二、 实验目的三、 实验内容四、 实验配置五、 实验步骤 一、 原理描述 浮动静态路由也是一种特殊的静态路由&#xff0c;主要考虑链路冗余。浮动静态路由通过配置一条比主路由优先级低的静态路由&#xff0c;用于保证在主路由失效的情况下&#xff0c;…

代码随想录 day27|day28|day29

回溯2 切割问题&#xff1a;是在每个节点判断是否是要剪枝收割元素。 startidx 是切割起点&#xff0c;i是本次切割终点 分割回文串 复原ip地址 非递减子序列 都是在树的节点依照题意判断&#xff0c;之后决定是否剪枝。 也就是都有if判断来剪枝 。 下面是非递减子序列。 下…

前端将xlsx转成json

第一种方式&#xff0c;用js方式 1.1先安装插件 万事都离不开插件的支持首先要安装两个插件 1.2. 安装xlsx cnpm install xlsx --save注&#xff1a;这块我用的cnpm&#xff0c;原生的是npm&#xff0c;因为镜像的问题安装了cnpm&#xff0c;至于怎么装网上一搜一大堆 1.3安…

用langchain搭配最新模型ollama打造属于自己的gpt

langchain 前段时间去玩了一下langchain,熟悉了一下大模型的基本概念&#xff0c;使用等。前段时间meta的ollama模型发布了3.0,感觉还是比较强大的&#xff0c;在了解过后&#xff0c;自己去用前后端代码&#xff0c;调用ollama模型搭建了一个本地的gpt应用。 核心逻辑 开始搭…

Vue 封装elementUI的el-popover

1.封装公共组件 <template><div class"confirm-popover disInlineBlock ml10"><el-popover placement"bottom" v-model"visible" :trigger"triggerType"><div class"confirm-popover-popover"><…

vue3中进度条上加高亮圆点

实现效果 小圆点基于进度条定位&#xff08;left&#xff09;。 实现代码 <template><!-- 这块代码实现的功能&#xff1a;progressData遍历的年份进度数组&#xff0c;展示每年完成的进度--><ul><li v-for"(item, index) in progressData" :k…

Unity VR 零基础开发之 Pico4 MR

一、新建Unity2021.3.37 3D工程 二、切换到Android安卓平台 1、点击Unity编辑器左上角的Flie后&#xff0c;选择Build Setting选项。 2、弹出弹窗后&#xff0c;点击Android选项&#xff0c;然后再点击Switch Platform按钮切换成安卓平台。 3、切换完成后Android选项后面会显示…

3. QGis二次开发项目实践一之解决“无法定位程序输入点“

前言 本章讲述实现本项目实现过程中遇到的QGis二次开发库版本和Qt以及其他动态库的版本匹配问题问题复现 本项目是要作为一个子模块集成到用户的项目中本项目最初的开发环境为QGis3.28+Qt5.15.2,而当时并未问清楚用户开发环境所以交付给用户之后,出现了类似下图的问题 出现该…

AI论文:一键生成论文的高效工具

说到这个问题&#xff0c;那真的得看你对“靠谱”的定义是怎样的啦&#xff1f; 众所周知&#xff0c;写论文是一项极其耗时间的事情&#xff0c;从开始的选题到文献资料搜索查阅&#xff0c;大纲整理等等一大堆的繁杂工作是极艰辛的。用AI写论文就不一样了&#xff0c;自动化…

php7.3安装phalcon扩展

php7安装3.4版本的phalcon扩展 适用于Centos6.x和Centos7.x系统&#xff0c;php使用7.1版本&#xff0c;wlnmp一键包已支持该扩展 phalcon扩展包地址&#xff1a;https://github.com/phalcon/cphalcon &#xff08;git clone 有可能连接不上&#xff09; 1、安装所需依赖&a…

Linux Kernel nf_tables 本地权限提升漏洞(CVE-2024-1086)

文章目录 前言声明一、netfilter介绍二、漏洞成因三、漏洞危害四、影响范围五、漏洞复现六、修复方案临时解决方案升级修复方案 前言 2024年1月&#xff0c;各Linux发行版官方发布漏洞公告&#xff0c;修复了一个 netfilter:nf_tables 模块中的释放后重用漏洞&#xff08;CVE-…

基于语音识别的智能电子病历(其他)CC、HPI、ROS案例分析

门诊使用基于语音识别的智能电子病历相对要少一些。很多系统的门诊系统包含了丰富的模版&#xff0c;美国退伍军人医院在非常早的时候&#xff08;2010年之前&#xff09;就通过使用大量的模版来进行处理门诊的数据录入。下面分析中的截图也是来自一些美国退伍军人医院的系统。…

网络高频攻击手段与基本防护措施总结

主要攻击手段 一、云平台攻击 云平台攻击是指针对云服务器的恶意行为&#xff0c;旨在获取非法访问权限、窃取敏感数据或者破坏服务器的正常运行。云平台攻击的形式多样&#xff0c;以下是对云平台攻击的一些主要类型和特点的详细分析&#xff1a; 攻击类型&#xff1a; 凭据…