kafka复习:(26)通过RecordHeaders和RecordHeader来实现TTL功能

一、定义生产者,在消息中加入RecordHeaders

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class KafkaTest26 {public static void main(String[] args) {Properties properties= new Properties();properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);//大概率被消费者拦截器任务超时而丢弃RecordHeaders recordHeaders1 = new RecordHeaders();recordHeaders1.add("ttl", BytesUtils.longToBytes(1));RecordHeaders recordHeaders2 = new RecordHeaders();recordHeaders2.add("ttl", BytesUtils.longToBytes(30));RecordHeaders recordHeaders3 = new RecordHeaders();recordHeaders3.add("ttl", BytesUtils.longToBytes(60));ProducerRecord<String,String> producerRecord1 = new ProducerRecord<>("ttl",0,new Date().getTime(),"fff","hello sister,now is: "+ new Date(), recordHeaders1);ProducerRecord<String,String> producerRecord2 = new ProducerRecord<>("ttl",0,new Date().getTime(),"fff","hello sister,now is: "+ new Date(), recordHeaders2);ProducerRecord<String,String> producerRecord3 = new ProducerRecord<>("ttl",0,new Date().getTime(),"fff","hello sister,now is: "+ new Date(), recordHeaders3);Future<RecordMetadata> future = kafkaProducer.send(producerRecord1);Future<RecordMetadata> future2 = kafkaProducer.send(producerRecord2);Future<RecordMetadata> future3 = kafkaProducer.send(producerRecord3);try {future.get();future2.get();future3.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println("ok");kafkaProducer.close();}
}

