kafka复习:(17)seekToBeginning的用法

从分区的开始进行消费,因为kafka会定期清理历史数据,所以分区开始的位移不一定为0。seekToBeginning只是从目前保留的数据中最小的offset进行消费

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;
/*
从分区开头进行消费; seekToBeginning)*/public class KafkaTest14 {private static Properties getProperties(){Properties properties=new Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"testGroup12");//properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");return properties;}public static void main(String[] args) {KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(getProperties());myConsumer.subscribe(Arrays.asList("student"));Set<TopicPartition> topicPartitionSet = new HashSet<>();while(topicPartitionSet.size() == 0){ConsumerRecords<String,String> consumerRecords=myConsumer.poll(Duration.ofMillis(5000));topicPartitionSet = myConsumer.assignment();}myConsumer.seekToBeginning(topicPartitionSet);while(true){ConsumerRecords<String, String> consumerRecords = myConsumer.poll(Duration.ofMillis(5000));for(ConsumerRecord record: consumerRecords){System.out.println(record.value());System.out.println(record.offset());}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/52735.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分布式计算框架:Spark、Dask、Ray

目录 什么是分布式计算 分布式计算哪家强&#xff1a;Spark、Dask、Ray 2 选择正确的框架 2.1 Spark 2.2 Dask 2.3 Ray 什么是分布式计算 分布式计算是一种计算方法&#xff0c;和集中式计算是相对的。 随着计算技术的发展&#xff0c;有些应用需要非常巨大的计算能力才…

亿赛通电子文档安全管理系统 RCE漏洞复现(QVD-2023-19262)

0x01 产品简介 亿赛通电子文档安全管理系统&#xff08;简称&#xff1a;CDG&#xff09;是一款电子文档安全加密软件&#xff0c;该系统利用驱动层透明加密技术&#xff0c;通过对电子文档的加密保护&#xff0c;防止内部员工泄密和外部人员非法窃取企业核心重要数据资产&…

VIT 和Swin Transformer

VIT&#xff1a;https://blog.csdn.net/qq_37541097/article/details/118242600 Swin Transform&#xff1a;https://blog.csdn.net/qq_37541097/article/details/121119988 一、VIT 模型由三个模块组成&#xff1a; Linear Projection of Flattened Patches(Embedding层) Tran…

C语言基础之——数组

前言&#xff1a;本篇文章&#xff0c;我们将对一维数组&#xff0c;和二维数组进行展开式的讲解&#xff0c;并进行实际应用。 目录 一.一维数组 1.一维数组的创建和初始化 &#xff08;1&#xff09;数组的创建 &#xff08;2&#xff09;数组的初始化 2.一维数组的使用…

二叉树中的最大路径和-递归

路径 被定义为一条从树中任意节点出发&#xff0c;沿父节点-子节点连接&#xff0c;达到任意节点的序列。同一个节点在一条路径序列中 至多出现一次 。该路径 至少包含一个 节点&#xff0c;且不一定经过根节点。 路径和 是路径中各节点值的总和。 给你一个二叉树的根节点 root…

【python】tkinter使用多进程打包成exe后multiprocessing无法关闭对应进程

这是由于multiprocessing模块在Windows操作系统下使用fork方法创建子进程时会导致打包成exe后无法正常运行的问题。 可以尝试使用freeze_support函数来解决这个问题。freeze_support函数是在Windows操作系统下用于支持multiprocessing模块的函数。 下面是一个示例代码&#x…

C# 案例题

1. // # hello world using System; namespace HelloWorldApplication {class HelloWorld{static void Main(string[] args) {/*my first C# program*/Console.WriteLine("HelloWorld C#");Console.ReadKey();}} } 2. // C# 计算矩形的面积 /*计…

AI智能语音机器人的基本业务流程

先画个图&#xff0c;了解下AI语音机器人的基本业务流程。 上图是一个AI语音机器人的业务流程&#xff0c;简单来说就是首先要配置话术&#xff0c;就是告诉机器人在遇到问题该怎么回答&#xff0c;这个不同公司不同行业的差别比较大&#xff0c;所以一般每个客户都会配置其个性…

华为OD机试 - 最佳植树距离 - 二分查找(Java 2023 B卷 100分)

目录 一、题目描述二、输入描述三、输出描述四、备注说明五、二分查找六、解题思路七、Java算法源码八、效果展示1、输入2、输出3、说明 一、题目描述 按照环保公司要求&#xff0c;小明需要在沙化严重的地区进行植树防沙工作&#xff0c;初步目标是种植一条直线的树带。 由于…

java ssl加密发送邮件

通过25端口发送邮件不安全&#xff0c;改为ssl加密方式发送邮件&#xff0c;比较常见的2中实现类发送邮件如下所示。 1、JavaMailSenderImpl 类 使用该实现类发送邮件&#xff0c;ssl加密使用端口号为465&#xff0c;借助Properties类设置ssl的各种配置。 SysUserEntity user…

微信小程序——van-field中的left-icon属性自定义

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…

vue 简单实验 v-for 循环

1.代码 <script src"https://unpkg.com/vuenext" rel"external nofollow" ></script> <div id"list-rendering"><ol><li v-for"todo in todos">{{ todo.text }}</li></ol> </div> &…

AUTOSAR规范与ECU软件开发(实践篇)6.2 ETAS RTA系列工具入门

目录 1、 RTA系列工具安装方法 (1) ETAS RTA-RTE的安装方法 (2) ETAS RTA-BSW的安装方法

软件架构阐述

软件架构主要从以下几个方面进行阐述: 1. 架构模式 常见的软件架构模式有: 1. MVC架构 MVC全称Model-View-Controller,是一种分离视图和业务逻辑的软件设计典范,通过解耦来提高灵活性和复用性。 2. SOA架构 面向服务的架构(SOA)通过服务接口进行松耦合的组件编排,可以灵活可扩…

Redis的常用数据类型详解

Redis是一个开源的、基于内存的数据结构存储系统&#xff0c;它可以用作数据库、缓存和消息代理。Redis支持多种数据类型&#xff0c;包括字符串、列表、集合、有序集合、散列等。理解这些数据类型的特性和使用方式&#xff0c;对于充分利用Redis的能力至关重要。以下是对Redis…

【LeetCode】125. 验证回文串 - 双指针

这里写自定义目录标题 2023-8-24 09:31:12 125. 验证回文串 2023-8-24 09:31:12 最关键的是 注意 题目中的 “字母和数字都属于字母数字字符。” 使用ascii码进行判断就行了 class Solution {public boolean isPalindrome(String s) {int p 0, q s.length() - 1;while (…

自然语言处理从入门到应用——LangChain:链(Chains)-[通用功能:自定义Chain和Chain的异步API]

分类目录&#xff1a;《自然语言处理从入门到应用》总目录 创建自定义Chain 要实现自己的自定义链式连接&#xff0c;我们可以子类化Chain并实现以下方法&#xff1a; from __future__ import annotations from typing import Any, Dict, List, Optional from pydantic impor…

Jenkins自动化部署Vue项目

1、新建item&#xff0c;选择 Freestyle project 2、源码管理选择git&#xff0c;输入git仓库地址和授权账号&#xff0c;并指明要部署的分支 3、构建选择 Execute shell&#xff0c;输入vue项目打包命令 命令示例&#xff1a; source /etc/profile node -v npm config set re…

【stable-diffusion使用扩展+插件和模型资源(上】

文章目录 前言一、插件推荐1.qrcode-monster2.sd-webui-openpose-editor3.sd-webui-depth-lib4.roop&#xff08;换脸插件&#xff09;5.sd-webui-qrcode-toolkit&#xff08;艺术二维码&#xff09;5.光源控制6.二次元转真人7.动态视频转场&#xff08;loopback-wave&#xff…

无涯教程-PHP - preg_replace()函数

preg_replace() - 语法 mixed preg_replace (mixed pattern, mixed replacement, mixed string [, int limit [, int &$count]] ); preg_replace()函数的操作与POSIX函数ereg_replace()相同&#xff0c;不同之处在于可以在模式和替换输入参数中使用正则表达式。 可选的输…