kafka-消费者-消费异常处理(SpringBoot整合Kafka)

文章目录

  • 1、消费异常处理
    • 1.1、application.yml配置
    • 1.2、注册异常处理器
    • 1.3、消费者使用异常处理器
    • 1.4、创建生产者发送消息
    • 1.5、创建SpringBoot启动类
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、消费者控制台:
      • 1.8.1、第一次启动SpringKafkaConsumerApplication
      • 1.8.n、第n次启动SpringKafkaConsumerApplication

1、消费异常处理

1.1、application.yml配置

server:port: 8120# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量# 调用ack方法时才会提交ack给kafka
#      enable-auto-commit: false# 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始# earliest:从最早的消息开始消费# latest:第一次从LEO位置开始消费# none:如果主题分区没有偏移量,则抛出异常auto-offset-reset: earliest  #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 手动ack:manual手动ack时 如果有异常会尝试一直消费
#      ack-mode: manual# 手动ack:消费有异常时停止ack-mode: manual_immediate

在这里插入图片描述

1.2、注册异常处理器

package com.atguigu.spring.kafka.consumer.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
@Configuration
public class MyKafkaConfig {@Beanpublic NewTopic springTestPartitionTopic() {return TopicBuilder.name("my_topic1") //主题名称.partitions(3) //分区数量.replicas(3) //副本数量.build();}//方法名就是注入到容器中对象的名称@Beanpublic ConsumerAwareListenerErrorHandler myErrorHandler() {//创建异常处理器:消费者异常时 且注册使用当前异常处理器 会生效return new ConsumerAwareListenerErrorHandler() {@Overridepublic Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {System.out.println("出现异常,消息内容:value = " + message.getPayload());System.out.println("header = "+message.getHeaders());System.out.println("异常信息:" + e.getMessage());System.out.println("=================");return null;}};}
}

1.3、消费者使用异常处理器

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaListenerAck {/*** 自动ack可能会导致漏消息*      spring-kafka:*                     自动ack 如果有异常,会死循环获取消息重新消费*                        不能继续向后消费消息,会导致消息积压**                    手动ack 配置了手动ack,且ack-mode为manual_immediate时,*                        如果消息消费失败,会继续向后消费* @param record*/@KafkaListener(topicPartitions = {@TopicPartition(topic = "my_topic1",partitions = {"0"})}, groupId = "my_group1",errorHandler = "myErrorHandler")public void onMessage1(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {System.out.println("my_group1消费者获取分区0的消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());int i = 1/0;// 手动ack:手动确认消息已经消费 broker 会提交消费者偏移量acknowledgment.acknowledge();}
}

在这里插入图片描述

1.4、创建生产者发送消息

package com.atguigu.spring.kafka.consumer;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootTest
class SpringKafkaConsumerApplicationTests {@ResourceKafkaTemplate kafkaTemplate;@Testvoid contextLoads() {for (int i = 0; i < 10; i++) {kafkaTemplate.send("my_topic1",i%3,"", "指定ack-mode: manual_immediate消费"+i);}}}

1.5、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}

1.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.8、消费者控制台:

1.8.1、第一次启动SpringKafkaConsumerApplication

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 指定ack-mode: manual_immediate消费0
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1717672170845, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费0)
header = {id=91c96206-2381-5fa1-b391-52627056762f, timestamp=1717672488753}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 指定ack-mode: manual_immediate消费3
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费3)
header = {id=8755d954-615c-37ff-67f0-85521e090b03, timestamp=1717672488753}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定ack-mode: manual_immediate消费6
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费6)
header = {id=68082673-07d2-6f69-3935-c7034a7caf81, timestamp=1717672488753}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定ack-mode: manual_immediate消费9
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费9)
header = {id=bcbda053-9536-df91-d937-2688b5d4c6ea, timestamp=1717672488754}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================

1.8.n、第n次启动SpringKafkaConsumerApplication

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 指定ack-mode: manual_immediate消费0
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1717672170845, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费0)
header = {id=6bc9edb9-ad1c-e00e-9b27-c0c540248091, timestamp=1717672854508}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 指定ack-mode: manual_immediate消费3
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费3)
header = {id=14e6951b-b25f-10ca-702c-a1699d25645b, timestamp=1717672854508}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定ack-mode: manual_immediate消费6
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费6)
header = {id=26e85ef8-2502-fd29-8d5b-091ec0362900, timestamp=1717672854509}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定ack-mode: manual_immediate消费9
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费9)
header = {id=25c6b6c8-d02d-2091-7c6f-ebaea6c52d1d, timestamp=1717672854509}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================

