Kafka是什么,以及如何使用SpringBoot对接Kafka

系列文章目录

上手第一关,手把手教你安装kafka与可视化工具kafka-eagle



在这里插入图片描述
继上一次教大家手把手安装kafka后,今天我们直接来到入门实操教程,也就是使用SpringBoot该怎么对接和使用kafka。当然,在一开始我们也会比较细致的介绍一下kafka本身。那么话不多说,马上开始今天的学习吧

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 kafka 专栏,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis dubbo docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待

一、Kafka与流处理

我们先来看看比较正式的介绍:Kafka是一种流处理平台,由LinkedIn公司创建,现在是Apache下的开源项目。Kafka通过发布/订阅机制实现消息的异步传输和处理。它具有高吞吐量、低延迟、可伸缩性和可靠性等优点,使其成为了流处理和实时数据管道的首选解决方案

介绍其实是比较清晰的,如果你是第一次接触“流处理”概念,我们也可以做一点解释,流处理指的是对连续、实时产生的数据流进行实时处理、计算和分析的过程。

假设你正在玩一款在线游戏,其他玩家的动作和游戏事件会实时地传到服务器上。这些事件就形成了一条数据流。在流处理中,我们会对这条数据流进行实时处理,例如计算每个玩家的分数、监控游戏区域内的异常情况、统计玩家在线时长等等。这样,游戏管理员就可以实时地监控和管理游戏,而不需要等到游戏结束才进行操作。
类似的,流处理还可以应用在其他实时性要求比较高的场景中,例如金融交易、物联网、实时监测等。通过对数据流进行实时处理,我们可以更加精准地掌握数据变化的情况,并及时做出反应和调整,

二、Spring Boot与Kafka的整合Demo

1. 新建springboot工程

如果你没有现成的Spring boot项目,那么我们可以使用IDEA自带的Spring Initializr 来创建一个spring-boot的项目

在这里插入图片描述

此时我们可以直接选择使用Apache Kafka,另外项目还可以加个Spring Web准备让前台调用

在这里插入图片描述

2. 添加Kafka依赖

如果你不是像上述一样新建的项目,那你也可以选择在已有的Spring Boot应用程序中使用Kafka,那么你需要在pom.xml文件中添加以下依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version>
</dependency>

3. 配置Kafka

在application.properties文件中添加以下配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group

这里我们指定了Kafka服务器的地址和端口,并配置了消费者组的ID,关于消费者组的概念,其实就是某一些消费者具备相同的功能,因此会把他们设为同一个消费者组,这样他们就不会重复消费同一条消息了。更具体地原理,我们会在之后地篇章中介绍。

4. 创建Kafka生产者

在Kafka中,生产者是发送消息的应用程序或服务。在Spring Boot中,我们可以使用KafkaTemplate类来创建Kafka生产者

package com.zhanfu.kafkademo.service;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("test_topic", message);}
}

这里我们使用@Autowired注解来自动注入KafkaTemplate,并使用send方法将消息发送到名为“test_topic”的Kafka主题中。


5. 创建Kafka消费者

在Kafka中,消费者是接收并处理订阅主题消息的应用程序或服务。在Spring Boot中,我们可以使用@KafkaListener注解来创建Kafka消费者。

package com.zhanfu.kafkademo.listener;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaLis {@KafkaListener(topics = "test_topic", groupId = "test_group")public void receiveMessage(String message) {System.out.println("Received message: " + message);}
}

6. 应用程序入口

现在我们已经完成了Spring Boot和Kafka的整合。我们可以启动Spring Boot应用程序,然后发送消息并消费它,以测试我们的应用程序是否正确地与Kafka集成。

package com.zhanfu.kafkademo.controller;import com.zhanfu.kafkademo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MessageController {@Autowiredprivate KafkaService kafkaService;@GetMapping("/send/{message}")public String sendMessage(@PathVariable String message) {kafkaService.sendMessage(message);return "Message sent successfully";}
}

