Kafka是什么,以及如何使用SpringBoot对接Kafka

系列文章目录

上手第一关,手把手教你安装kafka与可视化工具kafka-eagle



在这里插入图片描述
继上一次教大家手把手安装kafka后,今天我们直接来到入门实操教程,也就是使用SpringBoot该怎么对接和使用kafka。当然,在一开始我们也会比较细致的介绍一下kafka本身。那么话不多说,马上开始今天的学习吧

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 kafka 专栏,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis dubbo docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待

一、Kafka与流处理

我们先来看看比较正式的介绍:Kafka是一种流处理平台,由LinkedIn公司创建,现在是Apache下的开源项目。Kafka通过发布/订阅机制实现消息的异步传输和处理。它具有高吞吐量、低延迟、可伸缩性和可靠性等优点,使其成为了流处理和实时数据管道的首选解决方案

介绍其实是比较清晰的,如果你是第一次接触“流处理”概念,我们也可以做一点解释,流处理指的是对连续、实时产生的数据流进行实时处理、计算和分析的过程。

假设你正在玩一款在线游戏,其他玩家的动作和游戏事件会实时地传到服务器上。这些事件就形成了一条数据流。在流处理中,我们会对这条数据流进行实时处理,例如计算每个玩家的分数、监控游戏区域内的异常情况、统计玩家在线时长等等。这样,游戏管理员就可以实时地监控和管理游戏,而不需要等到游戏结束才进行操作。
类似的,流处理还可以应用在其他实时性要求比较高的场景中,例如金融交易、物联网、实时监测等。通过对数据流进行实时处理,我们可以更加精准地掌握数据变化的情况,并及时做出反应和调整,

二、Spring Boot与Kafka的整合Demo

1. 新建springboot工程

如果你没有现成的Spring boot项目,那么我们可以使用IDEA自带的Spring Initializr 来创建一个spring-boot的项目

在这里插入图片描述

此时我们可以直接选择使用Apache Kafka,另外项目还可以加个Spring Web准备让前台调用

在这里插入图片描述

2. 添加Kafka依赖

如果你不是像上述一样新建的项目,那你也可以选择在已有的Spring Boot应用程序中使用Kafka,那么你需要在pom.xml文件中添加以下依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version>
</dependency>

3. 配置Kafka

在application.properties文件中添加以下配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group

这里我们指定了Kafka服务器的地址和端口,并配置了消费者组的ID,关于消费者组的概念,其实就是某一些消费者具备相同的功能,因此会把他们设为同一个消费者组,这样他们就不会重复消费同一条消息了。更具体地原理,我们会在之后地篇章中介绍。

4. 创建Kafka生产者

在Kafka中,生产者是发送消息的应用程序或服务。在Spring Boot中,我们可以使用KafkaTemplate类来创建Kafka生产者

package com.zhanfu.kafkademo.service;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("test_topic", message);}
}

这里我们使用@Autowired注解来自动注入KafkaTemplate,并使用send方法将消息发送到名为“test_topic”的Kafka主题中。


5. 创建Kafka消费者

在Kafka中,消费者是接收并处理订阅主题消息的应用程序或服务。在Spring Boot中,我们可以使用@KafkaListener注解来创建Kafka消费者。

package com.zhanfu.kafkademo.listener;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaLis {@KafkaListener(topics = "test_topic", groupId = "test_group")public void receiveMessage(String message) {System.out.println("Received message: " + message);}
}

6. 应用程序入口

现在我们已经完成了Spring Boot和Kafka的整合。我们可以启动Spring Boot应用程序,然后发送消息并消费它,以测试我们的应用程序是否正确地与Kafka集成。

package com.zhanfu.kafkademo.controller;import com.zhanfu.kafkademo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MessageController {@Autowiredprivate KafkaService kafkaService;@GetMapping("/send/{message}")public String sendMessage(@PathVariable String message) {kafkaService.sendMessage(message);return "Message sent successfully";}
}

在这个例子中,我们使用@Autowired注解来自动注入KafkaProducer,并通过发送消息的方法来调用sendMessage方法。最终项目整体框架如图:

在这里插入图片描述

三、启动与验证

首先自然是启动 Kafka ,怎么启动可参考 《上手第一关,手把手教你安装kafka与可视化工具kafka-eagle》,然后是启动我们的Spring Boot项目

在这里插入图片描述

然后在浏览器中输入

http://127.0.0.1:8080/send/hello

在这里插入图片描述

最后检查我们的项目日志:

在这里插入图片描述

可以看到,整个发送和接收的流程都走通了

四、KafkaTemplate 介绍

不难看出,在Springboot中,使用kafka的关键在于 KafkaTemplate, 它是 Spring 提供的 Kafka 生产者模版,用于向 Kafka 集群发送消息。并且把 Kafka 的生产者客户端封装成了一个 Spring Bean,提供更加方便易用的 API。

它有三个主要属性:

  • producerFactory:生产者工厂类,用于创建 KafkaProducer 实例。
  • defaultTopic:默认主题名称,如果在发送消息时没有指定主题名称,则使用该默认主题。
  • messageConverter:消息转换器,用于将消息对象转换为 Kafka ProducerRecord