此时如果不关闭SpringKafkaConsumerApplication,生产者继续发送消息,消费者只会往后消费,不会从头再次消费

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/25509.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

日进2000,我怎么做到的

昨天遇到一个有意思的项目&#xff0c;让我一天进账2000&#xff0c;一个字&#xff1a;爽。 这几天接洽了一位新客户&#xff0c;主要诉求就是优化系统&#xff0c;基于LNMP的系统优化。正好这个领域我比较熟悉&#xff0c;以前都是在公司做项目&#xff0c;也不怎么涉猎系统优…

HCIP-Datacom-ARST自选题库_10_其他判断【23道题】

1.端到端时延等于路径上所有处理时延与队列时延之和。 2.部署PPP Multilink之后&#xff0c;数据将根据源地址和目的地址均匀的分配在各条成员链路上。 3.流镜像分为本地流镜像和远程流镜像两种方式。√ 4.IP报文中用Tos字段进行Q0S标记&#xff0c;Tos字段中是使用前6bit来…

知识图谱的应用---智能电网

文章目录 智能电网典型应用 智能电网 智能电网以物理电网为基础&#xff0c;将现代先进的传感测量技术、通讯技术、信息技术、计算机技术和控制技术与物理电网高度集成而形成的新型电网。它以充分满足用户对电力的需求和优化资源配置、确保电力供应的安全性、可靠性和经济性、满…

2024.6.9 四

Python的异常处理 在python里,错误和异常是不同的概念 错误: Python 的语法错误或者称之为解析错,大多是因为写代码写错了出现的 异常: 即便 Python 程序的语法是正确的&#xff0c;在运行它的时候&#xff0c;也有可能发生错误。运行期检测到的错误被称为异常。 大多数的异常…

Ajax 快速入门

Ajax 概念&#xff1a;Ajax是一种Web开发技术&#xff0c;允许在不重新加载整个页面的情况下&#xff0c;与服务器交换数据并更新网页的部分内容。 作用&#xff1a; 数据交换&#xff1a;Ajax允许通过JavaScript向服务器发送请求&#xff0c;并能够接收服务器响应的数据。 异…

STM32H750启动和内存优化(分散加载修改)

前些日子有个朋友一直给我推荐STM32H750这款芯片&#xff0c;说它的性价比&#xff0c;说它多么多么好。于是乎&#xff0c;这两天试了试&#xff0c;嚯&#xff0c;真香&#xff01;我们先看看基本配置 这里简单总结下&#xff0c;cortex-m7内核&#xff0c;128k片内flash …

HTML-CSS练习例子

HTML CSS 练习 https://icodethis.com 作为前端练习生。不敲代码只看&#xff0c;入门是很慢的&#xff0c;所以直接实战是学习前端最快的途径之一。 这个网站练习HTML CSS的&#xff0c;可以打开了解一下&#xff0c;可以每天打卡&#xff0c;例子简单&#xff0c;循序渐进&…

Python第二语言(六、Python异常)

目录 1. 捕获异常&#xff08;try: except: else: finally:&#xff09; 1.1 概念 1.2 基础语法&#xff08;try&#xff1a; except&#xff1a;&#xff09; 1.3 捕获异常&#xff08;异常也有类型&#xff09; 1.4 捕获多个异常&#xff08;try&#xff1a;except(Name…

基于阿里云服务网格流量泳道的全链路流量管理(三):无侵入式的宽松模式泳道

作者&#xff1a;尹航 在前文《基于阿里云服务网格流量泳道的全链路流量管理&#xff08;一&#xff09;&#xff1a;严格模式流量泳道》、《基于阿里云服务网格流量泳道的全链路流量管理&#xff08;二&#xff09;&#xff1a;宽松模式流量泳道》中&#xff0c;我们介绍了流…

大数据数仓的数据回溯

在大数据领域&#xff0c;数据回溯是一项至关重要的任务&#xff0c;它涉及到对历史数据的重新处理以确保数据的准确性和一致性。 数据回溯的定义与重要性 数据回溯&#xff0c;也称为数据补全&#xff0c;是指在数据模型迭代或新模型上线后&#xff0c;对历史数据进行重新处理…