在这个例子中,我们使用@Autowired注解来自动注入KafkaProducer,并通过发送消息的方法来调用sendMessage方法。最终项目整体框架如图:

在这里插入图片描述

三、启动与验证

首先自然是启动 Kafka ,怎么启动可参考 《上手第一关,手把手教你安装kafka与可视化工具kafka-eagle》,然后是启动我们的Spring Boot项目

在这里插入图片描述

然后在浏览器中输入

http://127.0.0.1:8080/send/hello

在这里插入图片描述

最后检查我们的项目日志:

在这里插入图片描述

可以看到,整个发送和接收的流程都走通了

四、KafkaTemplate 介绍

不难看出,在Springboot中,使用kafka的关键在于 KafkaTemplate, 它是 Spring 提供的 Kafka 生产者模版,用于向 Kafka 集群发送消息。并且把 Kafka 的生产者客户端封装成了一个 Spring Bean,提供更加方便易用的 API。

它有三个主要属性:

  • producerFactory:生产者工厂类,用于创建 KafkaProducer 实例。
  • defaultTopic:默认主题名称,如果在发送消息时没有指定主题名称,则使用该默认主题。
  • messageConverter:消息转换器,用于将消息对象转换为 Kafka ProducerRecord

它的主要方法:

  • send(ProducerRecord<K,V> record):向指定的 Kafka 主题发送一条消息。ProducerRecord 包含了主题名称、分区编号、Key 和 Value 等信息。
  • send(String topic, V data):向指定的 Kafka 主题发送一条消息。
  • send(String topic, K key, V data):向指定的 Kafka 主题发送一条消息,并指定消息的 Key。
  • execute(ProducerCallback<K,V> callback):使用回调方式发送消息,可以自定义消息的创建过程和错误处理过程。
  • inTransaction():启用事务,多个 send 方法调用将被包装在一个事务中,保证 Kafka 事务的原子性。

除了上述方法外,KafkaTemplate 还提供了其他方法,如 sendDefault()sendOffsetsToTransaction() 等,可以根据实际需要进行选择和使用。

需要注意的是,在使用 KafkaTemplate 发送消息时应该注意消息的序列化方式、主题和分区的选择以及错误处理等问题,以保证消息的可靠性和正确性。

当然,很多同学可能还注意到一个细节,我们在上面的Demo中,我们直接将其 @Autowired进我们的代码中,这是怎么做到的呢?换句话说,这个 KafkaTemplate 为什么自己就会被spring 容器管理的呢?其实这得益于SpringBoot中对Kafka有了很多自动配置的内容。如下:

在这里插入图片描述
在这里插入图片描述

如上图,相信对Spring Boot熟悉的同学看到 ConditionalOnClassConditionalOnMissingBean 应该就明白了。其实Spring Boot 早就贴心的为我们预留了这些自动配置,只要我们引入了 spring-kafka 包,使得项目中出现了 KafkaTemplate 类,那么它就能被自动配置并存入Spring 容器内

总结

今天我们通过一个Demo讲解了在SpringBoot中如何对接Kafka,也介绍了下关键类 KafkaTemplate ,得益于Spring Boot 的自动配置,开发者要做的配置内容其实并不多,使用也主要是依赖其提供的API,相对简单,相信大家很容易也都学会了,那么在后面的过程中,我们将继续学习其使用,并且会着重讲解 Kafka 的原理与结构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/97584.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JS进阶-原型

原型 原型就是一个对象&#xff0c;也称为原型对象 构造函数通过原型分配的函数是所有对象所共享的 JavaScript规定&#xff0c;每一个构造函数都有一个prototype属性&#xff0c;指向另一个对象&#xff0c;所以我们也称为原型对象 这个对象可以挂载函数&#xff0c;对象实…