二、定义消费者拦截器:

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class TtlConsumerInterceptor implements ConsumerInterceptor<String, String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {long now = System.currentTimeMillis();Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();for (TopicPartition tp : records.partitions()) {List<ConsumerRecord<String, String>> tpRecords = records.records(tp);List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();for (ConsumerRecord<String, String> record : tpRecords) {long ttl = -1;for (Header header : record.headers()) {if (header.key().equals("ttl")){ttl = BytesUtils.bytesToLong(header.value());}}// 超时???if (ttl > 0 && (now - record.timestamp() < ttl * 1000)){newTpRecords.add(record);} else {newTpRecords.add(record);}if (!newTpRecords.isEmpty()){newRecords.put(tp, newTpRecords);}}}return new ConsumerRecords<>(newRecords);}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

三、定义消费者,配置上述拦截器

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;public class KafkaTest27 {private static Properties getProperties(){Properties properties=new Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"testGroup");properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TtlConsumerInterceptor.class.getName());return properties;}public static void main(String[] args) {KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(getProperties());String topic="ttl";myConsumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecords<String,String> consumerRecords=myConsumer.poll(Duration.ofMillis(5000));for(ConsumerRecord record: consumerRecords){System.out.println(record.value());System.out.println("record offset is: "+record.offset());}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/67737.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【力扣】304. 二维区域和检索 - 矩阵不可变 <二维前缀和>

目录 【力扣】304. 二维区域和检索 - 矩阵不可变二维前缀和理论初始化计算面积 题解 【力扣】304. 二维区域和检索 - 矩阵不可变 给定一个二维矩阵 matrix&#xff0c;以下类型的多个请求&#xff1a; 计算其子矩形范围内元素的总和&#xff0c;该子矩阵的 左上角 为 (row1, …

界面控件DevExtreme(v23.2)下半年发展路线图

在这篇文章中&#xff0c;我们将介绍DevExtreme在v23.2中发布的一些主要特性&#xff0c;这些特性既适用于DevExtreme JavaScript (Angular、React、Vue、jQuery)&#xff0c;也适用于基于DevExtreme的ASP. NET MVC/Core控件。 DevExtreme包含全面的高性能和响应式UI小部件集合…

netty(三):NIO——多线程优化

NIO多线程优化 使用Boss线程来处理accepct事件使用Worker线程来处理读写事件&#xff0c;可以创建多个worker线程 package com.review;import lombok.extern.slf4j.Slf4j;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.*; impor…

elementui的el-dialog组件与el-tabs同时用导致浏览器卡死的原因解决

文章目录 在el-dialog弹出框中&#xff0c;如果使用el-tabs&#xff0c;点击弹框的关闭按钮时&#xff0c;会导致弹出框无法关闭&#xff0c;且浏览器卡死。 环境是&#xff1a; vue :2.6.10 elementui:2.15.3解决方式&#xff1a; 在el-talbs的面板中不放任何内容&#xff…

需求分析入门

认识管理软件 什么是管理软件 管理软件就是用来辅助企业进行管理的软件&#xff0c;既包括对企业“人、财、物”相关的资产信息的管理&#xff0c;也包括对企业“供、产、销”相关的业务活动信息的管理。管理软件的重点在于管理信息的收集、流转&#xff0c;资源的共享、集成…

UML基础

统一建模语言&#xff08;UML是 Unified Modeling Language的缩写, 是用来对软件系统进行可视化建模的一种语言。UML为面向对象开发系统的产品 进行说明、可视化、和编制文档的一种标准语言。 共有9种图 UML中的图其实不止九种 (相同的图还可能会有不同的名称), 这里的九种图是…

Ubuntu 20.04 网卡命名规则

Ubuntu 系统中网卡的命名规则是&#xff1a;Consistent Network Device Naming&#xff08;一致网络设备命名&#xff09;规范。这个规范的原理是根据固件、拓扑和位置信息分配固定名称。其中&#xff0c;设备类型 en 代表 Ethernet (以太网)&#xff0c;wl 代表 WLAN&#xff…

Linux(基础IO)

Linux&#xff08;基础IO&#xff09; 前言C语言文件IO什么叫当前路径stdin/stdout/stderr 系统文件IOopenclosewriteread 文件描述符文件描述符的分配规则 重定向输出重定向原理追加重定向原理输入重定向原理dup2添加重定向功能到minishell 缓冲区模拟实现一个缓冲区 理解文件…

云原生周刊:Linkerd 发布 v2.14 | 2023.9.4

开源项目推荐 Layerform Layerform 是一个 Terraform 包装器&#xff0c;可帮助工程师使用纯 Terraform 文件构建可重用的基础设施。 为了实现重用&#xff0c;Layerform 引入了层的概念。每层都包含一些基础设施&#xff0c;并且可以堆叠在另一层之上。 除了更易于使用之外…

RK3568-android11-适配ov13850摄像头

硬件连接 主要分为两部分: mipi接口:传输摄像头数据 i2c接口:配置摄像头和对焦马达芯片寄存器相关驱动 |-- arch/arm64/boot/dts/rockchip DTS配置文件 |-- drivers/phy/rockchip/|-- phy-rockchip-mipi-rx.c mipi dphy 驱动 |-- drivers/media||-- platform/rockchip/isp1…

uniapp 集成蓝牙打印功能(个人测试佳博打印机)

uniapp 集成蓝牙打印功能&#xff08;个人测试京博打印机&#xff09; uniapp 集成蓝牙打印功能集成佳博内置的接口 uniapp 集成蓝牙打印功能 大家好今天分析的是uniapp 集成蓝牙打印功能&#xff0c;个人开发是app,应该是支持H5(没试过) 集成佳博内置的接口 下载dome地址&…

2023高教社杯数学建模A题B题C题D题E题思路模型 国赛建模思路分享

文章目录 0 赛题思路1 竞赛信息2 竞赛时间3 建模常见问题类型3.1 分类问题3.2 优化问题3.3 预测问题3.4 评价问题 4 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 竞赛信息 全国大学生数学建模…

怎么把pdf压缩的小一点?

怎么把pdf压缩的小一点&#xff1f;在我们日常的学习和工作中&#xff0c;PDF文件是一个非常常见和有用的文件格式&#xff0c;并且受到很多小伙伴的喜欢。有时候&#xff0c;一些PDF文件可能会很大&#xff0c;造成pdf文件较大的原因其实很明确&#xff0c;主要是因为pdf文件中…

排序算法的稳定性

稳定性&#xff1a;对于一个数&#xff0c;经过多次排序&#xff0c;保留一个数之间的相对次序 在基础类型数据上&#xff0c;稳定性用处不大 在非基础类型上&#xff0c;可以做到对于相同元素来说&#xff0c;排完序相同元素之间的相对次序不变 归并排序在merge的过程中先拷贝…

数字孪生城市总体架构进一步迭代更新

经过五年来发展&#xff0c;数字孪生城市基本形成“三横四纵”的总体架构&#xff0c;“三横”为新型基础设施、智能运行中枢、孪生应用体系&#xff0c;“四纵”为组织保障体系、标准规范体系、网络安全防线、运营保障体系&#xff0c;具体如下。 数字孪生城市总体架构-来源&a…

机器人编程怎么入门?

机器人已经在我们中间存在了二三十年。如今&#xff0c;机器人在我们的文化中比以往任何时候都更加根深蒂固。大多数机器人机器用于各种装配线&#xff0c;或在世界各地的矿山或工业设施中执行密集的物理操作。 还有一些家用机器人&#xff0c;工程师正在对机器人进行编程&…

尚硅谷大数据项目《在线教育之离线数仓》笔记006

视频地址&#xff1a;尚硅谷大数据项目《在线教育之离线数仓》_哔哩哔哩_bilibili 目录 第11章 数仓开发之ADS层 P087 P088 P089 P090 P091 P092 P093 P094 P095 P096 P097 P098 P099 P100 P101 P102 P103 P104 P105 P106 P107 P108 P109 P110 P111 …

《Kubernetes部署篇:Ubuntu20.04基于二进制安装安装kubeadm、kubelet和kubectl》

一、背景 由于客户网络处于专网环境下&#xff0c; 使用kubeadm工具安装K8S集群&#xff0c;由于无法连通互联网&#xff0c;所有无法使用apt工具安装kubeadm、kubelet、kubectl&#xff0c;当然你也可以使用apt-get工具在一台能够连通互联网环境的服务器上下载kubeadm、kubele…

微软牵手Linux:Ubuntu“系统”上架win10应用商店啦

导读继SUSE Linux登陆之后&#xff0c;Ubuntu今天正式以UWP应用的身份上架Win10应用商店。Windows Insider用户升级到Win10秋季创意者更新预览版Build 16190及以上就可以下载和安装Ubuntu系统应用。一旦下载和安装完Ubuntu应用后&#xff0c;它将开始在你的Windows10 PC上安装U…

[羊城杯 2023] web

文章目录 D0nt pl4y g4m3!!! D0n’t pl4y g4m3!!! 打开题目&#xff0c;可以判断这里为php Development Server 启动的服务 查询得知&#xff0c;存在 PHP<7.4.21 Development Server源码泄露漏洞(参考文章) 抓包&#xff0c;构造payload 得到源码 class Pro{private $ex…