kafka-生产者拦截器(SpringBoot整合Kafka)

文章目录

  • 1、生产者拦截器
    • 1.1、创建生产者拦截器
    • 1.2、KafkaTemplate配置生产者拦截器
    • 1.3、使用Java代码创建主题分区副本
    • 1.4、application.yml配置----v1版
    • 1.5、屏蔽 kafka debug 日志 logback.xml
    • 1.6、引入spring-kafka依赖
    • 1.7、控制台日志

1、生产者拦截器

1.1、创建生产者拦截器

package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {//kafka生产者发送消息前执行:拦截发送的消息预处理@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()+",partition:"+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value());return null;}//kafka broker 给出应答后执行@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {//exception为空表示消息发送成功if(e == null){System.out.println("消息发送成功:topic = "+ recordMetadata.topic()+",partition:"+recordMetadata.partition()+",offset="+recordMetadata.offset()+",timestamp="+recordMetadata.timestamp());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

1.2、KafkaTemplate配置生产者拦截器

package com.atguigu.kafka.producer;import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;@SpringBootTest
class KafkaProducerApplicationTests {//装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中@ResourceKafkaTemplate kafkaTemplate;@ResourceMyKafkaInterceptor myKafkaInterceptor;@PostConstructpublic void init() {kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);}@Testvoid contextLoads() throws IOException {kafkaTemplate.send("my_topic1", "spring-kafka-生产者拦截器");//回调是等kafka,ack以后才执行,需要阻塞System.in.read();}
}

1.3、使用Java代码创建主题分区副本

package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {@Beanpublic NewTopic myTopic1() {//相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)return TopicBuilder.name("my_topic1")//主题名称.partitions(3)//主题分区.replicas(3)//主题分区副本数.build();//创建}
}

1.4、application.yml配置----v1版

server:port: 8110# v1
spring:kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097producer: # producer 生产者retries: 0 # 重试次数 0表示不重试acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01-1/all)batch-size: 16384 # 批次大小 单位bytebuffer-memory: 33554432 # 生产者缓冲区大小 单位bytekey-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器

1.5、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.6、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.7、控制台日志

生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者拦截器
消息发送成功:topic = my_topic1,partition:0,offset=0,timestamp=1717490776329
[[{"partition": 0,"offset": 0,"msg": "spring-kafka-生产者拦截器","timespan": 1717490776329,"date": "2024-06-04 08:46:16"}]
]

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/24329.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

BeanDefinitionReader接口,Spring加载Bean的过程(非常流畅和容易理解)(Spring源码分析1)

一、前言 前言部分&#xff0c;介绍Spring框架的工作和大致原理&#xff0c;有基础的小伙伴可以跳过。 我们现在最常使用的开发框架SSM&#xff0c;分别是Spring、Spring MVC和Mybatis&#xff0c;其功能已经超出原生Spring非常多&#xff0c;所以想学习Spring原理&#xff0c;…

大漠插件7.2422

工具名称:大漠插件7.2422 /更新时间2024年6月2日 / v7.2422 1. 综合工具的图像编辑工具可以缩放窗口了 2. 增加AiFindPic AiFindPicEx AiFindPicMem AiFindPicMemEx AiEnableFindPicWindow 共5个接口 / 工具简介: 大漠 综合 插件 (dm.dll)采用vc6.0编写&#xff0c;识别速度超级…

Java 初识

Java 的发展历程 Sun 公司。 Oracle 公司。 普通版本&#xff0c;也叫过渡版本。 正式版本&#xff0c;也叫长期支持版本&#xff08;LTS&#xff09;。 Java SE&#xff0c;Java EE&#xff0c;Java ME Java 技术体系分为三个平台&#xff1a;Java SE&#xff0c;Java EE&a…

EasyExcel导出多个sheet封装

导出多个sheet 在需求中&#xff0c;会有需要导出多种sheet的情况&#xff0c;那么这里使用easyexcel进行整合 步骤 1、导入依赖 <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><d…

多尺度注意力创新

深度之眼17种多尺度注意力创新

西门子PLC学习之数据块的单个实例,多重实例与参数实例间的区别

首先介绍下函数&#xff0c;函数块与数据块这三个概念。 数据块 数据块里可以存储各种类型的参数。有人可能会问&#xff0c;m寄存器不是可以存储布尔值&#xff0c;8位&#xff0c;16位&#xff0c;32位变量吗&#xff0c;为什么要多此一举&#xff1f;因为虽然m寄存器能存储以…

​​​​​​​月薪20K的程序员应具备怎样的技能和水平?

在当今互联网高速发展的时代&#xff0c;程序员的薪资水平也在不断提高。对于月薪20K的程序员来说&#xff0c;他们不仅需要具备扎实的编程基础&#xff0c;还需要掌握一系列与工作相关的技能和能力。 月薪20K的程序员应具备怎样的技能和水平&#xff1f; 相信这是一个很多人都…

什么是智慧零售?智慧零售的发展前景如何?

在零售业的快速发展中&#xff0c;市场竞争日益激烈&#xff0c;产品同质化严重&#xff0c;线下销售与线上商店的竞争加剧&#xff0c;资金成本问题日益凸显。这些问题不仅限制了零售业的发展&#xff0c;也给消费者带来了诸多不便。然而&#xff0c;智慧零售的出现&#xff0…

ElementUI中date-picker组件,怎么把大写月份改为阿拉伯数字月份(例如:一月、二月,改为1月、2月)

要将 Element UI 的 <el-date-picker> 组件中的月份名称从中文大写&#xff08;如 "一月", "二月"&#xff09;更改为阿拉伯数字&#xff08;如 "1月", "2月"&#xff09;&#xff0c;需要进行一些定制化处理。可以通过国际化&a…

45-5 护网溯源 - 远控木马样本溯源

在分析恶意样本时&#xff0c;需要查看包括作者名字、ID、IP地址、域名等在内的相关信息。 把恶意样本上传到微步、360沙箱云分析&#xff1a;样本报告-微步在线云沙箱 (threatbook.com) 动态分析 运行截图 发现该木马是与一个装机软件绑定的&#xff0c;你运行正常软件的时候…

封装组件库仿elementui<1>

目录 type属性 引入字体图标 button的点击事件 disabled属性 methods:{//点击事件是外部注册的handleClick(e){this.$emit(click,e)//通知父组件点击了&#xff0c;点了按钮&#xff0c;触发外界的click&#xff1f;传参为事件对象//向父组件派发了click事件} }, type属性…

项目进度管理必备:15款最佳项目进度跟踪工具推荐

15好用的款主流项目进度管理软件&#xff1a;PingCode、Worktile、Trello、Tower、Asana、Smartsheet、Teambition、ClickUp、Wrike、Monday.com、Notion、禅道、飞书、云效、蓝凌。 严格的进度管理有助于更好地控制项目进展&#xff0c;提升团队效率&#xff0c;最终实现项目成…

减调食谱攻略:美味低卡又健康

早餐主要求质&#xff0c;也就是求营养&#xff0c;更确切的说是“均衡的营养&#xff0c;多重的营养元素”确保每天早餐不重样就差不多了。 早餐主食&#xff1a;蛋羹、糖心水煮蛋&#xff0c;皮蛋瘦肉粥、南瓜粥、小米粥&#xff0c;蒸煮玉米、南瓜、芋头、红薯&#xff0c;…

【Linux】The server quit without updating PID file的几种解决方案

&#x1f60e; 作者介绍&#xff1a;我是程序员洲洲&#xff0c;一个热爱写作的非著名程序员。CSDN全栈优质领域创作者、华为云博客社区云享专家、阿里云博客社区专家博主。 &#x1f913; 同时欢迎大家关注其他专栏&#xff0c;我将分享Web前后端开发、人工智能、机器学习、深…

Java使用XWPFTemplate将word填充数据,并转pdf

poi-tl poi-tl&#xff08;poi template language&#xff09;是基于Apache POI的Word模板引擎。纯Java组件&#xff0c;跨平台&#xff0c;代码短小精悍&#xff0c;通过插件机制使其具有高度扩展性。 主要处理区域有这么几个模块: 依赖 <dependency><groupId>…

记忆++入门01

1.数字编码 2. 地点桩 1. 卧室 2.婴儿房 3.卫生间 4.次卧 5.书房 6.厨房 7.餐厅 8.客厅 9.阳台左 10.阳台右

深度学习每周学习总结P10(车牌识别)

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制 数据链接 提取码&#xff1a;ppv1 –来自百度网盘超级会员V5的分享 目录 0. 总结1. 数据导入、查看数据分类&#xff0c;自定义transform…

【TB作品】MSP430G2553单片机,MSP430 单片机读取 SHT30 传感器并显示数据

使用 MSP430 单片机读取 SHT30 传感器并显示数据 作品功能 本文介绍了如何使用 MSP430 单片机读取 SHT30 温湿度传感器的数据&#xff0c;并通过 OLED 屏幕显示实时的温度和湿度信息。通过此项目&#xff0c;您将学习如何配置 MSP430 的 I2C 接口、读取 SHT30 传感器的数据以…

高考志愿填报有哪些技巧和方法

一年一度高考季&#xff0c;又高考志愿填报的时侯了。高考志愿填报的时侯&#xff0c;需要考虑的因素比较多&#xff0c;有的同学觉是离家越远越好&#xff0c;要放飞自我&#xff0c;家长再也管不了我了。有的同学觉得专业比学校牌子重要&#xff0c;只要报个好专业&#xff0…

Nvidia/算能 +FPGA+AI大算力边缘计算盒子:大型机械智能预警系统

三一重工股份有限公司&#xff0c;是由三一集团创建于1994年&#xff0c;通过打破国人传统的“技术恐惧症”坚持自主创新迅速崛起。2011年7月&#xff0c;三一重工以215.84亿美元的市值荣登英国《金融时报》全球市值500强&#xff0c;是迄今唯一上榜的中国机械企业。2012年1月&…