kafka-生产者拦截器(SpringBoot整合Kafka)

文章目录

  • 1、生产者拦截器
    • 1.1、创建生产者拦截器
    • 1.2、KafkaTemplate配置生产者拦截器
    • 1.3、使用Java代码创建主题分区副本
    • 1.4、application.yml配置----v1版
    • 1.5、屏蔽 kafka debug 日志 logback.xml
    • 1.6、引入spring-kafka依赖
    • 1.7、控制台日志

1、生产者拦截器

1.1、创建生产者拦截器

package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {//kafka生产者发送消息前执行:拦截发送的消息预处理@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()+",partition:"+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value());return null;}//kafka broker 给出应答后执行@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {//exception为空表示消息发送成功if(e == null){System.out.println("消息发送成功:topic = "+ recordMetadata.topic()+",partition:"+recordMetadata.partition()+",offset="+recordMetadata.offset()+",timestamp="+recordMetadata.timestamp());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

1.2、KafkaTemplate配置生产者拦截器

package com.atguigu.kafka.producer;import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;@SpringBootTest
class KafkaProducerApplicationTests {//装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中@ResourceKafkaTemplate kafkaTemplate;@ResourceMyKafkaInterceptor myKafkaInterceptor;@PostConstructpublic void init() {kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);}@Testvoid contextLoads() throws IOException {kafkaTemplate.send("my_topic1", "spring-kafka-生产者拦截器");//回调是等kafka,ack以后才执行,需要阻塞System.in.read();}
}

1.3、使用Java代码创建主题分区副本

package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {@Beanpublic NewTopic myTopic1() {//相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)return TopicBuilder.name("my_topic1")//主题名称.partitions(3)//主题分区.replicas(3)//主题分区副本数.build();//创建}
}

1.4、application.yml配置----v1版

server:port: 8110# v1
spring:kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097producer: # producer 生产者retries: 0 # 重试次数 0表示不重试acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01-1/all)batch-size: 16384 # 批次大小 单位bytebuffer-memory: 33554432 # 生产者缓冲区大小 单位bytekey-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器

1.5、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.6、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.7、控制台日志

生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者拦截器
消息发送成功:topic = my_topic1,partition:0,offset=0,timestamp=1717490776329
[[{"partition": 0,"offset": 0,"msg": "spring-kafka-生产者拦截器","timespan": 1717490776329,"date": "2024-06-04 08:46:16"}]
]

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/24329.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

BeanDefinitionReader接口,Spring加载Bean的过程(非常流畅和容易理解)(Spring源码分析1)

一、前言 前言部分&#xff0c;介绍Spring框架的工作和大致原理&#xff0c;有基础的小伙伴可以跳过。 我们现在最常使用的开发框架SSM&#xff0c;分别是Spring、Spring MVC和Mybatis&#xff0c;其功能已经超出原生Spring非常多&#xff0c;所以想学习Spring原理&#xff0c;…

算法题 — 可可喜欢吃香蕉(二分查找法)

可可喜欢吃香蕉。这里有 n 堆香蕉&#xff0c;第 i 堆中有 piles[i] 根香蕉。警卫已经离开&#xff0c;将在 H 小时后回来。 可可可以决定它吃香蕉的速度为 speed (单位&#xff1a;根/小时)。每个小时&#xff0c;可可都会选择一堆香蕉&#xff0c;并吃掉 speed 根。如果这堆…

大漠插件7.2422

工具名称:大漠插件7.2422 /更新时间2024年6月2日 / v7.2422 1. 综合工具的图像编辑工具可以缩放窗口了 2. 增加AiFindPic AiFindPicEx AiFindPicMem AiFindPicMemEx AiEnableFindPicWindow 共5个接口 / 工具简介: 大漠 综合 插件 (dm.dll)采用vc6.0编写&#xff0c;识别速度超级…

TMS320F280049学习2:点灯

TMS320F280049学习2&#xff1a;点灯 文章目录 TMS320F280049学习2&#xff1a;点灯一、工程代码二、代码解释1.Device_initGPIO()2.EINT、DINT3.ERTM、DRTM 总结 一、工程代码 #include "driverlib.h" #include "device.h"#define DRV_LED2_PIN …

【Numpy】04 深入理解NumPy的高级索引技术

掌握NumPy&#xff1a;从新手到高手的数组操作之旅 前言 前面【Numpy】03 数组的切片和索引操作深入详解的切片和索引操作只能索引出有规律的元素数据&#xff0c;比如同轴向&#xff0c;若要索引如下元素则无法实现&#xff0c;下面就介绍数组的高级索引&#xff08;花式索引…

Java 初识

Java 的发展历程 Sun 公司。 Oracle 公司。 普通版本&#xff0c;也叫过渡版本。 正式版本&#xff0c;也叫长期支持版本&#xff08;LTS&#xff09;。 Java SE&#xff0c;Java EE&#xff0c;Java ME Java 技术体系分为三个平台&#xff1a;Java SE&#xff0c;Java EE&a…

EasyExcel导出多个sheet封装

导出多个sheet 在需求中&#xff0c;会有需要导出多种sheet的情况&#xff0c;那么这里使用easyexcel进行整合 步骤 1、导入依赖 <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><d…

多尺度注意力创新

深度之眼17种多尺度注意力创新

西门子PLC学习之数据块的单个实例,多重实例与参数实例间的区别

首先介绍下函数&#xff0c;函数块与数据块这三个概念。 数据块 数据块里可以存储各种类型的参数。有人可能会问&#xff0c;m寄存器不是可以存储布尔值&#xff0c;8位&#xff0c;16位&#xff0c;32位变量吗&#xff0c;为什么要多此一举&#xff1f;因为虽然m寄存器能存储以…

​​​​​​​月薪20K的程序员应具备怎样的技能和水平?

在当今互联网高速发展的时代&#xff0c;程序员的薪资水平也在不断提高。对于月薪20K的程序员来说&#xff0c;他们不仅需要具备扎实的编程基础&#xff0c;还需要掌握一系列与工作相关的技能和能力。 月薪20K的程序员应具备怎样的技能和水平&#xff1f; 相信这是一个很多人都…

Elasticsearch 认证模拟题 - 8

一、题目 在集群中输入以下指令&#xff1a; PUT phones/_doc/1 {"brand":"Samsumg","model":"Galaxy S9","features":[{"type":"os", "value":"Android"},{"type":&q…

@Scheduled注解创建定时任务的 3 种模式

Scheduleed注解的介绍 在Spring Boot中&#xff0c;Scheduled 注解用于创建定时任务&#xff0c;提供了三种常见的模式&#xff1a; Fixed Rate&#xff08;固定速率&#xff09; Fixed Delay&#xff08;固定延迟&#xff09; Cron Expression&#xff08;Cron表达式&…

什么是智慧零售?智慧零售的发展前景如何?

在零售业的快速发展中&#xff0c;市场竞争日益激烈&#xff0c;产品同质化严重&#xff0c;线下销售与线上商店的竞争加剧&#xff0c;资金成本问题日益凸显。这些问题不仅限制了零售业的发展&#xff0c;也给消费者带来了诸多不便。然而&#xff0c;智慧零售的出现&#xff0…

ElementUI中date-picker组件,怎么把大写月份改为阿拉伯数字月份(例如:一月、二月,改为1月、2月)

要将 Element UI 的 <el-date-picker> 组件中的月份名称从中文大写&#xff08;如 "一月", "二月"&#xff09;更改为阿拉伯数字&#xff08;如 "1月", "2月"&#xff09;&#xff0c;需要进行一些定制化处理。可以通过国际化&a…

查看Hive表的描述信息,包括在HDFS上的Location信息

/hive/bin/beeline beeline> !connect jdbc:hive2://ip:10000 输入用户名 输入密码 DESCRIBE FORMATTED 表名; 下面的例子 No rows affected (0.820 seconds) 0: jdbc:hive2://ip:10000> DESCRIBE FORMATTED demo; INFO : Compiling command(queryIdroot_20240601141007…

45-5 护网溯源 - 远控木马样本溯源

在分析恶意样本时&#xff0c;需要查看包括作者名字、ID、IP地址、域名等在内的相关信息。 把恶意样本上传到微步、360沙箱云分析&#xff1a;样本报告-微步在线云沙箱 (threatbook.com) 动态分析 运行截图 发现该木马是与一个装机软件绑定的&#xff0c;你运行正常软件的时候…

封装组件库仿elementui<1>

目录 type属性 引入字体图标 button的点击事件 disabled属性 methods:{//点击事件是外部注册的handleClick(e){this.$emit(click,e)//通知父组件点击了&#xff0c;点了按钮&#xff0c;触发外界的click&#xff1f;传参为事件对象//向父组件派发了click事件} }, type属性…

操作系统基本特性:并发、共享、虚拟、异步

目录 一.并发 1.并发的优势 2.并发的实现 3.并发的应用场景 4.并发的挑战 二.共享 1.共享的优势 2.共享资源的实现机制 3.进程同步和互斥 4.避免冲突和死锁 5.实例分析 文件共享 内存共享 设备共享 三.虚拟 1.虚拟技术的优势 2.虚拟化技术的主要实现 3.实例分…

项目进度管理必备:15款最佳项目进度跟踪工具推荐

15好用的款主流项目进度管理软件&#xff1a;PingCode、Worktile、Trello、Tower、Asana、Smartsheet、Teambition、ClickUp、Wrike、Monday.com、Notion、禅道、飞书、云效、蓝凌。 严格的进度管理有助于更好地控制项目进展&#xff0c;提升团队效率&#xff0c;最终实现项目成…

组合已实现的函数完成K-means算法

本关任务 本关综合前面四个关卡的内容来实现K-means聚类算法。 相关说明 K-means是一类非常经典的无监督机器学习算法&#xff0c;通常在实际应用中用于从数据集中找出不同样本的聚集模式&#xff0c;其基本原理就是类中样本的距离要远远小于类间样本的距离。 K-means聚类算…