springboot整合Kafka的快速使用教程

       

目录

一、引入Kafka的依赖

二、配置Kafka

三、创建主题

1、自动创建(不推荐)

2、手动动创建

四、生产者代码

五、消费者代码  

六、常用的KafKa的命令


        Kafka是一个高性能、分布式的消息发布-订阅系统,被广泛应用于大数据处理、实时日志分析等场景。Spring Boot作为目前最流行的Java开发框架之一,其简洁的配置和丰富的工具使得与Kafka的集成变得更加容易。本文将介绍如何使用Spring Boot整合Kafka,实现高效的数据处理和消息传递。

一、引入Kafka的依赖

       <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

二、配置Kafka

spring:kafka:bootstrap-servers: 156.65.20.76:9092,156.65.20.77:9092,156.65.20.78:9092 #指定Kafka集群的地址,这里有三个地址,用逗号分隔。listener:ack-mode: manual_immediate #设置消费者的确认模式为manual_immediate,表示消费者在接收到消息后立即手动确认。concurrency: 3  #设置消费者的并发数为3missing-topics-fatal: false  #设置为false,表示如果消费者订阅的主题不存在,不会抛出异常。producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 设置消息键的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer #设置消息值的序列化器acks: 1  #一般就是选择1,兼顾可靠性和吞吐量 ,如果想要更高的吞吐量设置为0,如果要求更高的可靠性就设置为-1consumer:auto-offset-reset: earliest #设置为"earliest"表示将从最早的可用消息开始消费,即从分区的起始位置开始读取消息。enable-auto-commit: false #禁用了自动提交偏移量的功能,为了避免出现重复数据和数据丢失,一般都是手动提交key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置消息键的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #设置消息值的反序列化器

注:kafka的acks有三个值,可以根据实际情况和需求平衡消息系统的吞吐量和数据安全性,来选择对应的值。

  • acks=0:这是最不可靠的模式。当设置为acks=0时,生产者在发送消息后不会等待任何服务器端的确认响应。这种模式下,生产者可以迅速继续发送下一批消息,效率最高,但风险也最大。如果在此模式下发生网络问题或broker故障,发送的消息可能会永久丢失,生产者无法得知消息是否成功到达Kafka broker。因此,这种配置适合于能够容忍少量数据丢失的场景,例如实时数据分析或生成非关键的实时报表。
  • acks=1:这是默认的配置模式,也是一种折衷方案。在这种模式下,生产者会等待分区的领导者节点(leader)确认消息已经成功写入磁盘,才会发送确认信息给生产者。这提高了数据的安全性,因为只要领导者节点保存了消息,即使跟随者(replicas)没有及时同步,消息也不会丢失。然而,如果领导者在同步给所有追随者之前崩溃,那么尚未同步的副本将无法获取该消息,仍然存在消息丢失的风险。
  • acks=all或-1:这是最可靠的模式。在这个模式下,生产者不仅需要领导者节点确认,还会等待所有同步副本(In-sync replicas, ISR)都确认写入消息后才会收到确认。这极大地增强了数据的持久性保证,确保了即使在多个节点故障的情况下,消息也不会丢失。此模式适用于数据可靠性要求非常高的场景,如金融交易系统或重要的日志记录

三、创建主题

    1、自动创建(不推荐)

         不存在的主题,会自动创建,分区数和副本数均为默认值。而默认值可能会不符合某些场景的要求。

在kafka的安装目录conf目录下找到该配置文件server.properties,添加如下配置:
num.partitions=3 #默认3个分区
auto.create.topics.enable=true #开启自动创建主题
default.replication.factor=3 #默认3个副本

    2、手动动创建

         在kafka的安装目录bin目录下,执行如下命令: 

//创建一个有三个分区和三个副本,名为zhuoye的主题
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic zhuoye 

四、生产者代码

