老项目接入kafka消费信息另一种方式

前言

       这次跟大家分享kafka消费的另一种接入实现。其实原因是因为目前这个项目的框架太老了,springboot还是1.5的,直接用注解@KafkaListener无法消费的问题。我也不想调这个框架,没工时不说,万一再整出兼容性问题,那问题就大了,而且现在时间太赶了。


一、目标场景

  1. 目前是物联网设备的流水上报后,会存ES,同时经过物模型解析后,会往下游kafka推送信息。
  2. 下游系统接收kafka的设备流水,进行流水解析,解析成业务数据,做业务融合。

二、使用步骤

1.引入库

代码如下(示例):

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>
<!-- spring-kafka内部依赖kafka-clients升级补偿 -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version>
</dependency>

       其实上面看起来说的是排除springboot的kafka-clients,引入自定义的kafka-clients做为升级补偿。
       编译、运行都不报错,但是使用@KafkaListener注解消费kafka信息,会报错,大致意思就是springframe版本低。应该就是低版本springboot的依赖springframe与高版本kafka-client依赖的springframe不匹配导致。
       没有去调整框架,具体就不发散了。


import com.easylinkin.emp.hngw.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.List;/*** 消费者listener** @author zhengwen**/
@Slf4j
@Component
//@Lazy
public class KafkaListenConsumer {@Resourceprivate DataTransService dataTransService;/*** 设备流水listenner** @param records 消费信息* @param ack     Ack机制*/@KafkaListener(topics = "${easylinkin.analyze.device.flow.topic.consumer}")public void deviceFlowListen(List<ConsumerRecord> records, Acknowledgment ack) {log.debug("=====设备流水deviceFlowListen消费者接收信息====");try {for (ConsumerRecord record : records) {log.debug("---开启线程解析设备流水数据:{}", record.toString());dataTransService.deviceFlowTransSave(record);}} catch (Exception e) {log.error("----设备流水数据消费者解析数据异常:{}", e.getMessage(), e);} finally {//手动提交偏移量ack.acknowledge();}}
}

       上面就是我最初直接使用注解写的消费方法。

2.主动启动消费


import com.easylinkin.emp.hngw.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;@Slf4j
public class DeviceFlowConsumerServerStart {@Resourceprivate DataTransService dataTransService;@Value("${easylinkin.analyze.device.flow.topic.consumer}")private String topic;@Value("${spring.kafka.bootstrap-servers:localhost:9092}")private String kafkaServiceUrl;@Value("${spring.kafka.consumer.group-id}")private String groupId;@PostConstructvoid start() {log.info("设备流水消费kafka服务启动!");//配置信息Properties props = new Properties();//先自定义的设置下,再用配置里的覆盖//声明kafka的地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServiceUrl);//每个消费者分配独立的消费者组编号props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//如果value合法,则自动提交偏移量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");//设置多久一次更新被消费消息的偏移量props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");//设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");//自动重置offsetprops.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerParams);//订阅消费topickafkaConsumer.subscribe(Arrays.asList(topic));startConsumer(kafkaConsumer);log.info("设备流水消费kafka服务启动完成!");}private void startConsumer(KafkaConsumer<String, String> kafkaConsumer) {new Thread(()->{while (true){try {ConsumerRecords<String,String> poll = kafkaConsumer.poll(2000);Iterable<ConsumerRecord<String,String>> records = poll.records(topic);Iterator<ConsumerRecord<String,String>> iterator = records.iterator();while (iterator.hasNext()){dataTransService.deviceFlowTransSave(iterator.next());}}catch (Exception e){log.error("消费失败",e);startConsumer(kafkaConsumer);break;}}}).start();}}

       这里设置订阅后,启用线程消费,希望是消费异常不要把这里主线程搞挂了。因为我这里消费信息,会用一个dataTransService做设备流水的进一步解析,做业务融合,可能就涉及到事物嵌套的问题。


总结