不标准的 json 格式的字符串如何转为标准的(json字符串属性名不带双引号如何转

背景 不规范的 json 字符串例如 属性名不带双引号 {name:"abc"}属性名带单引号而不是双引号 {name:"abc"}属性值该用双引号的时候用了单引号 {"name":abc}还有一种情况就是以上情况的混合 所谓规范的json字串就是属性名要用双引号&#xf…

【Linux】信号屏蔽与信号捕捉的原理与实现(附图解与代码)

这一篇的篇幅可能有点长&#xff0c;如果已经了解了以下两个知识点的同学可以自行跳到第三部分——信号屏蔽的实现。 不太了解的同学希望你们能够静下心来看完&#xff0c;相信一定会有不小的收获。那么话不多说&#xff0c;我们这就开始啦&#xff01;&#xff01;&#xff0…

代码随想录算法训练营第四十六天 | 518. 零钱兑换 II、377. 组合总和 Ⅳ

518. 零钱兑换 II 视频讲解&#xff1a;动态规划之完全背包&#xff0c;装满背包有多少种方法&#xff1f;组合与排列有讲究&#xff01;| LeetCode&#xff1a;518.零钱兑换II_哔哩哔哩_bilibili 代码随想录 &#xff08;1&#xff09;代码 377. 组合总和 Ⅳ 视频讲解&…

JOSEF约瑟 闭锁继电器 LB-7 YDB-100 100V 50HZ 控制断路器的合闸或跳闸

闭锁继电器LB-7导轨安装名称:闭锁继电器型号:LB-7闭锁继电器额定电压100V功率消耗≤10VA触点容量220V1.5A40W返回系数≥0.8 LB-1A、LB-1D、DB-1、HBYB-102/D YDB-100、HLO、DB-100、LB-7型闭锁继电器 一、用途 LB-7型闭锁继电器(以下简称继电器)用于发电厂及变电所内高压母线…

Electron笔记

基础环境搭建 官网:https://www.electronjs.org/zh/ 这一套笔记根据这套视频而写的 创建项目 方式一: 官网点击GitHub往下拉找到快速入门就能看到下面这几个命令了 git clone https://github.com/electron/electron-quick-start //克隆项目 cd electron-quick-start //…

提取歌曲伴奏?用对软件一键帮你搞定~

相信大家经常想获取某首歌曲的伴奏&#xff0c;但是不知从何下手&#xff0c;今天这篇教程给大家分享一个超神奇软件&#xff0c;一键提取歌曲伴奏&#xff01; 第一步&#xff1a;打开【音分轨】APP&#xff0c;进入首页点击【人声分离】 第二步&#xff1a;选择导入方式&…

SpringBoot 中使用JPA

最近忙里偷闲&#xff0c;想写一点关于JPA的东西&#xff0c;另外也加深下对JPA的理解&#xff0c;才有了此篇博文。 一、JPA JPA &#xff08;Java Persistence API&#xff09;Java持久化API&#xff0c;是一套Sun公司Java官方制定的ORM 规范&#xff08;sun公司并没有实现…

为什么mac上有的软件删除不掉?

对于Mac用户来说&#xff0c;软件卸载通常是一个相对简单的过程。然而&#xff0c;有时你可能会发现某些软件似乎“顽固不化”&#xff0c;即使按照常规方式尝试卸载&#xff0c;也依然存在于你的电脑上。这到底是为什么呢&#xff1f;本文将探讨这一问题的可能原因。 1.卸载失…

C#制做一个 winform下的表情选择窗口

能力有限&#xff0c;别人可能都是通过其他方式实现的&#xff0c;我这里简单粗暴一些&#xff0c;直接通过点击按钮后弹出个新窗体来实现。 1、先在form1上增加一个toolstrip控件&#xff0c;再增加个toolstripbutton按钮&#xff0c;用来点击后弹出新窗体&#xff0c;如图&a…

智能井盖传感器:城市安全卫士

随着城市人口的不断增加和城市基础设施的不断发展&#xff0c;井盖作为城市道路和排水系统的重要组成部分&#xff0c;承担着确保城市安全和便利性的关键角色。然而&#xff0c;井盖在日常使用中常常面临倾斜、水浸和翻转等问题&#xff0c;这些问题可能导致交通阻塞、行人坠井…

小谈设计模式(20)—组合模式

小谈设计模式&#xff08;20&#xff09;—组合模式 专栏介绍专栏地址专栏介绍 组合模式对象类型叶节点组合节点 核心思想应用场景123 结构图结构图分析 Java语言实现首先&#xff0c;我们需要定义一个抽象的组件类 Component&#xff0c;它包含了组合节点和叶节点的公共操作&a…

Windows配置ADB工具

一、目的 在进行嵌入式开发时&#xff0c;我们经常使用ADB工具登录到开发板上进行命令操作&#xff0c;本篇我们介绍如何在windows平台配置ADB环境。 二、实战 1.下载adb工具包​​​​​​​https://developer.android.com/studio/releases/platform-tools?hlzh-cnhttps://d…

任务工单发送失败重试方案设计

需求背景&#xff1a; 该系统为一个工单系统&#xff0c;其中任务工单为该系统中的一个模块&#xff1b;任务工单它是需要周期性调度的一种任务类型&#xff1b;可以按照用户配置的时间周期定时性触发的。由于任务需要发送到对应的工作人员上&#xff0c;所以这里需要先对员工进…

DM宣传单制作,利用在线模板,快速替换文字

如果你需要制作一批宣传单&#xff0c;但是时间很紧&#xff0c;而且没有专业的设计人员协助&#xff0c;那么你可以选择使用在线模板来快速制作宣传单。本文将介绍如何使用乔拓云平台&#xff0c;快速制作宣传单的方法。 步骤一&#xff1a;选择适合的在线制作工具 首先&…

Leetcode hot 100之前缀和、差分数组、位运算

目录 差分数组-区间增减 和为K的子数组&#xff1a;前缀和 哈希表优化 除自身以外数组的乘积&#xff1a;前后缀区间 位运算 异或&#xff1a;同为0&#xff0c;不同为1 136. 只出现一次的数字&#xff1a;除了某个元素只出现一次以外&#xff0c;其余每个元素均出现2次…

【Unity3D编辑器开发】Unity3D编辑器开发基础性框架结构【全面总结】

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址我的个人博客 大家好&#xff0c;我是佛系工程师☆恬静的小魔龙☆&#xff0c;不定时更新Unity开发技巧&#xff0c;觉得有用记得一键三连哦。 一、前言 嗨&#xff0c;大家好&#xff0c;我是恬静的小魔龙。 同学们…

Spring MVC 中的国际化和本地化

Spring MVC 中的国际化和本地化 国际化&#xff08;Internationalization&#xff0c;简称i18n&#xff09;和本地化&#xff08;Localization&#xff0c;简称l10n&#xff09;是构建多语言应用程序的重要概念。Spring MVC提供了丰富的支持&#xff0c;使开发人员能够轻松地处…

Spark基础

一、spark基础 1、为什么使用Spark Ⅰ、MapReduce编程模型的局限性 (1) 繁杂 只有Map和Reduce两个操作&#xff0c;复杂的逻辑需要大量的样板代码 (2) 处理效率低 Map中间结果写磁盘&#xff0c;Reduce写HDFS&#xff0c;多个Map通过HDFS交换数据 任务调度与启动开销大 (…

前后端通信到底是怎样一个过程

前后端通信是怎样 前言&#xff1a;Http协议 超文本传输协议 规定&#xff1a;每一次前后端通信&#xff0c;前端需要主动向后端发出请求&#xff0c;后端接收到前端的请求后&#xff0c;可以给出响应 1、Http报文 浏览器向服务器发送请求时&#xff0c;请求本身就是信息&…