kafka-消费者-指定offset消费(SpringBoot整合Kafka)

文章目录

  • 1、指定offset消费
    • 1.1、创建消费者监听器‘
    • 1.2、application.yml配置
    • 1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本
    • 1.4、创建生产者发送消息
      • 1.4.1、分区0中的数据
    • 1.5、创建SpringBoot启动类
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、消费者控制台:

1、指定offset消费

1.1、创建消费者监听器‘

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaPartitionListener {//初始化偏移量指定后,每次重启都会从该位置消费一轮,所以一般是调式解决问题时才使用@KafkaListener(topicPartitions = {@TopicPartition(topic = "my_topic1",partitionOffsets = {@PartitionOffset(partition = "0",initialOffset = "2")})}, groupId = "my_group1")public void onMessage1(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}}

1.2、application.yml配置

server:port: 8120# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量enable-auto-commit: true # 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始auto-offset-reset: earliest  #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本

package com.atguigu.spring.kafka.consumer.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class MyKafkaConfig {@Beanpublic NewTopic springTestPartitionTopic() {return TopicBuilder.name("my_topic1") //主题名称.partitions(3) //分区数量.replicas(3) //副本数量.build();}
}

在这里插入图片描述
在这里插入图片描述

1.4、创建生产者发送消息

package com.atguigu.spring.kafka.consumer;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootTest
class SpringKafkaConsumerApplicationTests {@ResourceKafkaTemplate kafkaTemplate;@Testvoid contextLoads() {for (int i = 0; i < 10; i++) {kafkaTemplate.send("my_topic1",i%3,"", "指定分区消费"+i);}}}

在这里插入图片描述
在这里插入图片描述

1.4.1、分区0中的数据

[[{"partition": 0,"offset": 0,"msg": "指定offset消费0","timespan": 1717660785962,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 1,"msg": "指定offset消费3","timespan": 1717660785974,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 2,"msg": "指定offset消费6","timespan": 1717660785975,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 3,"msg": "指定offset消费9","timespan": 1717660785975,"date": "2024-06-06 07:59:45"}]
]

1.5、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}

1.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.8、消费者控制台:

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9

此时如果重新启动 SpringKafkaConsumerApplication 消费者还是会消费数据,重复消费

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/23417.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nginx动静分离和反向代理

一、动静分离 动静分离指的是将动态内容和静态内容分开处理。动态内容通常由后端应用程序生成&#xff0c;例如PHP、Python或Node.js&#xff0c;静态内容则包括图片、CSS、JavaScript等文件。 例子&#xff1a; #代理服务器一 server{listen 80;server_name www.dj.com;r…

夏季高温来袭|危化品如何安全储存?

《危险化学品安全管理条例》第三条 本条例所称危险化学品&#xff0c;是指具有毒害、腐蚀、爆炸、燃烧、助燃等性质&#xff0c;对人体、设施、环境具有危害的剧毒化学品和其他化学品。 随着夏天高温的来袭&#xff0c;炎热的天气对危化品储存威胁巨大&#xff0c;危化品事故也…

Mybatis-Plus的一些理解

MybatisPlus在Mybatis的基础上进行了增强&#xff0c;进一步简化了SQL的编写。对于普通的单表的CRUD&#xff0c;MyBatisPlus提过了相关接口进行快速操作。比如&#xff1a; insert(T entity): 插入一个实体&#xff0c;null的属性也会保存&#xff0c;不会使用数据库默认值。…

详解Selenium 强制等待、隐式等待和显式等待

在Selenium中使用等待机制&#xff1a;强制等待、隐式等待和显式等待 在自动化测试中&#xff0c;等待是一个非常重要的部分&#xff0c;因为它可以帮助确保测试脚本在执行操作前&#xff0c;页面元素已经加载完成或者变为可用。Selenium WebDriver 提供了多种等待机制&#x…

【C++课程学习】:C++入门(输入输出,缺省参数)

&#x1f381;个人主页&#xff1a;我们的五年 &#x1f50d;系列专栏&#xff1a;C课程学习 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 目录 &#x1f369;1.关于C输入输出&#xff1a; &#x1f369;2.缺省参数函数&#xff1a; 缺省参数的概…

聊聊大模型微调训练全流程的思考

前言 参考现有的中文医疗模型&#xff1a;MedicalGPT、CareGPT等领域模型的训练流程&#xff0c;结合ChatGPT的训练流程&#xff0c;总结如下&#xff1a; 在预训练阶段&#xff0c;模型会从大量无标注文本数据集中学习领域/通用知识&#xff1b;其次使用{有监督微调}(SFT)优化…

python如何快速的判断一个key在json的第几层呢,并修改其value值

python如何快速的判断一个key在json的第几层呢&#xff0c;并修改其value值 def find_and_modify_key(json_obj, target_key, new_value, current_level1):# 检查当前层是否包含目标keyif target_key in json_obj:print(f"找到 {target_key} 在第 {current_level} 层。&q…

在 JavaScript 中,除了使用可选链操作符,还有哪些方法可以安全地访问对象的深层属性?

在JavaScript中&#xff0c;除了使用可选链操作符&#xff08;?.&#xff09;&#xff0c;还有几种方法可以安全地访问对象的深层属性&#xff0c;避免因访问不存在的属性而抛出错误。以下是一些常用的方法&#xff1a; 1. 条件&#xff08;三元&#xff09;运算符&#xff1…

以太网基础 -- LLDP使用案例

LLDP使用技术报告 背景 链路层发现协议&#xff08;Link Layer Discovery Protocol&#xff0c;LLDP&#xff09;是一种网络协议&#xff0c;主要用于在以太网网络中发现相邻设备并与其交换信息。LLDP是一种开放的标准&#xff0c;由IEEE 802.1AB定义&#xff0c;能够跨多个厂…

dubbo服务引用

spring刷新容器的时候&#xff0c;ReferrenceBean只有被调用的情况下才会在createBean回调afterPropertiesSet的时候引入服务。 ReferrenceBean实现了FacotryBean的接口&#xff0c;当对任意服务Interface进行自动注入或者getBean获取&#xff0c;会触发getObject过程 本地服…

TensorBoard在pytorch训练过程中如何使用,及数据读取问题解决方法

TensorBoard 模块导入日志记录文件的创建训练中如何写入数据如何提取保存的数据调用TensorBoard面板可能会遇到的问题 模块导入 首先从torch中导入tensorboard的SummaryWriter日志记录模块 from torch.utils.tensorboard import SummaryWriter然后导入要用到的os库&#xff0…

通过一个例子,说明Python的责任链设计模式有什么优缺点

责任链设计模式&#xff08;Chain of Responsibility Pattern&#xff09;是一种行为设计模式&#xff0c;它允许将一个请求沿着处理者链进行传递&#xff0c;直到有一个处理者处理它为止。在Python中&#xff0c;这种模式可以通过多种方式实现&#xff0c;通常涉及到一系列对象…

方案设计|汽车轮胎数显胎压计方案

一、引言 数显轮胎胎压计是一个专门测量车辆轮胎气压的工具&#xff0c;它具有高精度测量的功能&#xff0c;能够帮助快速准确获取轮胎气压正确数值&#xff0c;保证轮胎使用安全。本文将对数显轮胎胎压计的方案技术进行分析&#xff0c;包括其基本原理、硬件构成、软件设计等方…

架构学习:什么是业务架构图?如何画业务架构图?

6.1~6.18&#xff0c;艾威618年中大促&#xff0c;钜惠来袭&#xff01;想报课但还没下手的小伙伴&#xff0c;都可以行动起来啦&#xff01;活动规则还是一如既往的简单、粗暴——直接立减、返现、抽奖以及送礼品&#xff01;了解活动详情&#xff0c;请点击这里》》 业务架构…

ActiViz中的vtkVolumeMapper

文章目录 一、简介二、工作原理三、核心功能四、使用步骤五、配置与优化六、案例与应用七、扩展与集成八、结论一、简介 vtkVolumeMapper是 ActiViz库中专为处理和渲染三维体积数据设计的核心组件。在诸如医学成像、地质科学、气象模型及工程仿真等多个领域,它扮演着至关重要…

【源码】SpringBoot事务注册原理

前言 对于数据库的操作&#xff0c;可能存在脏读、不可重复读、幻读等问题&#xff0c;从而引入了事务的概念。 事务 1.1 事务的定义 事务是指在数据库管理系统中&#xff0c;一系列紧密相关的操作序列&#xff0c;这些操作作为一个单一的工作单元执行。事务的特点是要么全…

实验9 浮动静态路由配置

--名称-- 一、 原理描述二、 实验目的三、 实验内容四、 实验配置五、 实验步骤 一、 原理描述 浮动静态路由也是一种特殊的静态路由&#xff0c;主要考虑链路冗余。浮动静态路由通过配置一条比主路由优先级低的静态路由&#xff0c;用于保证在主路由失效的情况下&#xff0c;…

代码随想录 day27|day28|day29

回溯2 切割问题&#xff1a;是在每个节点判断是否是要剪枝收割元素。 startidx 是切割起点&#xff0c;i是本次切割终点 分割回文串 复原ip地址 非递减子序列 都是在树的节点依照题意判断&#xff0c;之后决定是否剪枝。 也就是都有if判断来剪枝 。 下面是非递减子序列。 下…

Python | 虚拟环境的增删改查

mkvirtualenv创建虚拟环境 mkvirtualenv是用于在Pyhon中创建虚拟环境的命令。它通过使用vitualenv库来创建一个隔离的Python环境&#xff0c;以便您可以安装特定版本的Python包&#xff0c;而不会影响全局Python环境。 使用方法: 安装virtualenv&#xff1a;pip install vir…

前端将xlsx转成json

第一种方式&#xff0c;用js方式 1.1先安装插件 万事都离不开插件的支持首先要安装两个插件 1.2. 安装xlsx cnpm install xlsx --save注&#xff1a;这块我用的cnpm&#xff0c;原生的是npm&#xff0c;因为镜像的问题安装了cnpm&#xff0c;至于怎么装网上一搜一大堆 1.3安…