Sprint Boot教程之五十八:动态启动/停止 Kafka 监听器

Spring Boot – 动态启动/停止 Kafka 监听器

当 Spring Boot 应用程序启动时,Kafka Listener 的默认行为是开始监听某个主题。但是,有些情况下我们不想在应用程序启动后立即启动它。

动态启动或停止 Kafka Listener,我们需要三种主要方法,即在需要处理 Kafka 消息时启动/停止、使用@KafkaListener注释、使用 kafkaListenerEndpointRegistry

在本文中,我们将介绍如何动态启动或停止 Kafka 监听器。

启动/停止 Kafka 监听器的不同方法

方法一

  • 当需要处理 Kafka 消息时,启动一个应用程序。
  • 处理成功后停止应用程序。

方法 2:在注册 Kafka Listener 时,我们可以设置以下 id 属性。

@KafkaListener(id = "id-1", groupId = "group-1", topics = "Message-topic", containerFactory = "messageListenerFactory", autoStartup = "false")public void consumeMessage(Message message)

方法 3:自动连接KafkaListenerEndpointRegistry bean 来控制 Kafka Listener 的启动或停止。

@AutowiredKafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

开始:

 public boolean startListener(String listenerId) {MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer != null : false;listenerContainer.start();

停止:

public boolean stopListener(String listenerId) {MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer != null : false;listenerContainer.stop();logger.info("{} Kafka Listener Stopped.", listenerId);

下面我们将以上述句法方法为例进行实现。

启动或停止特定 Kafka Listener的实现

创建一个类,其对象将被 Kafka 侦听器使用。

文件:Message.java

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {private String message;
}

配置 Kafka Listener 将使用的消费者。

文件:KakfaConsumerConfig.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConsumerConfig {private String kafkaUrl = "localhost:9092";@Beanpublic ConsumerFactory<String, Message> messageConsumerFactory() {JsonDeserializer<Message> deserializer = new JsonDeserializer<>(Message.class, false);deserializer.setRemoveTypeHeaders(false);deserializer.addTrustedPackages("*");deserializer.setUseTypeMapperForKey(true);Map<String, Object> config = new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, Message> messageListenerFactory() {ConcurrentKafkaListenerContainerFactory<String, Message> containerFactory = new ConcurrentKafkaListenerContainerFactory();containerFactory.setConsumerFactory(messageConsumerFactory());return containerFactory;}
}

创建一个具有必要参数的 Kafka 监听器。

  • id:此侦听器的容器唯一标识符。如果未指定,则使用自动生成的 ID。
  • groupId:仅为该监听器使用该值覆盖消费者工厂的 group.id 属性。
  • 主题:此侦听器的主题。条目可以是“主题名称”、“属性占位符键”或“表达式”。主题名称必须从表达式解析。这使用组管理,Kafka 将为组成员分配分区。
  • containerFactory:KafkaListenerContainerFactory的 bean 名称,将用于创建为该端点提供服务的消息侦听器容器。
  • autoStartup:设置为 true 或 false 以覆盖容器工厂的默认设置。默认情况下,该值设置为 true,因此,它将在我们的应用程序启动时立即开始使用消息。

文件:KafkaMessageListener.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;@Configuration
public class KafkaMessageListener {Logger logger = LoggerFactory.getLogger(KafkaMessageListener.class);@KafkaListener(id = "id-1", groupId = "group-1", topics = "Message-topic", containerFactory = "messageListenerFactory", autoStartup = "false")public void consumeMessage(Message message) {logger.info("Message received : -> {}", message);}
}

KafkaListenerEndpointRegistry 类可用于通过 listenerId 获取 Kafka 侦听器容器。这里我们使用了@KafkaListener注释来将 bean 方法声明为 Kafka 侦听器容器的侦听器。现在可以使用此容器启动或停止 Kafka 侦听器。

文件:KafkaListenerAutomation.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;@Component
public class KafkaListenerAutomation {private final Logger logger = LoggerFactory.getLogger(KafkaListenerAutomation.class);@AutowiredKafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;public boolean startListener(String listenerId) {MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer != null : false;listenerContainer.start();logger.info("{} Kafka Listener Started", listenerId);return true;}public boolean stopListener(String listenerId) {MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer != null : false;listenerContainer.stop();logger.info("{} Kafka Listener Stopped.", listenerId);return true;}
}

使用 API 端点,我们可以通过提供 listenerID 来启动或停止特定的 Kafka 监听器。

文件:StartOrStopListenerController.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class StartOrStopListenerController {@AutowiredKafkaListenerAutomation kafkaListenerAutomation;@GetMapping("/start")public void start(@RequestParam("id") String listenerId) {kafkaListenerAutomation.startListener(listenerId);}@GetMapping("/stop")public void stop(@RequestParam("id") String listenerId) {kafkaListenerAutomation.stopListener(listenerId);}
}

输出:

1.Kafka Listener启动:

2.Kafka Listener 收到消息:

3. Kafka Listener 停止:

最后

理想情况下,应用程序应在需要处理 Kafka 消息时启动,并在该过程完成后立即停止。限制 Kafka 侦听器以有效利用它是一种很好的做法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/68274.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端常见的设计模式之【单例模式】

前端常见的设计模式&#xff1a; 单例模式观察者模式工厂模式适配器模式装饰器模式命令模式迭代器模式组合模式策略模式发布订阅模式 单例模式【创建型设计模式】&#xff1a; 单例模式是确保一个类只有一个实例&#xff0c;并提供一个全局访问点。这个模式非常适合那些需要…

C++|CRC校验总结

参考&#xff1a; Vector - CAPL - CRC算法介绍 开发工具 > CRC校验工具 文章目录 简介CRC-8CRC-16CRC-32 简介 循环冗余校验&#xff08;Cyclic Redundancy Check&#xff0c;简称CRC&#xff09;是一种数据校验算法&#xff0c;广泛用于检测数据传输或存储过程中的错误。…

# c语言:数组详解一

c语言&#xff1a;数组详解一 数组数组的概念引例&#xff1a;什么是数组数组的特征&#xff1a;下标&#xff08;索引&#xff09; 常用的数组按维度划分一维数组数组的定义&#xff1a;数组元素的访问数组的初始化**案例一、斐波拉契数列&#xff1a;****案例二、冒泡排序&am…

c#-Halcon入门教程——标定

Halcon代码 read_image (NinePointCalibration, D:/Desktop/halcon/ca74d-main/九点标定/NinePointCalibration.gif)rgb1_to_gray (NinePointCalibration, GrayImage)get_image_size (GrayImage, Width, Height) dev_display (GrayImage)* 获取当前显示的窗口句柄 dev_get_win…

语音识别的预训练模型

语音识别的预训练模型 语音识别模型 大致分为两类: 连接时序分类(Connectionist Temporal Classification, CTC):仅编码器(encoder-only)的模型,顶部带有线性分类(CTC)头序列到序列(Sequence-to-sequence, Seq2Seq):编码器-解码器(encoder-decoder)模型,编码器…

Kotlin 协程基础十 —— 协作、互斥锁与共享变量

Kotlin 协程基础系列&#xff1a; Kotlin 协程基础一 —— 总体知识概述 Kotlin 协程基础二 —— 结构化并发&#xff08;一&#xff09; Kotlin 协程基础三 —— 结构化并发&#xff08;二&#xff09; Kotlin 协程基础四 —— CoroutineScope 与 CoroutineContext Kotlin 协程…

4种革新性AI Agent工作流设计模式全解析

导读:AI Agent是指能够在特定环境中自主执行任务的人工智能系统,不仅接收任务,还自主制定和执行工作计划,并在过程中不断自我评估和调整,类似于人类在创造性任务中的思考和修正过程。AI Agent的四种关键设计模式是实现高效执行复杂任务的基础,共同构成了AI Agent的能力框…

Docker启动达梦 rman恢复

目录标题 1. 主库备份2. Docker启动备库3. 备库修改属组4. 开始恢复5. 连接数据库配置归档 & Open6. 检查数据 关于达梦数据库&#xff08;DMDBMS&#xff09;的主库备份、Docker启动备库、恢复备份以及配置归档和打开数据库的详细步骤。 1. 主库备份 # 使用达梦数据库备…

WPS excel使用宏编辑器合并 Sheet工作表

使用excel自带的工具合并Sheet表&#xff0c;我们会发现需要开通WPS会员才能使用合并功能&#xff1b; 那么WPS excel如何使用宏编辑器进行合并 Sheet表呢&#xff1f; 1、首先我们要看excel后缀是 .xlsx 还是 .xls &#xff1b;如果是.xlsx 那么 我们需要修改为 .xls 注…

MySQL之DDL语言

目录 一、数据库的基本操作 1、创建数据库 语法&#xff1a; 示例&#xff1a; 2、修改数据库 语法&#xff1a; 示例&#xff1a; 3、删除数据库 语法&#xff1a; 示例&#xff1a; 4、查询数据库 语法&#xff1a; 5、使用数据库 语法&#xff1a; 二、数据表…

js多种循环方法(通过循环进行判断的相关方法)

for&#xff1a;正常循环&#xff08;同步的循环&#xff09; break、continue终止循环 for (let index 0; index < array.length; index) {const element array[index];} forEach&#xff1a;正常循环&#xff08;异步的循环&#xff09; 通过try异常抛出终止循环&am…

RAG技术:是将知识库的文档和问题共同输入到LLM中

RAG技术 RAG技术是将知识库的文档和问题共同输入到LLM中 RAG技术是先从知识库中检索出与问题相关的文档片段,然后将这些检索到的文档片段与问题一起输入到LLM中进行回答。具体过程如下: 文本分块 由于LLM的上下文窗口有限,需要将长文本资料分割成较小的块,以便LLM能够有…

现代 CPU 的高性能架构与并发安全问题

现代 CPU 的设计&#xff08;如多级缓存、指令重排&#xff09;为了提升性能&#xff0c;引入了许多优化机制&#xff0c;但这些机制可能导致并发场景下的安全性问题。并发安全性主要体现在三个方面&#xff1a;原子性、有序性 和 可见性。这些问题在底层通过 CAS&#xff08;C…

SpringMVC框架(二)

目录 三、请求参数绑定 四、常用注解 1、RequestParam注解 2、RequestBody注解 3、RequestHeader注解 4、CookieValue注解 5、PathVaribale注解 三、请求参数绑定 1、案例 jsp代码 <% page contentType"text/html;charsetUTF-8" language"java"…

【Python项目】个人密码本文档系统

【Python项目】个人密码本文档系统 技术简介&#xff1a;采用Python技术、Django、MYSQL数据库等实现。 系统简介&#xff1a;系统主要的功能有&#xff08;1&#xff09;新建密码本&#xff1a;用户可以创建新的密码本来记录自己的账户与密码&#xff1b; &#xff08;2&#…

mysql连接失败问题记录

mysql服务有时候在未正常关闭时&#xff0c;会导致在机器重启后导致连接不成功&#xff0c;这边只记录我遇到的情况及解决方案&#xff0c;主要是mysql的日志文件在复位异常关闭时造成文件损坏&#xff0c;然后下一次开机MySQL可能无法正确读取或写入这些文件&#xff0c;从而导…

《OpenCV》——模版匹配

文章目录 OpenCV——模版匹配简介模版匹配使用场景OpenCV 中模板匹配的函数参数 OpenCV——模版匹配实例导入所需库读取图片并处理图片对模版图片进行处理进行模版匹配显示模版匹配的结果注意事项 OpenCV——模版匹配简介 OpenCV 是一个非常强大的计算机视觉库&#xff0c;其中…

doc、pdf转markdown

国外的一个网站可以&#xff1a; Convert A File Word, PDF, JPG Online 这个网站免费的&#xff0c;算是非常厚道了&#xff0c;但是大文件上传多了之后会扛不住 国内的一个网站也不错&#xff1a; TextIn-AI智能文档处理-图像处理技术-大模型加速器-在线免费体验 https://…

整数对最小和,暴力存储所有数组,再放容器sort一下,accumulate(s1.begin(),s2.begin()+k,0)即可。

#include <bits/stdc.h> using namespace std; //最小和问题&#xff0c;求出所有整数对求和&#xff0c;排序即可 int main() { int n1,n2; cin>>n1; int s1[n1]; for(int i0;i<n1;i) { cin>>s1[i]; } cin>>n…

金融项目实战 06|Python实现接口自动化——日志、实名认证和开户接口

目录 一、日志封装及应用&#xff08;理解&#xff09; 二、认证开户接口脚本编写 1、代码编写 1️⃣api目录 2️⃣script目录 2、BeautifulSoup库 1️⃣简介及例子 2️⃣提取html数据工具封装 3、认证开户参数化 一、日志封装及应用&#xff08;理解&#xff09; &…