SpringKafka生产者、消费者消息拦截

1 前言

在Spring Kafka中,可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。

2 基于Kafka管理的拦截器

Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor
org.apache.kafka.clients.consumer.ConsumerInterceptor, 示例如下:

2.1 定义拦截器

生产者拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 在发送消息之前操作System.out.println("Sending message: " + record.value());return record; // 继续发送}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {// 资源清理}@Overridepublic void configure(Map<String, ?> configs) {// 可以在这里获取配置}
}

2.2 定义消费者拦截器

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {@Overridepublic void configure(Map<String, ?> configs) {// 配置拦截器}@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {// 处理接收到的消息records.forEach(record -> {System.out.println("Consumed message: " + record.value());});return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}@Overridepublic void close() {// 资源清理}
}

2.3 添加拦截器

方式一,通过工厂自定义器设置拦截器

import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer, DefaultKafkaConsumerFactoryCustomizer {@Overridepublic void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {producerFactory.updateConfigs(Map.of(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}@Overridepublic void customize(DefaultKafkaConsumerFactory<?, ?> consumerFactory) {consumerFactory.updateConfigs(Map.of(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName()));}
}

方式二,通过配置设置拦截器

spring:kafka:producer:properties:interceptor.classes: org.example.kafka.CustomProducerInterceptorconsumer:properties:interceptor.classes: org.example.kafka.CustomConsumerInterceptor

2.4 拦截器使用Spring容器中的Bean

上面的方法可以看到,拦截器由于没有在Spring容器中管理,则无法使用容器中其他Bean来做业务处理,那么可以另外一种策略达到让拦截器受Spring容器管理的需求, 已消息生产者拦截器为例:
Bean定义

@Component
public class MyComponent {public void checkMessage(String message) {System.out.println("Sending message: " + message);}
}

生产者拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {private MyComponent myComponent;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {myComponent.checkMessage(record.value());return record; // 继续发送}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {// 资源清理}@Overridepublic void configure(Map<String, ?> configs) {myComponent = configs.get("myComponent");}
}

设置拦截器

import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer {@Autowiredprivate MyComponent myComponent;@Overridepublic void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {producerFactory.updateConfigs(Map.of("myComponent", myComponent,ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}
}

3 基于Spring-Kafka管理的拦截器

基于Kafka管理的拦截器对于消费消息的拦截只能做到批量消费级别(ConsumerRecords),如果要对单条消息拦截,可以使用Spring-Kafka提供的org.springframework.kafka.listener.RecordInterceptor接口。

3.1 单条消息拦截接口定义

由于此拦截器是受Spring容器管理的,所以可以通过@Component注解自动注入到容器中,进行自动拦截。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;@Component
public class CustomRecordInterceptor implements RecordInterceptor<Object, Object> {@Overridepublic ConsumerRecord<Object, Object> intercept(ConsumerRecord<Object, Object> record) {System.out.println(record.topic());return record;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/58341.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

动态规划 —— 路径问题-礼物的最大价值

1. 剑指offer-JZ47-路径问题-礼物的最大价值 题目链接&#xff1a; 礼物的最大价值_牛客题霸_牛客网https://www.nowcoder.com/practice/2237b401eb9347d282310fc1c3adb134?tpId265&tqId39288&ru/exam/oj 2. 算法原理 状态表示&#xff1a;以莫一个位置位置为结尾 d…

安装git-lfs发生报错Could not find Git; can not register Git LFS.解决方案

解决方案&#xff1a; 步骤1.安装Github-Deskop Download GitHub Desktop | GitHub Desktophttps://desktop.github.com/download/ 步骤2.安装 Git&#xff01; Git for WindowsWe bring the awesome Git VCS to Windowshttps://gitforwindows.org/ 这两个安装完成之后即可…

Unity hub登录时一直无法进入license

直接只卸载unity hub&#xff0c;然后重新下载unity hub安装即可&#xff0c;重新登录即可。 有时会自动关联安装的位置&#xff0c;如果不能&#xff0c;则手动定位添加即可。 网上各种修复的方法操作费时费力。

three.js使用ShaderMaterial实现聚光灯光源demo

文章目录 顶点片元全部 核心&#xff1a; 顶点 varying vec3 vNormal;varying vec3 vViewPosition;void main() {vNormal normalMatrix * normal;vNormal normalize( vNormal );vec4 modelViewPosition modelViewMatrix * vec4(position, 1.0);gl_Position projectionMat…

【jvm】堆的内部结构

目录 1. 说明2. 年轻代&#xff08;Young Generation&#xff09;2.1 说明2.2 Eden区2.3 Survivor区 3. 老年代&#xff08;Old Generation&#xff09;3.1 说明3.2 对象存放3.3 垃圾回收 4. jdk7及之前5. jdk8及之后 1. 说明 1.JVM堆的内部结构主要包括年轻代&#xff08;You…

在线教育系统源码开发详解:网校培训平台搭建的核心技术

本篇文章&#xff0c;笔者将详细介绍在线教育系统源码的开发过程&#xff0c;重点聚焦网校培训平台搭建的核心技术&#xff0c;以期为有意从事在线教育行业的开发者提供实用的参考。 一、在线教育系统的构成 前端负责用户的交互体验&#xff0c;后端处理业务逻辑&#xff0c;…

DeepLearn-实现天气的识别

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 本次使用的数据集有晴天、雨天、多云和日出。 导入基本的包 包括读取文件、图像处理、科学计算和tensorflow的api包layers是层模块&#xff0c;提供了神经网络…

[实时计算flink]安全访问最佳实践

由于Flink无法提前预知您要使用的上下游系统&#xff0c;当作业需要访问不同的上下游系统来读写数据时&#xff0c;可能需要使用您的AccessKey信息作为访问凭证。主账号的AccessKey具备云账号下资源的全部权限&#xff0c;一旦泄露可能会造成严重后果。本文通过对RAM用户授予相…

pycharm与anaconda下的pyside6的安装记录

一、打开anaconda虚拟环境的命令行窗口&#xff0c;pip install&#xff0c;加入清华源&#xff1a; pip install PySide6 -i https://pypi.tuna.tsinghua.edu.cn/simple 二、打开pycharm&#xff0c;在文件--设置--工具--外部工具中配置一下三项&#xff1a; 1、 QtDesigner…

Java常用任务调度

JAVA 任务调度技术 前言 在日常开发过程中&#xff0c;我们经常会遇到周期性执行某段代码的场景。比如定期同步订单&#xff0c;定期更新商品信息&#xff0c;定期发送消息等。这些重复执行的代码可以抽象为一个任务(Task)。 一个Task的特点如下&#xff1a; 包含需要执行的业…

将多个commit合并成一个commit并提交

0 Preface/foreword 1 压缩多个commit方法 1.1 git merge --squash 主分支&#xff1a;main 开发分支&#xff1a;test 当前在test分支提交了8个commits&#xff0c;功能已经开发完成&#xff0c;需要将test分支合并到main分支&#xff0c;但是不想在合并时候&#xff0c;看…

开源一套基于若依的wms仓库管理系统,支持lodop和网页打印入库单、出库单的源码

大家好&#xff0c;我是一颗甜苞谷&#xff0c;今天分享一款基于若依的wms仓库管理系统&#xff0c;支持lodop和网页打印入库单、出库单的源码。 前言 在当今快速发展的商业环境中&#xff0c;库存管理对于企业来说至关重要。然而&#xff0c;许多企业仍然依赖于传统的、手动…

【Rust】环境搭建

▒ 目录 ▒ &#x1f6eb; 导读需求 1️⃣ 安装Chocolatey安装依赖 2️⃣ 安装RustRover安装toolchain&#xff08;rustup、VS&#xff09;重启配置生效设置安装插件 &#x1f4d6; 参考资料 &#x1f6eb; 导读 需求 重装系统&#xff0c;记录下环境搭建遇到的问题。 1️⃣ …

安装Ubuntu系统

打开vmware&#xff0c;新建一个Ubuntu虚拟机&#xff0c;点击自定义&#xff0c;进入下一步 &#xff0c;选择Workstation 17.x后&#xff0c;点击下一步 选择稍后安装系统选项&#xff0c;进入选择客户机操作系统页面&#xff0c;客户机操作系统选择Linux&#xff0c;版本选…

rom定制系列------红米note8_miui14安卓13定制修改固件 带面具root权限 刷写以及界面预览

&#x1f49d;&#x1f49d;&#x1f49d;红米note8机型代码&#xff1a;ginkgo。高通芯片。此固件官方最终版为稳定版12.5.5安卓11的版本。目前很多工作室需要高安卓版本的固件来适应他们的软件。并且需要root权限。根据客户要求。修改固件为完全root。并且修改为可批量刷写的…

了解一下,RN中怎么加载 threejs的

在React Native&#xff08;RN&#xff09;中加载和使用Three.js&#xff0c;一个流行的3D图形库&#xff0c;通常需要一些额外的步骤&#xff0c;因为Three.js主要是为Web浏览器设计的&#xff0c;而React Native则使用原生的渲染引擎。不过&#xff0c;有一些方法可以在React…

电脑仅一个C盘如何重装系统?超简单教程分享!

当我们的电脑仅配备一个C盘时&#xff0c;重装系统的过程可能会显得尤为棘手。因为一旦格式化硬盘&#xff0c;安装系统的分区也可能被一并清除&#xff0c;导致安装过程中断。这时候我们完全可以通过对电脑进行分区来解决这一问题。分区不仅能够帮助我们更好地管理硬盘空间&am…

提升网站速度与性能优化的有效策略与实践

内容概要 在数字化快速发展的今天&#xff0c;网站速度与性能优化显得尤为重要&#xff0c;它直接影响用户的浏览体验。用户在访问网站时&#xff0c;往往希望能够迅速获取信息&#xff0c;若加载时间过长&#xff0c;轻易可能导致他们转向其他更为流畅的网站。因此&#xff0…

流媒体协议.之(RTP,RTCP,RTSP,RTMP,HTTP)(二)

继续上篇介绍&#xff0c;本篇介绍一下封装RTP的数据格式&#xff0c;如何将摄像头采集的码流&#xff0c;音频的码流&#xff0c;封装到rtp里&#xff0c;传输。 有自己私有协议例子&#xff0c;有rtp协议&#xff0c;参考代码。注意不是rtsp协议。 一、私有协议 玩过tcp协议…

构建灵活、高效的HTTP/1.1应用:探索h11库

文章目录 构建灵活、高效的HTTP/1.1应用&#xff1a;探索h11库背景这个库是什么&#xff1f;如何安装这个库&#xff1f;库函数使用方法使用场景常见的Bug及解决方案总结 构建灵活、高效的HTTP/1.1应用&#xff1a;探索h11库 背景 在现代网络应用中&#xff0c;HTTP协议是基础…