开源多平台AI音乐生成器本地安装结合cpolar内网穿透实现远程访问

文章目录 前言1. 本地部署2. 使用方法介绍3. 内网穿透工具下载安装4. 配置公网地址5. 配置固定公网地址 前言 本文主要介绍如何在Windows系统电脑上快速本地部署一个文字生成音乐的AI创作工具MusicGPT&#xff0c;并结合cpolar内网穿透工具实现随时随地远程访问使用。 MusicG…

基于stm32最小版的超声波测距模块

目录 一、模块准备 二、HC-SR04模块原理解释 三、程序完整代码 四、烧录结果 总结 一、模块准备 STM32F103C8T6 HC-SR04 ST-Link&#xff08;其他烧录器也可以&#xff09; 0.96寸OLED屏幕&#xff08;非必须&#xff0c;仅供显示测距结果&#xff0c;可以使用串口助手代替…

nodejs最新某东h5st(4.7.2)参数分析与javascript逆向纯算法还原(含算法源码)(2024-06-09)

一、作者声明&#xff1a; 文章仅供学习交流与参考&#xff01;严禁用于任何商业与非法用途&#xff01;否则由此产生的一切后果均与作者无关&#xff01;如有侵权&#xff0c;请联系作者本人进行删除&#xff01; 二 、写在前面 h5st从4.1一路更新到4.7.2&#xff0c;逐渐vmp…

66. UE5 RPG 实现远程攻击武器配合角色攻击动画

在制作游戏中&#xff0c;我们制作远程攻击角色&#xff0c;他们一般会使用弓箭&#xff0c;弩&#xff0c;弹弓等武器来进行攻击。比如你使用弓箭时&#xff0c;如果角色在播放拉弓弦的动画&#xff0c;但是弓箭武器没有对应的表现&#xff0c;会显得很突兀。所以&#xff0c;…

k8s和deepflow部署与测试

Ubuntu-22-LTS部署k8s和deepflow 环境详情&#xff1a; Static hostname: k8smaster.example.net Icon name: computer-vm Chassis: vm Machine ID: 22349ac6f9ba406293d0541bcba7c05d Boot ID: 605a74a509724a88940bbbb69cde77f2 Virtualization: vmware Operating System: U…

克鲁斯卡尔算法最小生成树--C语言

同样是最小生成树&#xff0c;普利姆算法是从一个起始顶点开始&#xff0c;逐步扩展生成树&#xff0c;每次选择连接生成树和未包含顶点的最小边。而克鲁斯卡尔算法是按权值排序的方式&#xff0c;从最小的边开始逐步添加到生成树中&#xff0c;确保不会形成环&#xff0c;直到…

自友科技破解走班教育排课难题

新高考后&#xff0c;校园教务都面临着晋级&#xff0c;其中走班教育的分班排课是个巨大的挑战。 所以在分班排课的时候要清楚一下几个问题 一是&#xff1a;清楚的核算学生的选考科目。学生选科提交后做好并承认&#xff0c;最好是在分班后不要改或很少的一部分人改动。 二是…

达梦8 探寻达梦排序原理:传统排序机制(SORT_FLAG=0)

测试版本&#xff1a;--03134283938-20221019-172201-20018 达梦的排序机制由四个dm.ini参数控制&#xff1a; #maximum sort buffer size in Megabytes &#xff0c;有效值范围&#xff08;1~2048&#xff09; SORT_BUF_SIZE 100 #ma…

SpringBoot: 启动流程和类装载

前面我们学过Spring定制了自己的可执行jar&#xff0c;将真正执行时需要的类和依赖放到BOOT-INF/classes、BOOT-INF/lib来&#xff0c;为了能够识别这些为止的源文件&#xff0c;Spring定制了自己类加载器&#xff0c;本节我们来讲解这个类加载器。本节涉及的内容主要包括: Sp…

Linux部署调度工具xxl-job

背景&#xff1a; Pentaho Data Integration&#xff08;kettle&#xff09;作为用户规模最多的开源ETL工具&#xff0c;强大简洁的功能深受广大ETL从业者的欢迎。但kettle本身的调度监控功能却非常弱。Pentaho官方都建议采用crontab(Unix&#xff0c;linux平台)和计划任务(Win…