kafka-生产者监听器(SpringBoot整合Kafka)

文章目录

  • 1、生产者监听器
    • 1.1、创建生产者监听器
    • 1.2、发送消息测试
    • 1.3、使用Java代码创建主题分区副本
    • 1.4、application.yml配置----v1版
    • 1.5、屏蔽 kafka debug 日志 logback.xml
    • 1.6、引入spring-kafka依赖
    • 1.7、控制台日志

1、生产者监听器

1.1、创建生产者监听器

package com.atguigu.kafka.listener;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
public class MyKafkaProducerListener implements ProducerListener<String,String> {//生产者 ack 配置为 0 只要发送即成功//ack为 1  leader落盘  broker ack之后 才成功//ack为 -1 分区所有副本全部落盘  broker ack之后 才成功@Overridepublic void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {//ProducerListener.super.onSuccess(producerRecord, recordMetadata);System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());}//消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息@Overridepublic void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());System.out.println("异常信息:" + exception.getMessage());}
}

1.2、发送消息测试

package com.atguigu.kafka.producer;import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;@SpringBootTest
class KafkaProducerApplicationTests {//装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中@ResourceKafkaTemplate kafkaTemplate;@ResourceMyKafkaInterceptor myKafkaInterceptor;@PostConstructpublic void init() {kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);}@Testvoid contextLoads() throws IOException {kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");//回调是等kafka,ack以后才执行,需要阻塞System.in.read();}
}

1.3、使用Java代码创建主题分区副本

