kafka-生产者监听器(SpringBoot整合Kafka)

文章目录

  • 1、生产者监听器
    • 1.1、创建生产者监听器
    • 1.2、发送消息测试
    • 1.3、使用Java代码创建主题分区副本
    • 1.4、application.yml配置----v1版
    • 1.5、屏蔽 kafka debug 日志 logback.xml
    • 1.6、引入spring-kafka依赖
    • 1.7、控制台日志

1、生产者监听器

1.1、创建生产者监听器

package com.atguigu.kafka.listener;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
public class MyKafkaProducerListener implements ProducerListener<String,String> {//生产者 ack 配置为 0 只要发送即成功//ack为 1  leader落盘  broker ack之后 才成功//ack为 -1 分区所有副本全部落盘  broker ack之后 才成功@Overridepublic void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {//ProducerListener.super.onSuccess(producerRecord, recordMetadata);System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());}//消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息@Overridepublic void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());System.out.println("异常信息:" + exception.getMessage());}
}

1.2、发送消息测试

package com.atguigu.kafka.producer;import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;@SpringBootTest
class KafkaProducerApplicationTests {//装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中@ResourceKafkaTemplate kafkaTemplate;@ResourceMyKafkaInterceptor myKafkaInterceptor;@PostConstructpublic void init() {kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);}@Testvoid contextLoads() throws IOException {kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");//回调是等kafka,ack以后才执行,需要阻塞System.in.read();}
}

1.3、使用Java代码创建主题分区副本