@Slf4j
@Component
public class ALiYunServiceImpl implents IALiYunService {@Autowiredprivate KafkaTemplate kafkaTemplate;@Autowiredprivate ExecutorService executorService;String topicName = "zhuoye";@Overridepublic void queryECSMetricInfo() {//发送到kafka的消息集合,因为使用了多线程,并且在多线程中往该集合进行添加操作,所以需要线程安全的List<Message> messages = Collections.synchronizedList(new ArrayList<>());boolean flag = true;//获取上次查询时间Long startTime = Long.valueOf(queryTimeRecordMapper.selectTimeByBelongId(3)) * 1000;Long endTime = System.currentTimeMillis();try {//查询出所有的运行中的实例List<CloudInstanceAssetDto> cloudInstances = cloudInstanceAssetMapper.queryAllRunningInstance(1, "Running");if (CollectionUtils.isEmpty(cloudInstances)) {return;}//定义计数器CountDownLatch latch = new CountDownLatch(cloudInstances.size());//遍历查询for (CloudInstanceAssetDto instance : cloudInstances) {executorService.submit(() -> {try {//获取内网流出带宽,并将结果封装到消息集合中dealMetricDataToMessage(ALiYunConstant.ECS_INTRANET_OUT_RATE, ALiYunConstant.INTRANET_OUT_RATE_NAME, ALiYunConstant.LW_INTRANET_OUT_RATE_CODE,startTime, endTime, instance, messages);} catch (Exception e) {log.error("获取ECS的指标数据-多线程处理任务异常!", e);} finally {latch.countDown();}});}//等待任务执行完毕latch.await();//将最终的消息集合发送到kafkaif (CollectionUtils.isNotEmpty(messages)) {for (int i = 0; i < messages.size(); i++) {if (StringUtils.isNotBlank(messages.get(i).getValue())&& "noSuchInstance".equals(messages.get(i).getValue())) {continue;}kafkaTemplate.send(topicName,  messages.get(i));}}} catch (Exception e) {flag = false;log.error("获取ECS的指标数据失败", e);}//更新记录上次查询时间if (flag) {QueryTimeRecord queryTimeRecord = new QueryTimeRecord();queryTimeRecord.setBelongId(3).setLastQueryTime(String.valueOf((endTime - 1000 * 60 * 1) / 1000)); //开始时间往前推1分钟queryTimeRecordMapper.updateByBelongId(queryTimeRecord);}}

这个时候,如果你想看有没有把消息发送到kafka的指定主题可以使用如下命令:

kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye 

五、消费者代码  

@Slf4j
@Component
public class KafkaConsumer {// 消费监听@KafkaListener(topics = "zhuoye",groupId ="zhuoye-aliyunmetric")public void consumeExtractorChangeMessage(ConsumerRecord<String, String> record, Acknowledgment ack){try {String value = record.value();//处理数据,存入openTsDb.................................ack.acknowledge();//手动提交}catch (Exception e){log.error("kafa-topic【zhuoye】消费阿里云指标源消息【失败】");log.error(e.getMessage());}}
}

六、常用的KafKa的命令

//创建主题
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic zhuoye
//查看kafka是否接收对应的消息
 kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye
// 修改kafka-topic分区数
./kafka-topics.sh --zookeeper localhost:2181 -alter --partitions 6 --topic zhuoye
// 查看topic分区数
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic zhuoye
// 查看用户组消费情况
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group zhuoye-aliyunmetric --describe

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/17348.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

山东大学软件学院项目实训-创新实训-基于大模型的旅游平台(二十一)- 微服务(1)

微服务 1.认识微服务 SpringCloud底层是依赖于SpringBoot的&#xff0c;并且有版本的兼容关系&#xff0c;如下&#xff1a; 2. 服务拆分 需求 &#xff1a; 把订单信息和用户信息一起返回 从订单模块向用户模块发起远程调用 &#xff0c; 把查到的结果一起返回 步骤 &…

多态(难的起飞)

注意 virtual关键字&#xff1a; 1、可以修饰原函数&#xff0c;为了完成虚函数的重写&#xff0c;满足多态的条件之一 2、可以菱形继承中&#xff0c;去完成虚继承&#xff0c;解决数据冗余和二义性 两个地方使用了同一个关键字&#xff0c;但是它们互相一点关系都没有 虚函…

JAVASE总结一

1、 2、引用也可以是成员变量&#xff08;实例变量&#xff09;&#xff0c;也可以是局部变量&#xff1b;引用数据类型&#xff0c;引用&#xff0c; 我们是通过引用去访问JVM堆内存当中的java对象&#xff0c;引用保存了java对象的内存地址&#xff0c;指向了JVM堆内存当中…

ESP32 - Micropython ESP-IDF 双线教程 脉宽调制(PWM)(1)

ESP32 - Micropython ESP-IDF 双线教程 脉宽调制&#xff08;PWM&#xff09; PWM 的基本原理PWM 的应用PWM 的优点PWM 的实现方式ESP32-micropython 中的 PWM 功能使用 micropython 控制 PWM 的代码示例代码介绍 ESP32-IDF 中的 PWM 功能1. 初始化配置函数2. 引脚绑定函数3. 占…

常见算法200个(5):快速排序(快排)

JS实现快速排序 1.快速排序思路&#xff1a; 选择数组中的一个值作为基准&#xff0c;将数组中小于该值的数置于该数之前&#xff0c;大于该值的数置于该数之后&#xff0c;接着对该数前后的两个数组进行重复操作直至排序完成。 2.代码实现&#xff1a; function quick(arr)…

使用 Snort 进行入侵检测

使用 Snort 进行入侵检测 Snort 是一种流行的开源入侵检测系统。您可以在http://www.snort.org/上获取它。Snort 分析流量并尝试检测和记录可疑活动。Snort 还能够根据其所做的分析发送警报。 Snort 安装 在本课中&#xff0c;我们将从源代码安装。此外&#xff0c;我们不会安…

2024 前端面试每日1小时

三日 1. 如何理解Vue的模板编译原理 Vue的模板编译实际就是将模板字符串通过解析、优化和代码生成等步骤转换为渲染函数的过程。这个过程中&#xff0c;AST扮演了非常重要的角色&#xff0c;它用树形结构描述了模板的内容和结构&#xff0c;是编译过程的核心数据结构&#xff…

MySQL——适合不适合创建索引的情况

那些情况适合创建索引 1、字段的数值具有唯一性的限制 索引本身可以起到约束的作用&#xff0c;比如唯一索引、主键索引都是可以起到唯一性约束的&#xff0c;因此在我们的数据表中&#xff0c;如果某个字段是唯一性的&#xff0c;就可以直接创建唯一性索引&#xff0c;或者主…

Nodejs 爬虫 案例

1.安装&#xff1a; npm install cheerio npm install axios2.介绍&#xff1a; 2.1 cheerio 特点和用途描述&#xff1a; HTML解析和操作&#xff1a;Cheerio 可以将 HTML 字符串加载到内存中&#xff0c;并将其转换为一个可操作的 DOM 树结构&#xff0c;从而可以方便地对…

AURIX TC3xx单片机介绍-启动过程介绍1

从各个域控制器硬件解决方案来看,MPU可能来自多个供应商,有瑞萨,有NXP等,但对于MCU来说,基本都采用英飞凌TC3xx。 今天我们就来看一下TC3xx的启动过程,主要包含如下内容: uC上电过程中,会经过一个上电时序,从复位状态“脱离”出来;Boot Firmware是复位后第一个执行的…

使用 Effect 同步-09

有些组件需要与外部系统同步。例如&#xff0c;你可能希望根据 React state 控制非 React 组件、设置服务器连接或在组件出现在屏幕上时发送分析日志。Effects 会在渲染后运行一些代码&#xff0c;以便可以将组件与 React 之外的某些系统同步。 简单理解&#xff0c;就是需要操…

Python实现对Word文档内容出现“重复标题”进行自动去重(4)

前言 本文是该专栏的第4篇,后面会持续分享Python办公自动化干货知识,记得关注。 在本专栏上一篇文章《Python实现对Word文档内容出现“重复标题”进行自动去重(3)》中,笔者有详细介绍使用python对word文档内容的目标文本进行自动去重。只不过本文要介绍的“去重方法”与上…

计算机专业必考之计算机指令设计格式

计算机指令设计格式 例题&#xff1a; 1.设相对寻址的转移指令占3个字节&#xff0c;第一字节为操作码&#xff0c;第二&#xff0c;第三字节为相对偏移量&#xff0c; 数据在存储器以低地址为字地址的存放方式。 每当CPU从存储器取出一个字节时候&#xff0c;自动完成&…

正点原子[第二期]Linux之ARM(MX6U)裸机篇学习笔记-24.1,2 SPI驱动实验-SPI协议介绍

前言&#xff1a; 本文是根据哔哩哔哩网站上“正点原子[第二期]Linux之ARM&#xff08;MX6U&#xff09;裸机篇”视频的学习笔记&#xff0c;在这里会记录下正点原子 I.MX6ULL 开发板的配套视频教程所作的实验和学习笔记内容。本文大量引用了正点原子教学视频和链接中的内容。…

计算机组成原理易混淆知识点总结(持续更新)

目录 1.机器字长&#xff0c;存储字长与指令字长 2.指令周期,机器周期,时钟周期 3.CPI,IPS,MIPS 4.翻译程序和汇编程序 5.计算机体系结构和计算机组成的区别和联系 6.基准程序执行得越快说明机器的性能越好吗? 1.机器字长&#xff0c;存储字长与指令字长 不同的机器三者…

AI智能体|扣子Coze文生图功能接入微信公众号

大家好&#xff0c;我是无界生长。 AI智能体&#xff5c;扣子Coze文生图功能接入微信公众号本文分享了如何将Coze平台的文生图功能接入微信公众号的详细操作流程&#xff0c;包括创建图像流、创建并配置Bot、设置提示词和开场白、调试、发布等步骤。如果看完还没学会的话&…

网页图片加载慢的求解指南

网页/图片加载慢的求解指南 一、前言与问题描述 今天刚换上华为的HUAWEI AX3 Pro New&#xff0c;连上WIFI后测速虽然比平时慢&#xff0c;但是也不算太离谱&#xff0c;如下图所示&#xff1a; 估计读者们有也和作者一样&#xff0c;还没意识到事情的严重性&#x1f601;。 …

08Django项目--用户管理系统--查(前后端)

对应视频链接点击直达 TOC 一些朋友加我Q反馈&#xff0c;希望有每个阶段的完整项目代码&#xff0c;那从今天开始&#xff0c;我会上传完整的项目代码。 用户管理&#xff0c;简而言之就是用户的增删改查。 08项目点击下载&#xff0c;可直接运行&#xff08;含数据库&…

PHP框架 Laravel

现在因为公司需求&#xff0c;需要新开一个Laravel框架的项目&#xff0c;毫无疑问&#xff0c;我又被借调过去了&#xff0c;最近老是被借调&#xff0c;有点阴郁&#xff0c;不过反观来看&#xff0c;这也是好事&#xff0c;又可以复习和巩固一下自己的知识点&#xff0c;接下…

大数据开发面试题【Spark篇】

115、Spark的任务执行流程 driver和executor&#xff0c;结构式一主多从模式&#xff0c; driver&#xff1a;spark的驱动节点&#xff0c;用于执行spark任务中的main方法&#xff0c;负责实际代码的执行工作&#xff1b;主要负责&#xff1a;将代码逻辑转换为任务、在executo…