kafka-spring实现对于topic的监听的开启、暂停、暂停后重新开始、停止

kafka-spring实现对于topic的监听的开启、暂停、暂停后重新开始、停止

直接上代码

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;import java.util.HashMap;
import java.util.Map;/*** kafka整体配置类** @author Dean*/
@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.admin.client-id}")private String adminClientId;@Beanpublic AdminClient adminClient() {Map<String, Object> configs = new HashMap<>(5);configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configs.put(AdminClientConfig.CLIENT_ID_CONFIG, adminClientId);return AdminClient.create(configs);}/*** 如果有多个消费组,需要定义多个不同的ConcurrentKafkaListenerContainerFactory** @return ConcurrentKafkaListenerContainerFactory*/@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);String groupId = "dmGroup";props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//是否自动提交ackprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//一次拉取最大数据量,默认值为500,如果拉取时不足配置的条数则有多少拉取多少props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();//是否批量这个设置好像只对配置了@KafkaListener的方法有用factory.setBatchListener(false);factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));//手动提交ackfactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);return factory;}
}
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;/*** kafka中间件的逻辑封装类*/
@Slf4j
@Service
public class KafkaListenerManagement {private final Map<String, MessageListenerContainer> containers = new ConcurrentHashMap<>();/*** 如果有多个消费组,需要注入多个不同的ConcurrentKafkaListenerContainerFactory*/private final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory;@Autowiredpublic KafkaListenerManagement(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {this.containerFactory = containerFactory;}/*** 开启Topic的监听** @param topic            topic* @param bizLogicConsumer 消息的业务逻辑处理*/public void startListening(String topic, BiConsumer<String, Acknowledgment> bizLogicConsumer) {//必须手动提交ACK,否则停止监听后重新监听可能导致拉取到重复的记录AcknowledgingMessageListener<String, String> messageListener =(message, acknowledgment) -> bizLogicConsumer.accept(message.value(), acknowledgment);MessageListenerContainer container = containerFactory.createContainer(topic);container.setupMessageListener(messageListener);container.start();containers.put(topic, container);}/*** 暂停监听** @param topic topic*/public void pauseListening(String topic) {MessageListenerContainer container = containers.get(topic);container.pause();}/*** 暂停后继续监听** @param topic topic*/public void resumeListening(String topic) {MessageListenerContainer container = containers.get(topic);container.resume();}/*** 停止监听** @param topic topic*/public void stopListening(String topic) {MessageListenerContainer container = containers.remove(topic);if (container != null) {container.stop();}}
}
/*** Kafka生产者** @author LiuChang*/
@Service
public class KafkaProducerManagement {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;/*** 异步发送** @param topic   topic* @param message 消息* @return ListenableFuture*/public ListenableFuture<SendResult<String, String>> send(String topic, String message) {return kafkaTemplate.send(topic, message);}
}
import com.feiynn.kafka.management.KafkaListenerManagement;
import com.feiynn.kafka.management.KafkaProducerManagement;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;/*** 业务逻辑类* 注意:业务逻辑类不建议直接调用kafka的API,都调用封装后的Kafka相关的Management类** @author Dean*/
@Slf4j
@Service
public class BizService {@Resourceprivate KafkaListenerManagement kafkaListenerManagement;@Resourceprivate KafkaProducerManagement kafkaProducerManagement;/*** 开启topic监听后的业务逻辑** @param topic topic*/public void startListening(String topic) {kafkaListenerManagement.startListening(topic, (data, acknowledgment) -> {//消息处理业务逻辑log.info("Received message value: [{}]", data);try {//降低消费速率,方便观察日志TimeUnit.MILLISECONDS.sleep(100L);} catch (InterruptedException e) {e.printStackTrace();}acknowledgment.acknowledge();});}/*** 停止topic监听** @param topic topic*/public void stopListening(String topic) {kafkaListenerManagement.stopListening(topic);}/*** 暂停监听** @param topic topic*/public void pauseListening(String topic) {kafkaListenerManagement.pauseListening(topic);}/*** 暂停后继续监听** @param topic topic*/public void resumeListening(String topic) {kafkaListenerManagement.resumeListening(topic);}/*** 发送消息** @param topic   topic* @param message 消息*/public void sendMsg(String topic, String message) {ListenableFuture<SendResult<String, String>> listenableFuture = kafkaProducerManagement.send(topic, message);//添加回调逻辑,异步获取发送结果listenableFuture.addCallback((sendResult) -> {//发送成功log.trace("Send [{}] success", message);}, (e) -> {//发送失败,可以执行降级策略,或者把消息写入日志后续进行统一处理log.error("Send [{}] failed", message, e);});}
}
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = KafkaAdvancedApplication.class)
public class KafkaAdvancedTest {@Resourceprivate BizService bizService;/*** 测试topic监听的开启、暂停、暂停后重新开始、停止*/@Testpublic void startStopListening() throws InterruptedException {String topicDm = "dm0";//开启topic监听bizService.startListening(topicDm);TimeUnit.SECONDS.sleep(2);//消息前缀,用来区分是上一次发送的未消费完的消息还是本次发送的消息String msgPre = LocalTime.now().toString();log.info("msgPre=[{}]", msgPre);for (int i = 0; i < 2000; i++) {bizService.sendMsg(topicDm, "Msg_" + msgPre + "_" + i);}TimeUnit.SECONDS.sleep(5);log.info("pause listening begin");bizService.pauseListening(topicDm);log.info("pause listening success");//暂停监听成功后,消费者会把配置max.poll.records条数的消息消费完才会真正停止,因此停顿足够长的时间后观察消息消费的日志是否会暂停输出TimeUnit.SECONDS.sleep(20);log.info("resume listening");//暂停后重新开启消息监听bizService.resumeListening(topicDm);TimeUnit.SECONDS.sleep(20);//新一轮暂停与重启log.info("pause listening again");bizService.pauseListening(topicDm);TimeUnit.SECONDS.sleep(10);log.info("resume listening again");bizService.resumeListening(topicDm);//继续消费一段时间TimeUnit.SECONDS.sleep(10);//消费一段时间后停止监听log.info("stop listening");bizService.stopListening(topicDm);TimeUnit.SECONDS.sleep(20);//重新开启topic监听log.info("start listening again");bizService.startListening(topicDm);TimeUnit.SECONDS.sleep(120);}}

直接运行测试用例,通过观察日志,即可看出各种操作效果

遇到的问题

遇到停止监听topic后,从看到消费消息的日志观察,有时会一直打印,有时会打印一段时间就停止打印的问题,最终发现暂停监听方法调用成功后,消费者会把配置max.poll.records条数的消息消费完才会真正暂停或者停止。
另外如果不是手动提交ack,停止stop(不是暂停pause)订阅topic然后后重新开始订阅(start),可能会出现重复消费消息的问题,改成手动提交ack后问题不再出现。
还考虑到max.poll.interval.ms 最大拉取时间间隔是5分钟,尝试了暂停5分30秒看是否消费者会被因为rebalance,导致在resume重新监听无法成功,测试结果是没有问题,可以成功继续监听并消费消息。
代码已使用到生产环境。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/2415.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

BitmapIndex Scan 与BitmapHeap Scan

BitmapIndex Scan 与BitmapHeap Scan BitmapIndex Scan 与Index Scan 很相似&#xff0c;都是基于索引的扫描&#xff0c;但是BitmapIndex Scan 节点每次执行返回的是一个位图而不是一个元组&#xff0c;其中位图中每位代表了一个扫描到的数据块。而BitmapHeap Scan一般会作为…

4月23日,每日信息差

第一、目前全国共确定工伤保险异地就医直接结算试点城市131个&#xff0c;开通上线工伤医疗、工伤康复、辅助器具配置协议机构共398家。工伤职工按规定完成备案&#xff0c;持社保卡或电子社保卡可以到试点城市的协议机构直接结算相关费用 第二、极兔快递仅用 4 年成为中国国内…

算法和数据结构4.23:

1.测试工程师算法总结&#xff1a; L1排序和查找算法&#xff1a; 复杂度分析 查找算法&#xff1a;顺序查找、二分查找、其他查找算法 排序算法&#xff1a;冒泡排序、快速排序、其他排序算法 L2数据结构-线性表&#xff1a;链表、栈、队列、堆 L3数据结构-非线性表&…

信创传输软件,如何进行国产化替代?

信创产业&#xff0c;即信息技术应用创新产业&#xff0c;它与“863 计划”“973 计划”“核高基” 一脉相承&#xff0c;是我国 IT 产业发展升级采取的长期计划。网络安全事件频发后&#xff0c;中国要确保 IT 相关设施的全部环节国产化&#xff0c;任何不能保证自主可控的环节…

服务器(AIX、Linux、UNIX)性能监视器工具【nmon】使用介绍

目录 ■nmon简介 1.安装 2.使用简介 3.使用&#xff08;具体使用的例子【CPU】【内存】&#xff09; 4.采集数据 5.查看log&#xff08;根据结果&#xff0c;生成报表&#xff09; 6.分析结果 ■nmon简介 nmon&#xff08;"Nigels performance Monitor"&…

终于有人说明白了session、cookie和token的区别

一、首先介绍一下名词&#xff1a;Session、cookie、token&#xff0c;如下&#xff1a; 1.Session会话&#xff1a;客户端A访问服务器&#xff0c;服务器存储A的数据value&#xff0c;把key返回给客户端A&#xff0c;客户端A下次带着key&#xff08;session ID&#xff09;来…

(一)输入验证(语法和语义)

你无法控制进入应用程序的内容&#xff0c;甚至无法控制从数据库进入的内容&#xff0c;因为这些数据之前可能已经注入了。日期&#xff0c;如1900年 不是一个有效的日期&#xff0c;应确保在预期的范围内。因此&#xff0c;任何来自你控制的领域之外的东西&#xff0c;比如来自…

一文浅谈FRTC8563时钟芯片

FRTC8563是NYFEA徕飞公司推出的一款实时时钟芯片&#xff0c;采用SOP-8封装形式。这种封装形式具有体积小、引脚间距小、便于集成等特点&#xff0c;使得FRTC8563能够方便地应用于各种电子设备中。 FRTC8563芯片基于32.768kHz的晶体振荡器工作&#xff0c;这种频率的晶体振荡器…

2024年特种作业操作证(登高架设作业)考试题库及答案

一、选择题 1.带电跨越架宜采用干燥的竹竿、杉杆搭设&#xff0c;严禁使用&#xff08;  &#xff09;等。 A.钢管 B.松木 C.柳木 答案:A 2.跨越架立杆间距为&#xff08;  &#xff09;m。 A.1.2 B.1.5 C.1.8 答案:B 3.跨越架杆件相交时&#xff0c;不得一扣…

生活无趣感的成因与破解之道

在日常生活中&#xff0c;我们不时会听到身边人感慨&#xff1a;“生活真是无趣。”这种感受仿佛超越了年龄、性别、职业界限&#xff0c;成为现代社会的一种普遍情绪。然而&#xff0c;生活本身蕴含着无数色彩与可能&#xff0c;为何在许多人眼中却显得如此平淡乏味&#xff1…

JavaSE——程序逻辑控制

1. 顺序结构 顺序结构 比较简单&#xff0c;按照代码书写的顺序一行一行执行。 例如&#xff1a; public static void main(String[] args) {System.out.println(111);System.out.println(222);System.out.println(333);} 运行结果如下&#xff1a; 如果调整代码的书写顺序 , …

(ICML-2021)从自然语言监督中学习可迁移的视觉模型

从自然语言监督中学习可迁移的视觉模型 Title&#xff1a;Learning Transferable Visual Models From Natural Language Supervision paper是OpenAI发表在ICML 21的工作 paper链接 Abstract SOTA计算机视觉系统经过训练可以预测一组固定的预定目标类别。这种受限的监督形式限制…

在 Kubernetes 1.24 中使用 Docker:配置与应用指南

在 Kubernetes 1.24 中使用 Docker&#xff1a;配置与应用指南 引言 随着 Kubernetes 社区对容器运行时接口&#xff08;CRI&#xff09;的标准化推进&#xff0c;Docker 原生支持在 Kubernetes 1.24 版本中被弃用。然而&#xff0c;许多开发者和组织仍希望继续使用 Docker。…

服务器基本故障和排查方法

前言 服务器运维工作中遇到的问题形形色色&#xff0c;无论何种故障&#xff0c;都需要结合具体情况&#xff0c;预防为主的思想&#xff0c;熟悉各种工具和技术手段&#xff0c;养成良好的日志分析习惯&#xff0c;同时建立完善的应急预案和备份恢复策略&#xff0c;才能有效…

工业设备管理平台

在这个数字化、智能化的新时代&#xff0c;工业设备管理平台正成为推动工业转型升级的重要力量。在众多平台中&#xff0c;HiWoo Cloud以其卓越的性能、稳定的服务和创新的理念&#xff0c;赢得了广大用户的青睐。今天&#xff0c;就让我们一起走进HiWoo Cloud的世界&#xff0…

WebSocket的原理、作用、常见注解和生命周期的简单介绍,附带SpringBoot示例

文章目录 WebSocket是什么WebSocket的原理WebSocket的作用全双工和半双工客户端【浏览器】API服务端 【Java】APIWebSocket的生命周期WebSocket的常见注解SpringBoot简单代码示例 WebSocket是什么 WebSocket是一种 通信协议 &#xff0c;它在 客户端和服务器之间建立了一个双向…

123.Mit6.S081-实验1-Xv6 and Unix utilities

今天我们来进行Mit6.S081实验一的内容。 实验任务 一、启动xv6(难度&#xff1a;Easy) 获取实验室的xv6源代码并切换到util分支。 $ git clone git://g.csail.mit.edu/xv6-labs-2020 Cloning into xv6-labs-2020... ... $ cd xv6-labs-2020 $ git checkout util Branch util …

QML语法计基础二

1.Item 所有可视化类型的基类 可以作为容器,里面包含各种可视化类 1.1 opacity不透明度 取值范围&#xff1a;0.0-1.0 父容器的不透明度会影响到子容器,如果要设置不透明度&#xff0c;只需要更改子容器的不透明度即可 1.2 visible可见与enable启用 visible 可视化控件的可见…

Go 堆内存分配源码解读

简要介绍 在Go的内存分配中存在几个关键结构&#xff0c;分别是page、mspan、mcache、mcentral、mheap&#xff0c;其中mheap中又包括heapArena&#xff0c;具体这些结构在内存分配中担任什么角色呢&#xff1f; 如下图&#xff0c;可以先看一下整体的结构&#xff1a; mcach…

LeetCode刷题合集

203.移除链表元素 定位到需要删除节点的上一个节点 cur&#xff0c;将其指向下下个节点。 class Solution:def removeElements(self, head: Optional[ListNode], val: int) -> Optional[ListNode]:dummy_head ListNode(val 0, next head)cur dummy_headwhile cur and cu…