它的主要方法:

  • send(ProducerRecord<K,V> record):向指定的 Kafka 主题发送一条消息。ProducerRecord 包含了主题名称、分区编号、Key 和 Value 等信息。
  • send(String topic, V data):向指定的 Kafka 主题发送一条消息。
  • send(String topic, K key, V data):向指定的 Kafka 主题发送一条消息,并指定消息的 Key。
  • execute(ProducerCallback<K,V> callback):使用回调方式发送消息,可以自定义消息的创建过程和错误处理过程。
  • inTransaction():启用事务,多个 send 方法调用将被包装在一个事务中,保证 Kafka 事务的原子性。

除了上述方法外,KafkaTemplate 还提供了其他方法,如 sendDefault()sendOffsetsToTransaction() 等,可以根据实际需要进行选择和使用。

需要注意的是,在使用 KafkaTemplate 发送消息时应该注意消息的序列化方式、主题和分区的选择以及错误处理等问题,以保证消息的可靠性和正确性。

当然,很多同学可能还注意到一个细节,我们在上面的Demo中,我们直接将其 @Autowired进我们的代码中,这是怎么做到的呢?换句话说,这个 KafkaTemplate 为什么自己就会被spring 容器管理的呢?其实这得益于SpringBoot中对Kafka有了很多自动配置的内容。如下:

在这里插入图片描述
在这里插入图片描述

如上图,相信对Spring Boot熟悉的同学看到 ConditionalOnClassConditionalOnMissingBean 应该就明白了。其实Spring Boot 早就贴心的为我们预留了这些自动配置,只要我们引入了 spring-kafka 包,使得项目中出现了 KafkaTemplate 类,那么它就能被自动配置并存入Spring 容器内

总结

今天我们通过一个Demo讲解了在SpringBoot中如何对接Kafka,也介绍了下关键类 KafkaTemplate ,得益于Spring Boot 的自动配置,开发者要做的配置内容其实并不多,使用也主要是依赖其提供的API,相对简单,相信大家很容易也都学会了,那么在后面的过程中,我们将继续学习其使用,并且会着重讲解 Kafka 的原理与结构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/97584.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

图论---最小生成树问题

在连通网的所有生成树中&#xff0c;所有边的代价和最小的生成树&#xff0c;称为最小生成树。解决最小生成树问题一般有两种算法&#xff1a;Kruskal算法和Prim算法。 Kruskal算法 原理&#xff1a;基本思想是从小到大加入边&#xff0c;是个贪心算法。我们将图中的每个边按…

JS进阶-原型

原型 原型就是一个对象&#xff0c;也称为原型对象 构造函数通过原型分配的函数是所有对象所共享的 JavaScript规定&#xff0c;每一个构造函数都有一个prototype属性&#xff0c;指向另一个对象&#xff0c;所以我们也称为原型对象 这个对象可以挂载函数&#xff0c;对象实…

