springboot集成kafka消费数据

springboot集成kafka消费数据

文章目录

  • springboot集成kafka消费数据
  • 1.引入pom依赖
  • 2.添加配置文件
    • 2.1.添加KafkaConsumerConfig.java
    • 2.2.添加KafkaIotCustomProperties.java
    • 2.3.添加application.yml配置
  • 3.消费者代码

1.引入pom依赖

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.1.2</version></dependency>

2.添加配置文件

2.1.添加KafkaConsumerConfig.java

@Configuration
@EnableConfigurationProperties(KafkaIotCustomProperties.class)
@Slf4j
public class KafkaConsumerConfig {@AutowiredKafkaIotCustomProperties kafkaIotCustomProperties;@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 并发数 多个微服务实例会均分factory.setConcurrency(3);factory.setBatchListener(true);ContainerProperties containerProperties = factory.getContainerProperties();// 是否设置手动提交containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory<String, String> consumerFactory() {Map<String, Object> consumerConfigs = consumerConfigs();log.info("消费者的配置信息:{}",JSONObject.toJSONString(consumerConfigs));return new DefaultKafkaConsumerFactory<>(consumerConfigs);}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();// 服务器地址propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaIotCustomProperties.getBootstrapServers());// 是否自动提交propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaIotCustomProperties.isEnableAutoCommit());// 自动提交间隔propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getAutoCommitInterval());//会话时间propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaIotCustomProperties.getSessionTimeOut());//key序列化propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getKeyDeserializer());//value序列化propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getValueDeserializer());// 心跳时间propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getHeartbeatInterval());// 分组idpropsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaIotCustomProperties.getGroupId());//消费策略propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaIotCustomProperties.getAutoOffsetReset());// poll记录数propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaIotCustomProperties.getMaxPollRecords());//poll时间propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getMaxPollInterval());return propsMap;}}

2.2.添加KafkaIotCustomProperties.java

@Component
@ConfigurationProperties(prefix = "fxyh.realdata.kafka")
@Data
public class KafkaIotCustomProperties {private List<String> topics;private String groupId;private String sessionTimeOut;private String bootstrapServers;private String autoOffsetReset;private boolean enableAutoCommit;private String autoCommitInterval;private String fetchMinSize;private String fetchMaxWait;private String maxPollRecords;private String maxPollInterval;private String heartbeatInterval;private String keyDeserializer;private String valueDeserializer;
}

2.3.添加application.yml配置

fxyh:realdata:kafka:bootstrapServers:  192.168.80.251:9092topics: ["test1","test2"]groupId: shengtingrealdatagroup#后台的心跳线程必须在30秒之内提交心跳,否则会reBalancesessionTimeOut: 30000#      autoOffsetReset: earliest#取消自动提交,即便如此 spring会帮助我们自动提交enableAutoCommit: false#自动提交间隔autoCommitInterval: 1000#拉取的最小字节fetchMinSize: 1#拉去最小字节的最大等待时间fetchMaxWait: 500maxPollRecords: 50#300秒的提交间隔,如果程序大于300秒提交,会报错maxPollInterval: 300000#心跳间隔heartbeatInterval: 10000keyDeserializer: org.apache.kafka.common.serialization.StringDeserializervalueDeserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: latest

3.消费者代码


@Slf4j
@Component
public class DeviceDataConsumer {@Autowiredprivate KafkaIotCustomProperties kafkaIotCustomProperties;@KafkaListener(topics = {"#{@kafkaIotCustomProperties.topics}"}, groupId = "#{@kafkaIotCustomProperties.groupId}", containerFactory = "kafkaListenerContainerFactory",properties = {"#{@kafkaIotCustomProperties.autoOffsetReset}"})public void topicTest(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {for (ConsumerRecord<String, String> record : records) {log.info("topic_test 消费了: Topic:" + record.topic() + ",groupId:" + kafkaIotCustomProperties.getGroupId() + ",Message:" + record.value());//手动提交偏移量ack.acknowledge();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/623264.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

yolov7混淆矩阵

测试部分代码 import argparse import json import os from pathlib import Path from threading import Threadimport numpy as np import torch import yaml from tqdm import tqdmfrom models.experimental import attempt_load from utils.datasets import create_dataload…

10.Spring Type Convertion 原理

目录 概述Spring Type Convertion总结Spring MVC层的数据转换debug 关键断点测试代码关键处调试字符串Long结束概述 此篇文章对 Spring Type Convertion 做深入学习。 两个源码调试例子,一个是转换成 String ,一个转换成 Long 环境:spring boot 2.6.13 相关文章如下: 文章…

嵌入式培训机构四个月实训课程笔记(完整版)-C++和QT编程第二天-类与对象(物联技术666)

链接:https://pan.baidu.com/s/1Am83Ut449WCbuTiodwJWgg?pwd=1688 提取码:1688 上午:类和对象 下午:类和对象高级应用 教学内容: 1、构造函数\析构函数\拷贝构造函数 构造函数: 每一个对象的创建都必须初始化,如果在没有写初始化函数(即构造函数),系统会默认写…

OpenCV——八邻域断点检测

目录 一、理论基础1、八邻域2、断点检测 二、代码实现三、结果展示四、参考链接 OpenCV——八邻域断点检测由CSDN点云侠原创&#xff0c;爬虫自重。如果你不是在点云侠的博客中看到该文章&#xff0c;那么此处便是不要脸的爬虫。 一、理论基础 1、八邻域 图1 八邻域示意图 图…

基于嵌入式AI的ROI区域视频传输系统设计与实现

在当今快速发展的智能监控领域&#xff0c;实现高效的视频流处理和传输成为了一项重要挑战。本文介绍了一个基于嵌入式AI平台的视频传输系统设计&#xff0c;该系统能够识别视频中的关键区域&#xff08;ROI&#xff09;&#xff0c;并对这些区域进行高效的编码处理。特别地&am…

Python数据的处理

一.字符串拼接的几种方式 使用str.join()方法进行拼接字符串直接拼接使用格式化字符串进行拼接 ​ s1hello s2world #(1)使用➕进行拼接 print(s1s2) #(2)使用字符串的join&#xff08;&#xff09;方式 print(.join([s1,s2])) print(*.join([s1,s2])) print(你好.join([s1,s…

neus2安装运行纪实

./build/testbed --scene transforms.json

Python操作MySQL入门教程,使用pymysql操作MySQL,有录播直播私教课

创建数据库 create database gx character set utf8mb4;连接数据库 #!/usr/bin/python3import mysql as pymysql# 打开数据库连接 db pymysql.connect(hostlocalhost,port3306,userroot,passwordzhangdapeng520,databasegx)# 使用 cursor() 方法创建一个游标对象 cursor cur…

【MATLAB随笔】GUI编程(未完结)

文章目录 一、创建图窗1.1 figure 函数详解1.11 窗口标识1.12 窗口外观1.13 位置和大小 二、xxx 一、创建图窗 跟很多GUI编程一样的&#xff0c;先创建一个基本的图窗&#xff0c;然后再添加按钮、文章、标签&#xff0c;绑定函数等等&#xff0c;比如python的tkinter。 MATL…

C/C++算法从小白到高手(1):排序算法

1. 冒泡排序 (1) 基本思路 冒泡排序是一种简单的、但效率极低的排序算法&#xff0c;基本思路是重复地遍历待排序的序列&#xff0c;通过相邻元素的比较和交换&#xff0c;将较大&#xff08;或较小&#xff09;的元素逐步"冒泡"到右侧&#xff08;或左侧&#xff0…

文章解读与仿真程序复现思路——电网技术EI\CSCD\北大核心《考虑风电出力不确定性的电网无功-电压控制鲁棒分区方法》

本专栏栏目提供文章与程序复现思路&#xff0c;具体已有的论文与论文源程序可翻阅本博主免费的专栏栏目《论文与完整程序》 这个标题涉及到考虑风电出力不确定性的电网无功-电压控制鲁棒分区方法。让我们逐步解读这个标题的主要关键词和概念&#xff1a; 考虑风电出力不确定性…

android,app,小程序页面布局的各种栏

手机页面从上到下通常包含以下几个栏&#xff1a; 1.状态栏&#xff08;Status Bar&#xff09;&#xff1a;位于屏幕的顶部&#xff0c;用于显示手机的系统状态信息&#xff0c;例如时间、电池电量、信号强度等。状态栏也可以包含一些通知图标和快捷设置图标。 2.标题栏&…

根据编码规则使用nodejs脚本来大批量生成星原物联网设备采集点表

在使用星原网关时&#xff0c;需要导入点表&#xff0c;由于设备的点表非常的多&#xff0c;可写的点表有1095个。 所有根据编码规律&#xff0c;编写了一段nodejs代码&#xff0c;来生成点表。 一个编码有四部分组成&#xff0c; 分别是 [‘A’, ‘B’, ‘C’, ‘D’, ‘E’]…

CTF伪随机数爆破

要了解伪随机数的爆破首先你的先知道什么是PHP种子&#xff0c; 借用在rand()函数中&#xff0c;我们可以通过设置随机数种子来影响随机数的生成。例如&#xff0c;在rand()函数中加入了随机数种子编码后&#xff0c;每次运行程序将会生成同样的随机整数序列。这个就是伪随机数…

第28关 k8s监控实战之Prometheus(八)

大家好&#xff0c;我是博哥爱运维。从这节课开始&#xff0c;博哥计划引入golang&#xff08;简称go&#xff09;语言开发的一些内容&#xff0c;没有接触过go语言的同学也不用慌&#xff0c;我会尽量以一个新人的角度&#xff0c;去把这些go开发的内容讲得通俗一些。这节课还…

布隆过滤器四种实现(Java,Guava,hutool,Redisson)

1.背景 为预防大量黑客故意发起非法的时间查询请求&#xff0c;造成缓存击穿&#xff0c;建议采用布隆过滤器的方法解决。布隆过滤器通过一个很长的二进制向量和一系列随机映射函数&#xff08;哈希函数&#xff09;来记录与识别某个数据是否在一个集合中。如果数据不在集合中…

计算机毕业设计 基于Java的国产动漫网站的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

polar CTF 写shell

一、题目 <?php /*PolarD&N CTF*/highlight_file(__FILE__);file_put_contents($_GET[filename],"<?php exit();".$_POST[content]);?>二、解题 payload ?filenamephp://filter/convert.base64-decode/resourceshell.php #<?eval($_POST[1]);…

Hadoop 实战 | 词频统计WordCount

词频统计 通过分析大量文本数据中的词频&#xff0c;可以识别常见词汇和短语&#xff0c;从而抽取文本的关键信息和概要&#xff0c;有助于识别文本中频繁出现的关键词&#xff0c;这对于理解文本内容和主题非常关键。同时&#xff0c;通过分析词在文本中的相对频率&#xff0…

Echarts折线图中数据根据正负数显示不同区域背景色-配置

示例 Echarts折线图中数据根据正负数显示不同区域背景色 Piecewise 分段类型Continuous 连续类型 Echarts配置 option {backgroundColor: "#030A41",xAxis: {type: category,data: [Mon, Tue, Wed, Thu, Fri, Sat, Sun],axisTick: {show: false,},axisLabel: { /…