package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {@Beanpublic NewTopic myTopic1() {//相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)return TopicBuilder.name("my_topic1")//主题名称.partitions(3)//主题分区.replicas(3)//主题分区副本数.build();//创建}
}

1.4、application.yml配置----v1版

server:port: 8110# v1
spring:kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097producer: # producer 生产者retries: 0 # 重试次数 0表示不重试acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01-1/all)batch-size: 16384 # 批次大小 单位bytebuffer-memory: 33554432 # 生产者缓冲区大小 单位bytekey-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器

1.5、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.6、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.7、控制台日志

生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者监听器
消息发送成功:topic = my_topic1,partition:2,offset=0,timestamp=1717500589398

在这里插入图片描述
在这里插入图片描述

[[{"partition": 2,"offset": 0,"msg": "spring-kafka-生产者监听器","timespan": 1717500589398,"date": "2024-06-04 11:29:49"}]
]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/22447.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

图片批量纵向拼接神器:轻松插入间隔像素,生成真彩绚丽长图,让你的创意无限延伸!

在数字艺术的世界里&#xff0c;图片拼接早已不再是简单的组合&#xff0c;而是创意与技术的融合。如果你正在寻找一款能够快速将图片进行纵向拼接&#xff0c;并且能轻松插入间隔像素&#xff0c;同时保证色彩绚丽的神器&#xff0c;那么&#xff0c;我们首助编辑高手的长图拼…

如何实现单例模式及不同实现方法分析-设计模式

这是 一道面试常考题&#xff1a;&#xff08;经常会在面试中让手写一下&#xff09; 什么是单例模式 【问什么是单例模式时&#xff0c;不要答非所问&#xff0c;给出单例模式有两种类型之类的回答&#xff0c;要围绕单例模式的定义去展开。】 单例模式是指在内存中只会创建…

React 中的 ForwardRef的使用

React 中的 forwardRef Hooks 是指将子组件的 Dom 节点暴露给给父组件&#xff0c;在 React 中如果想要访问 Dom 节点是通过 useRef 这个 hooks&#xff0c;而 forwardHook 在 useRef 做了扩展。useRef 是当前组件中间中的节点&#xff0c;而 forwardRef 相当于做了一层封装将父…

springAOP 使用aop代替SqlsessionUtil业务层操作

在Maven框架pom配置文件中导入spring相关依赖&#xff1a; <dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency><dependency…

【前端每日基础】day36——vue组件的通信方式

在Vue.js中&#xff0c;组件通信是一个重要的概念&#xff0c;它允许组件之间进行数据传递和事件处理。Vue.js提供了多种方式来实现组件之间的通信&#xff0c;适用于不同的场景和需求。以下是Vue组件通信的几种常见方式及其详细介绍&#xff1a; 父子组件通信 父组件向子组件…

6、资产评估专家指引第9号—数据资产评估

本专家指引是一种专家建议。评估机构执行资产评估业务,可以参照本专家指引,也可以根据具体情况采用其他适当的做法。中国资产评估协会将根据业务发展,对本专家指引进行更新。 第一章 引言 第一条 针对数据资产特点,结合目前实际操作中的部分难点和要点,中国资产评估协会…

mac配置Personal Access Tokens

背景 在macbook环境中&#xff0c;使用idea、android studio、xcode时&#xff0c;使用gitlab需要登录&#xff0c;而直接使用文明密码是不允许登录的&#xff0c;这时就需要换种方式&#xff0c;这里有两种&#xff1a;ssh、Access Tokens&#xff0c;在公用电脑上推荐使用Ac…

第一个SpringBoot项目

目录 &#x1f4ad;1、新建New Project IDEA2023版本创建Sping项目只能勾选17和21&#xff0c;却无法使用Java8&#xff1f;&#x1f31f; 2、下载JDK 17&#x1f31f; &#x1f4ad;2、项目创建成功界面 1、目录 &#x1f31f; 2、pom文件&#x1f31f; &#x1f4ad;3、…

前K个高频元素-力扣

本题想到的解法是使用哈希表首先统计数组中每个元素出现的次数&#xff0c;然后对出现次数进行排序&#xff0c;最后进行输出。看了题解学习到使用优先级队列 小顶堆来完成&#xff0c;小顶堆的排序规则由自己来定义。 代码如下&#xff1a; class Solution { public:class My…

ctfshow 年CTF web

除夕 Notice: Undefined index: year in /var/www/html/index.php on line 16 <?phpinclude "flag.php";$year $_GET[year];if($year2022 && $year1!2023){echo $flag; }else{highlight_file(__FILE__); } 弱比较绕过很简单&#xff0c;连函数都没有直…

代码随想录算法训练营Day59 | 503.下一个更大元素II 42. 接雨水

代码随想录算法训练营Day59 | 503.下一个更大元素II 42. 接雨水 LeetCode 503.下一个更大元素II 题目链接&#xff1a;LeetCode 503.下一个更大元素II 思路&#xff1a; class Solution { public:vector<int> nextGreaterElements(vector<int>& nums) {// …

【数据分享】中国民政统计年鉴(1949-2022)

大家好&#xff01;今天我要向大家介绍一份重要的中国民政统计数据资源——《中国民政统计年鉴》。这份年鉴涵盖了从1949年到2022年中国民政统计全面数据&#xff0c;并提供限时免费下载。&#xff08;无需分享朋友圈即可获取&#xff09; 数据介绍 从1949年到2022年&#xf…

程序员的职业素养

在当今这个快速发展的技术世界中&#xff0c;程序员的职业素养已经成为了一个热门话题。随着技术的不断进步&#xff0c;程序员不仅需要掌握强大的技术能力&#xff0c;更需要具备一系列的职业素养来确保他们能够在职业生涯中取得成功。 《程序员的职业素养》一书由著名的软件工…

云原生架构案例分析_4.某电商业务云原生改造

名称解释&#xff1a; AHAS&#xff1a;应用高可用服务&#xff08;Application High Availability Service&#xff09;是一款专注于提高应用高可用能力的SaaS产品&#xff0c;主要包含多活容灾、故障演练和流量防护三个独立的功能模块。其中流量防护已迁移至微服务治理服务MS…

Java - Path接口和Files工具类

在Java中&#xff0c;Path接口和Files工具类是Java 7中引入的java.nio.file包的一部分&#xff0c;用于文件和文件系统的操作。这些API提供了比传统的java.io包更为强大和灵活的文件处理功能。 Path接口 Path接口表示文件系统中的路径&#xff0c;它可以是文件名或目录名。Pa…

再度“冲三”失利的泸州老窖,还能拿出什么“杀手锏”?

正值“618”&#xff0c;白酒行业也迎来了重要创收时刻。 据悉&#xff0c;天猫“618购物节”开卖首日&#xff0c;酒水直播销售额增长超1300%&#xff0c;拉动白酒行业增长147%。 这一增长背后&#xff0c;赛道的火药味也愈发浓厚&#xff0c;今年618大促中&#xff0c;五粮…

第十周:目标计划管理

1. 企业的目的 企业不同时期的目标是不一样的&#xff0c;第一阶段是保证存活&#xff1b;第二阶段是为了发展&#xff0c;加强公司业绩&#xff0c;达到预期的盈利&#xff1b;第三阶段是在发展壮大之后&#xff0c;有更多精力投入公司健康运转的事情&#xff0c;保证长久的生…

Flutter 中的 DefaultTextStyle 小部件:全面指南

Flutter 中的 DefaultTextStyle 小部件&#xff1a;全面指南 Flutter 是一个由 Google 开发的跨平台 UI 框架&#xff0c;它提供了丰富的组件来帮助开发者构建高性能、美观的应用。在 Flutter 的布局体系中&#xff0c;DefaultTextStyle 是一个重要的组件&#xff0c;它允许开…

【电路笔记】-Sallen-Key滤波器

Sallen-Key滤波器 Sallen-Key 滤波器拓扑用作实现高阶有源滤波器的构建块。 1、概述 Sallen-Key 滤波器设计是一种二阶有源滤波器拓扑,我们可以将其用作实现高阶滤波器电路的基本构建块,例如低通 (LPF)、高通 (HPF) 和带通 ( BPF)滤波器电路。 正如我们在本滤波器部分中…

<读评论……?>

为纪念今天数学比赛AK 回复一下比较常见的问题&#xff01; Q1:平常写代码时使用万能头文件好还是一个一个慢慢写好&#xff1f; A:我其实个人认为万能头好&#xff0c;这样比较省时 Q2:有很多书上写int main&#xff08;&#xff09;可以去掉int&#xff0c; 这是真的吗&#…