  1. 针对老项目的另一种kafka消费接入方式
  2. 老springboot是真狗,各种接入不丝滑
  3. 就写到这里,希望能帮到大家,uping!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/775378.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

springboot+mybatis项目集成p6spy输出格式化sql日志

本文背景:公司项目框架是基于springboot+mybatis的web项目,由于鄙人在使用过程中发现打印的mybatis日志每次都要粘贴出来,然后再用在线工具的格式化填充参数,很不方便,最近发现那个在线的工具打不开了,更不方便了,因此想有没有直接可以输出的填充好参数的sql语句,当然i…

STM32启动方式

s在STM32F10xxx里,可以通过BOOT[1:0]引脚选择三种不同启动模式。 启动方式&#xff1a;从内部的Flash中启动、 存储器映射&#xff1a; 0x0000 0000 -----0x0800 0000 映射的内部Flash

【保姆级讲解如何Stable Diffusion本地部署】

&#x1f308;个人主页:程序员不想敲代码啊&#x1f308; &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家&#x1f3c6; &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提…

sheng的学习笔记-AI-YOLO算法,目标检测

AI目录&#xff1a;sheng的学习笔记-AI目录-CSDN博客 目录 目标定位&#xff08;Object localization&#xff09; 定义 原理图 具体做法&#xff1a; 输出向量 图片中没有检测对象的样例 损失函数 ​编辑 特征点检测&#xff08;Landmark detection&#xff09; 定义&a…

SCI一区顶刊优化算法改进:基于强化学习的神经网络算法RLNNA,你绝对没见过,非常新颖!

声明&#xff1a;文章是从本人公众号中复制而来&#xff0c;因此&#xff0c;想最新最快了解各类智能优化算法及其改进的朋友&#xff0c;可关注我的公众号&#xff1a;强盛机器学习&#xff0c;不定期会有很多免费代码分享~ 目录 神经网络优化算法NNA&#xff1a; 基于强化…

利用Python进行数据可视化Plotly与Dash的应用【第157篇—数据可视化】

&#x1f47d;发现宝藏 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。【点击进入巨牛的人工智能学习网站】。 利用Python进行数据可视化Plotly与Dash的应用 数据可视化是数据分析中至关重要的一环&…

SpringBoot集成 itextpdf 根据模板动态生成PDF

目录 需求说明前期准备Spring Boot 集成添加依赖构建工具类构建MultipartFile编辑PDF模板Java代码设置对应form的key-value 需求说明 根据合同模板&#xff0c;将动态的合同标签&#xff0c;合同方以及合同签约时间等动态的生成PDF&#xff0c;供用户下载打印。 前期准备 安…

Linux学习_进程

1.进程 概念&#xff1a;程序的一个执行实例&#xff0c;正在执行的程序等&#xff0c;担当分配系统资源&#xff08;CPU时间&#xff0c;内存&#xff09;的实体&#xff0c;进程PCB自己的代码和数据 PCB&#xff1a;进程信息被放在一个叫做进程控制块的数据结构中&#xff…

左手医生:医疗 AI 企业的云原生提效降本之路

相信这样的经历对很多人来说并不陌生&#xff1a;为了能到更好的医院治病&#xff0c;不惜路途遥远奔波到大城市&#xff1b;或者只是看个小病&#xff0c;也得排上半天长队。这些由于医疗资源分配不均导致的就医问题已是老生长谈。 云计算、人工智能、大数据等技术的发展和融…

LLMs之Grok-1:model.py文件解读—实现了基于Transformer的预训练语言模型+利用JAX框架支持高性能分布式计算

LLMs之Grok-1:model.py文件解读—实现了基于Transformer的预训练语言模型+利用JAX框架支持高性能分布式计算 目录 model.py文件解读—实现了基于Transformer的预训练语言模型+利用JAX框架支持高性能分布式计算

【嵌入式——C语言】VScode编写C程序、交叉编译

【嵌入式——C语言】VScode编写C程序、交叉编译 第一步第二步第三步第四步第五步第六步第七步第八步 第一步 下载Visual Studio Code下载地址 然后直接安装就可以了。 第二步 前提是你的电脑上安装了WSL。。。 打开vscode的扩展&#xff0c;输入WSL进行安装 安装完之后在窗…

装饰器模式实战运用(功能增强)

目录 前言 装饰器模式与代理模式的区别 UML plantuml 类图 实战代码 mybatis cache 前言 装饰器模式和代理模式在使用上很相似&#xff0c;都是在不修改原始类代码的情况下&#xff0c;动态地给真实对象的方法做增强。 装饰器模式是通过创建一个包装对象来包裹原有对象…

HuggingFace: 掌握自然语言处理的利器

引言 在当今人工智能领域中&#xff0c;自然语言处理&#xff08;NLP&#xff09;一直是备受关注的焦点之一。从智能助手到情感分析&#xff0c;NLP技术已经深入到我们日常生活和工作的方方面面。然而&#xff0c;随着数据量的增长和模型复杂性的提升&#xff0c;开发和部署高…

c# RichTextbox添加行号

使用另一个RichTextBox放在要添加行号的左边 使用以下代码 //uiRichTextBox1为右侧文本框&#xff0c;uiRichTextBox2为左侧文本框int lineIndex 0;private void uiRichTextBox1_TextChanged(object sender, EventArgs e){if (lineIndex > uiRichTextBox1.Lines.Length){L…

C++之STL整理(1)之vector、map数据结构初识

C之STL整理&#xff08;1&#xff09;之vector、map数据结构初识 注&#xff1a;整理一些突然学到的C知识&#xff0c;随时mark一下 例如&#xff1a;忘记的关键字用法&#xff0c;新关键字&#xff0c;新数据结构 C 的 STL C之STL整理&#xff08;1&#xff09;之vector、map数…

腾讯云4核8G12M云服务器一年646元,送3个月时长

2024年腾讯云4核8G服务器租用优惠价格&#xff1a;轻量应用服务器4核8G12M带宽646元15个月&#xff0c;CVM云服务器S5实例优惠价格1437.24元买一年送3个月&#xff0c;腾讯云4核8G服务器活动页面 txybk.com/go/txy 活动链接打开如下图&#xff1a; 腾讯云4核8G服务器优惠价格 轻…

《每天十分钟》-红宝书第4版-对象、类与面向对象编程(六)

盗用构造函数 上节提到原型包含引用值导致的继承问题&#xff0c;为了解决这种问题&#xff0c;一种叫作“盗用构造函数”&#xff08;constructor stealing&#xff09;的技术在开发社区流行起来&#xff08;这种技术有时也称作“对象伪装”或“经典继承”&#xff09;。基本…

若依 3.8.7版本springboot前后端分离 整合mabatis plus

1.去掉mybatis 这一步我没有操作&#xff0c;看别人的博客有说不去掉可能冲突&#xff0c;也可能不冲突&#xff0c;我试下来就没去掉如需要去除&#xff0c;到总的pom.xml中properties标签下的<mybatis-spring-boot.version>x.x.x</mybatis-spring-boot.version>…

C++ pdf 打印 插入图片

一&#xff1a;使用PODOFO给PDF插入图片&#xff1a; #include <podofo.h> int main() { PoDoFo::PdfMemDocument pdfDocument; PoDoFo::PdfPage* page; PoDoFo::PdfImage image; PoDoFo::PdfVecObjects* vec_objects; PoDoFo::PdfRect rect; …

Kotlin by关键字

委托的概念 委托是一种设计模式,它的基本概念是:操作对象自己不会去处理某段逻辑,而是会把工作委托给另外一个辅助对象去处理。 class NewList<out T>(private val list: MutableList<T>) {fun isEmpty() = list.isEmpty()fun add(item: @UnsafeVariance T) = l…