package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {@Beanpublic NewTopic myTopic1() {//相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)return TopicBuilder.name("my_topic1")//主题名称.partitions(3)//主题分区.replicas(3)//主题分区副本数.build();//创建}
}

1.4、application.yml配置----v1版

server:port: 8110# v1
spring:kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097producer: # producer 生产者retries: 0 # 重试次数 0表示不重试acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01-1/all)batch-size: 16384 # 批次大小 单位bytebuffer-memory: 33554432 # 生产者缓冲区大小 单位bytekey-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器

1.5、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.6、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.7、控制台日志

生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者监听器
消息发送成功:topic = my_topic1,partition:2,offset=0,timestamp=1717500589398

在这里插入图片描述
在这里插入图片描述

[[{"partition": 2,"offset": 0,"msg": "spring-kafka-生产者监听器","timespan": 1717500589398,"date": "2024-06-04 11:29:49"}]
]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/22447.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

图片批量纵向拼接神器:轻松插入间隔像素,生成真彩绚丽长图,让你的创意无限延伸!

在数字艺术的世界里&#xff0c;图片拼接早已不再是简单的组合&#xff0c;而是创意与技术的融合。如果你正在寻找一款能够快速将图片进行纵向拼接&#xff0c;并且能轻松插入间隔像素&#xff0c;同时保证色彩绚丽的神器&#xff0c;那么&#xff0c;我们首助编辑高手的长图拼…

如何实现单例模式及不同实现方法分析-设计模式

这是 一道面试常考题&#xff1a;&#xff08;经常会在面试中让手写一下&#xff09; 什么是单例模式 【问什么是单例模式时&#xff0c;不要答非所问&#xff0c;给出单例模式有两种类型之类的回答&#xff0c;要围绕单例模式的定义去展开。】 单例模式是指在内存中只会创建…

React 中的 ForwardRef的使用

React 中的 forwardRef Hooks 是指将子组件的 Dom 节点暴露给给父组件&#xff0c;在 React 中如果想要访问 Dom 节点是通过 useRef 这个 hooks&#xff0c;而 forwardHook 在 useRef 做了扩展。useRef 是当前组件中间中的节点&#xff0c;而 forwardRef 相当于做了一层封装将父…

springAOP 使用aop代替SqlsessionUtil业务层操作

在Maven框架pom配置文件中导入spring相关依赖&#xff1a; <dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency><dependency…

mac配置Personal Access Tokens

背景 在macbook环境中&#xff0c;使用idea、android studio、xcode时&#xff0c;使用gitlab需要登录&#xff0c;而直接使用文明密码是不允许登录的&#xff0c;这时就需要换种方式&#xff0c;这里有两种&#xff1a;ssh、Access Tokens&#xff0c;在公用电脑上推荐使用Ac…

第一个SpringBoot项目

目录 &#x1f4ad;1、新建New Project IDEA2023版本创建Sping项目只能勾选17和21&#xff0c;却无法使用Java8&#xff1f;&#x1f31f; 2、下载JDK 17&#x1f31f; &#x1f4ad;2、项目创建成功界面 1、目录 &#x1f31f; 2、pom文件&#x1f31f; &#x1f4ad;3、…

【数据分享】中国民政统计年鉴(1949-2022)

大家好&#xff01;今天我要向大家介绍一份重要的中国民政统计数据资源——《中国民政统计年鉴》。这份年鉴涵盖了从1949年到2022年中国民政统计全面数据&#xff0c;并提供限时免费下载。&#xff08;无需分享朋友圈即可获取&#xff09; 数据介绍 从1949年到2022年&#xf…

云原生架构案例分析_4.某电商业务云原生改造

名称解释&#xff1a; AHAS&#xff1a;应用高可用服务&#xff08;Application High Availability Service&#xff09;是一款专注于提高应用高可用能力的SaaS产品&#xff0c;主要包含多活容灾、故障演练和流量防护三个独立的功能模块。其中流量防护已迁移至微服务治理服务MS…

再度“冲三”失利的泸州老窖,还能拿出什么“杀手锏”?

正值“618”&#xff0c;白酒行业也迎来了重要创收时刻。 据悉&#xff0c;天猫“618购物节”开卖首日&#xff0c;酒水直播销售额增长超1300%&#xff0c;拉动白酒行业增长147%。 这一增长背后&#xff0c;赛道的火药味也愈发浓厚&#xff0c;今年618大促中&#xff0c;五粮…

【电路笔记】-Sallen-Key滤波器

Sallen-Key滤波器 Sallen-Key 滤波器拓扑用作实现高阶有源滤波器的构建块。 1、概述 Sallen-Key 滤波器设计是一种二阶有源滤波器拓扑,我们可以将其用作实现高阶滤波器电路的基本构建块,例如低通 (LPF)、高通 (HPF) 和带通 ( BPF)滤波器电路。 正如我们在本滤波器部分中…

【匹配线段问题】

问题&#xff1a; 如下图所示。图中有两行正整数&#xff0c;每行中有若干个正整数。如果第一行的某个数r与第二行的某个数相同&#xff0c;这样就可以在这两个正整数之间划一条线&#xff0c;并称之为r-匹配线段。下图中存在3-匹配线段和2-匹配线段。 请编写完整程序&#xf…

【C语言】详解函数(庖丁解牛版)

文章目录 1. 前言2. 函数的概念3.库函数3.1 标准库和头文件3.2 库函数的使用3.2.1 头文件的包含3.2.2 实践 4. 自定义函数4.1 自定义函数的语法形式4.2 函数的举例 5. 形参和实参5.1 实参5.2 形参5.3 实参和形参的关系 6. return 语句6. 总结 1. 前言 一讲到函数这块&#xff…

数字塔问题

#include<iostream> using namespace std; //从下向上得到最优值 void dtower(int a[][100],int s[][100],int n) {for(int in; i>1; i--){for(int j1; j<i; j){if(in)s[i][j]a[i][j];else{int ts[i1][j];if(t<s[i1][j1])ts[i1][j1];s[i][j]a[i][j]t;}}} } void…

前端地图中,已知一个点位,获取相同经度或者纬度下的,某个距离的另一个点位

效果图说明&#xff1a;我在圆的中心点位&#xff0c;找到他某个直线距离的另个一点&#xff0c;标注两者之间的距离。如图所示是25000米。 沿纬度方向移动 在相同经度下&#xff0c;计算沿纬度方向移动1000米的新点位&#xff1a; function calculateLatitudePoint(lat, ln…

10-Django项目--Ajax请求

目录 Ajax请求 简单示范 html 数据添加 py文件 html文件 demo_list.html Ajax_data.py 图例 Ajax请求 简单示范 html <input type"button" id"button-one" class"btn btn-success" value"点我"> ​ ​ <script>/…

如何找出你的Windows 10的内部版本和版本号?这里提供两种方法

你过去可能没有真正考虑过Windows内部版本号,除非这是你工作的一部分。以下是如何了解你运行的Windows 10的内部版本、版本和版本号。 内部版本意味着什么 Windows一直使用内部版本。它们代表着对Windows的重大更新。传统上,大多数人都是根据他们使用的主要命名版本(Windo…

使用raise语句抛出异常

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 如果某个函数或方法可能会产生异常&#xff0c;但不想在当前函数或方法中处理这个异常&#xff0c;则可以使用raise语句在函数或方法中抛出异常。rai…

学习DHCP动态主机配置协议

目录&#xff1a; dhcp 动态主机配置协议 ftp文件传输协议 dhcp 动态主机配置协议 服务器配置好了地址池 192.168.124.10 -192.168.124.20 客户端从地址池当中随机获取一个ip地址&#xff0c;ip地址会发生变化&#xff0c;使用服务端提供的ip地址&…

【MATLAB】概述1

非 ~ 注释 % 定义 >> 数组 赋值 赋值&#xff1a;>> x1 函数 数组 x[x1,x2] 行向量&#xff08;&#xff0c;or ) x[x1;x2] 列向量 x. 转置等间隔向量 1-10 向量&#xff1a;>>xlinspace(1,10,10) 矩阵 矩阵&#xff1a;>>A[1,2,3;4,5,6;7,8,9] …

提取伴奏与人声分离软件:5款手机必备音频软件

在数字音乐的浪潮中&#xff0c;音频处理软件已经成为手机用户不可或缺的工具。特别是在音乐制作、卡拉OK伴奏制作以及日常音频编辑中&#xff0c;人声与伴奏的分离显得尤为重要。本文将为您介绍五款免费且实用的手机音频软件&#xff0c;它们都具有人声与伴奏分离的功能&#…