kafka:消费者从指定时间的偏移开始消费(二)

我的前一篇博客《kafka:AdminClient获取指定主题的所有消费者的消费偏移(一)》为了忽略忽略掉上线之前的所有消息,从获取指定主题的所有消费者的消费偏移并计算出最大偏移来解决此问题。
但这个方案需要使用不常用的AdminClient类,而且如果该主题如果是第一次被消费者拉取消息时,因为得不到消费者的消费偏移,最后的结果,就是从0偏移开始拉取所有消息。并不能真正实现忽略上线之前所有消息的目的。
所以我又优化了方案。基本的原理就是使用KafkaConsumer.offsetsForTimes方法获取消费者的所有主题分区的指定时间的偏移,并将这个偏移作为消费开始的偏移(KafkaConsumer.seek方法) 。

	@Testpublic void test3SeekToTime() {// 配置Kafka消费者的属性Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "my_consumer_group");props.put("key.deserializer", StringDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());// 创建Kafka消费者实例try(Consumer<String, String> consumer = new KafkaConsumer<>(props)){			boolean seek = false;/** * 循环开始的时间,* 忽略该时间之前的消息*/long startMills = System.currentTimeMillis();while (true) {try {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(4000));if(!seek) {if(!records.isEmpty()) {/** * 获取第一批消息时更新消息偏移到循环开始的时间*/consumer.offsetsForTimes(Maps.asMap(consumer.assignment(),t->startMills)).forEach((k,v)->{if(null != v) {System.out.println("seek %s to %s",k,v.offset());consumer.seek(k,v.offset());}});seek = true;}/** 跳过第一批获取到的消息,继续循环 */continue;}records.forEach(record -> {String value = record.value();System.out.println("Received message: " + value);});}catch (Exception e) {e.printStackTrace();}}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/13850.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python的输入:探索不同的输入方法和技巧

Python的输入&#xff1a;探索不同的输入方法和技巧 引言 1.1 Python的输入概述 在编程中&#xff0c;输入是一个非常重要的概念。它允许我们从用户、文件或其他程序中获取数据&#xff0c;以便进行处理、计算或展示。Python提供了多种方法来获取输入&#xff0c;每种方法都…

SpringBoot中的RestTemplate使用笔记

SpringBoot中的RestTemplate使用笔记 为了方便使用&#xff0c;这里我封装成一个工具类来静态调用RestTemplate以下代码是基于SpringBoot2.4.2版本写的案例 需要配置的application.yml如下 server:port: 7024servlet:context-path: /demosession:timeout: 30m #默认会话过期…

redis分布式锁

Redis 作者继续论述&#xff0c;如果对方认为&#xff0c;发生网络延迟、进程 GC 是在步骤 3 之后&#xff0c;也就是客户端确认拿到了锁&#xff0c;去操作共享资源的途中发生了问题&#xff0c;导致锁失效&#xff0c;那这不止是 Redlock 的问题&#xff0c;任何其它锁服务例…

数据库面试题

Mysql篇 &#xff08;1&#xff09;请你解释下mysql主从同步中的&#xff0c;全同步&#xff0c;异步&#xff0c;以及半同步的三种模式概念? 此题是XX想面试题。 MySQL默认的复制即是异步的&#xff1a; 主库在执行完客户端提交的事务后会立即将结果返给客户端&#xff0c…

Flowable-任务-脚本任务

定义 脚本任务&#xff08;Script Task&#xff09;是一种自动执行的活动。当流程执行到达脚本任务时&#xff0c;会执行相应的 脚本&#xff0c;完毕后继续执行后继路线。脚本任务无须人为参与&#xff0c;可以通过定义脚本实现自定义的业务逻辑。 图形标记 脚本任务显示为…

数据结构基础:3.单链表的实现。

单链表的介绍和实现 一.基本概念1.基本结构2.结构体节点的定义&#xff1a; 二.功能接口的实现0.第一个节点&#xff1a;plist1打印链表2创建一个节点3.头插4.头删5.尾插6.尾删7.查找8.在pos之前插入x9.在pos之后插入x10.删除pos位置11.删除pos的后一个位置12.链表释放 三.整体…

UE4/5C++多线程插件制作(二十、源码)

目录 头文件 MultiThreadPlugins.uplugin MultiThreadPlugins.Build.cs MultiThreadPlugins.h MTPPlatform.h MTPManage.h RTPAgendy.h MTPThreadTaskManage.h

C语言每天一练----输出水仙花数

题目&#xff1a;请输出所有的"水仙花数" 题解&#xff1a;所谓"水仙花数"是指一个3位数,其各位数字立方和等于该数本身。 例如, 153是水仙花数, 因为153 1 * 1 * 1 5 * 5 * 5 3 * 3 * 3" #define _CRT_SECURE_NO_WARNINGS 1#include <stdio.h&g…

【自动化运维】Ansible常见模块的运用

目录 一、Ansible简介二、Ansible安装部署2.1环境准备 三、ansible 命令行模块3.1&#xff0e;command 模块3.2&#xff0e;shell 模块3.3&#xff0e;cron 模块3.4&#xff0e;user 模块3.5&#xff0e;group 模块3.6&#xff0e;copy 模块3.7&#xff0e;file 模块8&#xff…

【雕爷学编程】MicroPython动手做(10)——零基础学MaixPy之神经网络KPU

早上百度搜“神经网络KPU”&#xff0c;查到与非网的一篇文章《一文读懂APU/BPU/CPU/DPU/EPU/FPU/GPU等处理器》&#xff0c;介绍各种处理器非常详细&#xff0c;关于“KPU”的内容如下&#xff1a; KPU Knowledge Processing Unit。 嘉楠耘智&#xff08;canaan&#xff09;号…

k8s证书更新,kubeadm安装的K8S证书过期后无法使用后证书更新方法

k8s证书更新 1. 查看证书过期时间 #通过文件查看证书过期时间 for item in find /etc/kubernetes/pki -maxdepth 2 -name "*.crt";do openssl x509 -in $item -text -noout| grep Not;echo $item;done #通过命令查看证书过期时间 kubeadm certs check-expirationk8…

找不到mfc140u.dll怎么解决

第一&#xff1a;mfc140u.dll有什么用途&#xff1f; mfc140u.dll是Windows操作系统中的一个动态链接库文件&#xff0c;它是Microsoft Foundation Class (MFC)库的一部分。MFC是 C中的一个框架&#xff0c;用于构建Windows应用程序的用户界面和功能。mfc140u.dll包含了MFC库中…

杂谈项——关于我在bw上的见闻,以及个人对二次元游戏行业方面的前瞻

君兮_的个人主页 勤时当勉励 岁月不待人 C/C 游戏开发 Hello,米娜桑们&#xff0c;这里是君兮_&#xff0c;今天为大家带来一点不一样的&#xff0c;首先先光速叠一下甲&#xff1a; 在此说明博主并不是一个什么都知道的大佬&#xff0c;只是一个普通的老二次元以及期望以后能…

HCIP重发布实验

目录 实验要求&#xff1a; 步骤一&#xff1a;拓扑设计IP地址规划 拓扑设计 R1 R2 R3 R4 发布路由 R1 R2 R3 R4 双向重发布 在R2和R4 上进行 R2 R4 检查R1 修改开销值选路 择优选择去4.0网段的路径 测试&#xff1a;​编辑 择优选择去32网段的路径 测试&…

【uniapp 获取缓存及清除缓存】

小程序及H5 获取缓存&#xff1a; 使用uniapp中的wx.getStorageInfoSync()方法可以获取当前小程序或H5应用的本地缓存信息&#xff0c;如下所示&#xff1a; let storageInfo uni.getStorageInfoSync() console.log(storageInfo)其中&#xff0c;storageInfo是一个对象&…

短视频矩阵源码/系统搭建/源码

一、短视频矩阵系统开发需要具备以下能力 短视频技术能力&#xff1a;开发人员应具备短视频相关技术能力&#xff0c;如视频编解码、视频流媒体传输等。 大数据存储和处理能力&#xff1a;短视频矩阵系统需要处理大量的视频数据&#xff0c;因此需要具备大数据存储和处理的能力…

JavaScript快速入门:ComPDFKit PDF SDK 快速构建 Web端 PDF阅读器

JavaScript快速入门&#xff1a;ComPDFKit PDF SDK 快速构建 Web端 PDF阅读器 在当今丰富的网络环境中&#xff0c;处理 PDF 文档已成为企业和开发人员的必需品。ComPDFKit 是一款支持 Web 平台并且功能强大的 PDF SDK&#xff0c;开发人员可以利用它创建 PDF 查看器和编辑器&…

初探webAssembly | 京东物流技术团队

1 WebAssembly是什么&#xff1f; 一种运行在现代网络浏览器中的新型代码&#xff0c;并且提供新的性能特性和效果 W3C WebAssembly Community Group开发的一项网络标准&#xff0c;对于浏览器而言&#xff0c;WebAssembly 提供了一条途径&#xff0c;让各种语言编写的代码以接…

【Visual Studio】VS调用tensorflow C++API的配置(无需编译)

windows利用vs2015调用tensorflow c api 1. 首先下载并安装visual studio Visual Studio 2015 安装教程&#xff08;附安装包&#xff09;&#xff0c;按照博客中顺序来就可以 如果在安装过程中提示安装包丢失或损坏&#xff0c;参考VS2015安装过程中安装包丢失或损坏解决办…

策略模式的实现与应用:掌握灵活算法切换的技巧

文章目录 常用的设计模式有以下几种&#xff1a;一.创建型模式&#xff08;Creational Patterns&#xff09;&#xff1a;二.结构型模式&#xff08;Structural Patterns&#xff09;&#xff1a;三.行为型模式&#xff08;Behavioral Patterns&#xff09;&#xff1a;四.并发…