不标准的 json 格式的字符串如何转为标准的(json字符串属性名不带双引号如何转

背景 不规范的 json 字符串例如 属性名不带双引号 {name:"abc"}属性名带单引号而不是双引号 {name:"abc"}属性值该用双引号的时候用了单引号 {"name":abc}还有一种情况就是以上情况的混合 所谓规范的json字串就是属性名要用双引号&#xf…

(二)Apache log4net™ 手册 - 配置

0、引言 在上一篇文章中我们简单介绍了 Log4Net 及其核心的三大组件。本文将在上一篇文章的基础上继续探讨与 Log4Net 配置相关的内容。 1、配置 将日志请求插入到应用程序代码中需要进行大量的计划和工作。观察表明&#xff0c;大约4%的代码专门用于日志记录。因此&#xf…

【Linux】信号屏蔽与信号捕捉的原理与实现(附图解与代码)

这一篇的篇幅可能有点长&#xff0c;如果已经了解了以下两个知识点的同学可以自行跳到第三部分——信号屏蔽的实现。 不太了解的同学希望你们能够静下心来看完&#xff0c;相信一定会有不小的收获。那么话不多说&#xff0c;我们这就开始啦&#xff01;&#xff01;&#xff0…

Linux网络存储:NFS

NSF 笔记&#xff1a; NFS是通过网络来进行服务器和客户端之间的数据传输的&#xff0c;我们大家都知道&#xff0c;要想通过网络进行传输&#xff0c;必须得知道是通过哪一个端口进行传输的&#xff01; NFS服务器对于端口的选择是随机的&#xff0c;那么问题在于&#xff0…

代码随想录算法训练营第四十六天 | 518. 零钱兑换 II、377. 组合总和 Ⅳ

518. 零钱兑换 II 视频讲解&#xff1a;动态规划之完全背包&#xff0c;装满背包有多少种方法&#xff1f;组合与排列有讲究&#xff01;| LeetCode&#xff1a;518.零钱兑换II_哔哩哔哩_bilibili 代码随想录 &#xff08;1&#xff09;代码 377. 组合总和 Ⅳ 视频讲解&…

JOSEF约瑟 闭锁继电器 LB-7 YDB-100 100V 50HZ 控制断路器的合闸或跳闸

闭锁继电器LB-7导轨安装名称:闭锁继电器型号:LB-7闭锁继电器额定电压100V功率消耗≤10VA触点容量220V1.5A40W返回系数≥0.8 LB-1A、LB-1D、DB-1、HBYB-102/D YDB-100、HLO、DB-100、LB-7型闭锁继电器 一、用途 LB-7型闭锁继电器(以下简称继电器)用于发电厂及变电所内高压母线…

MySQL报错:Row size too large (> 8126)

问题描述 1118 - Row size too large ( 8126). Changing some columns to TEXT or BLOB or using ROW_FORMATDYNAMIC or ROW_FORMATCOMPRESSED may help. ln current row format, BLOB prefix of 768 bytes is stored inline. 问题分析 InnoDB引擎建表时&#xff0c;如果最大行…

git stash

git stash 是 Git 中一个非常有用的命令&#xff0c;用于临时保存当前工作目录中的修改&#xff0c;以便你可以切换到其他分支或处理其他任务而不丢失你的修改。它的主要用途是&#xff1a; 保存未提交的修改&#xff1a;你可以使用 git stash 命令将未提交的修改&#xff08;包…

【计算机网络】网络编程接口 Socket API 解读(9)

Socket 是网络协议栈暴露给编程人员的 API&#xff0c;相比复杂的计算机网络协议&#xff0c;API 对关键操作和配置数据进行了抽象&#xff0c;简化了程序编程。 本文讲述的 socket 内容源自 Linux man。本文主要对各 API 进行详细介绍&#xff0c;从而更好的理解 socket 编程。…

Electron笔记

基础环境搭建 官网:https://www.electronjs.org/zh/ 这一套笔记根据这套视频而写的 创建项目 方式一: 官网点击GitHub往下拉找到快速入门就能看到下面这几个命令了 git clone https://github.com/electron/electron-quick-start //克隆项目 cd electron-quick-start //…

提取歌曲伴奏?用对软件一键帮你搞定~

相信大家经常想获取某首歌曲的伴奏&#xff0c;但是不知从何下手&#xff0c;今天这篇教程给大家分享一个超神奇软件&#xff0c;一键提取歌曲伴奏&#xff01; 第一步&#xff1a;打开【音分轨】APP&#xff0c;进入首页点击【人声分离】 第二步&#xff1a;选择导入方式&…

Java小技能:利用反射获取整个项目的枚举字典

文章目录 前言I JAVA反射获取所有枚举类1.1 遍历特定目录1.2 todosee also流水线docker 部署前言 需求:提供一个接口返回项目的所有枚举,提供给前端使用。 I JAVA反射获取所有枚举类 1.1 遍历特定目录 遍历特定目录的枚举,返回给前端使用 @GetMapping("findAnyEnum…

SpringBoot 中使用JPA

最近忙里偷闲&#xff0c;想写一点关于JPA的东西&#xff0c;另外也加深下对JPA的理解&#xff0c;才有了此篇博文。 一、JPA JPA &#xff08;Java Persistence API&#xff09;Java持久化API&#xff0c;是一套Sun公司Java官方制定的ORM 规范&#xff08;sun公司并没有实现…

建议收藏!混迹职场多年总结出的8大技巧!

1. 不要吃“哑巴”亏&#xff1a;不管在什么企业&#xff0c;一定要“会说话”&#xff0c;敢于表达自己&#xff0c;但是又兼顾身边人的感受&#xff0c;考虑好自己的言行将会带来的后果。良好的沟通技巧对于在职场中建立良好的人际关系和解决问题至关重要。学会倾听、表达和理…

C/C++ 排序算法总结

1.冒泡排序 https://blog.csdn.net/weixin_49303682/article/details/119365319 1 #include <stdio.h>2 3 #define N 94 5 void print(int a[])6 {7 for(int i 0; i < N; i)8 {9 printf("%d ", a[i]); 10 } 11 printf("…

为什么mac上有的软件删除不掉?

对于Mac用户来说&#xff0c;软件卸载通常是一个相对简单的过程。然而&#xff0c;有时你可能会发现某些软件似乎“顽固不化”&#xff0c;即使按照常规方式尝试卸载&#xff0c;也依然存在于你的电脑上。这到底是为什么呢&#xff1f;本文将探讨这一问题的可能原因。 1.卸载失…

C#制做一个 winform下的表情选择窗口

能力有限&#xff0c;别人可能都是通过其他方式实现的&#xff0c;我这里简单粗暴一些&#xff0c;直接通过点击按钮后弹出个新窗体来实现。 1、先在form1上增加一个toolstrip控件&#xff0c;再增加个toolstripbutton按钮&#xff0c;用来点击后弹出新窗体&#xff0c;如图&a…

【uniapp】小程序开发5:获取openid、获取手机号

一、获取小程序openid 需要配合后端接口获取授权码&#xff08;code&#xff09; 1&#xff09;调用uni.login()方法获取授权code&#xff0c;并把code传给getOpenid //使用uni.login的时候可以在任何接口下使用即可&#xff0c;主要看打印出来的code值和openid let that t…