使用SpringBoot对接Kafka

Kafka是什么,以及如何使用SpringBoot对接Kafka

一、Kafka与流处理

我们先来看看比较正式的介绍:Kafka是一种流处理平台,由LinkedIn公司创建,现在是Apache下的开源项目。Kafka通过发布/订阅机制实现消息的异步传输和处理。它具有高吞吐量、低延迟、可伸缩性和可靠性等优点,使其成为了流处理和实时数据管道的首选解决方案

介绍其实是比较清晰的,如果你是第一次接触“流处理”概念,我们也可以做一点解释,流处理指的是对连续、实时产生的数据流进行实时处理、计算和分析的过程。

假设你正在玩一款在线游戏,其他玩家的动作和游戏事件会实时地传到服务器上。这些事件就形成了一条数据流。在流处理中,我们会对这条数据流进行实时处理,例如计算每个玩家的分数、监控游戏区域内的异常情况、统计玩家在线时长等等。这样,游戏管理员就可以实时地监控和管理游戏,而不需要等到游戏结束才进行操作。
类似的,流处理还可以应用在其他实时性要求比较高的场景中,例如金融交易、物联网、实时监测等。通过对数据流进行实时处理,我们可以更加精准地掌握数据变化的情况,并及时做出反应和调整,

二、Spring Boot与Kafka的整合Demo

1. 新建springboot工程

如果你没有现成的Spring boot项目,那么我们可以使用IDEA自带的Spring Initializr 来创建一个spring-boot的项目

此时我们可以直接选择使用Apache Kafka,另外项目还可以加个Spring Web准备让前台调用

2. 添加Kafka依赖

如果你不是像上述一样新建的项目,那你也可以选择在已有的Spring Boot应用程序中使用Kafka,那么你需要在pom.xml文件中添加以下依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version>
</dependency>

3. 配置Kafka

在application.properties文件中添加以下配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group

这里我们指定了Kafka服务器的地址和端口,并配置了消费者组的ID,关于消费者组的概念,其实就是某一些消费者具备相同的功能,因此会把他们设为同一个消费者组,这样他们就不会重复消费同一条消息了。更具体地原理,我们会在之后地篇章中介绍。

4. 创建Kafka生产者

在Kafka中,生产者是发送消息的应用程序或服务。在Spring Boot中,我们可以使用KafkaTemplate类来创建Kafka生产者

package com.zhanfu.kafkademo.service;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("test_topic", message);}
}

这里我们使用@Autowired注解来自动注入KafkaTemplate,并使用send方法将消息发送到名为“test_topic”的Kafka主题中。

5. 创建Kafka消费者

在Kafka中,消费者是接收并处理订阅主题消息的应用程序或服务。在Spring Boot中,我们可以使用@KafkaListener注解来创建Kafka消费者。

package com.zhanfu.kafkademo.listener;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaLis {@KafkaListener(topics = "test_topic", groupId = "test_group")public void receiveMessage(String message) {System.out.println("Received message: " + message);}
}

6. 应用程序入口

现在我们已经完成了Spring Boot和Kafka的整合。我们可以启动Spring Boot应用程序,然后发送消息并消费它,以测试我们的应用程序是否正确地与Kafka集成。

package com.zhanfu.kafkademo.controller;import com.zhanfu.kafkademo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MessageController {@Autowiredprivate KafkaService kafkaService;@GetMapping("/send/{message}")public String sendMessage(@PathVariable String message) {kafkaService.sendMessage(message);return "Message sent successfully";}
}

在这个例子中,我们使用@Autowired注解来自动注入KafkaProducer,并通过发送消息的方法来调用sendMessage方法。最终项目整体框架如图:

三、启动与验证

首先自然是启动 Kafka ,然后是启动我们的Spring Boot项目

然后在浏览器中输入

http://127.0.0.1:8080/send/hello

最后检查我们的项目日志:

可以看到,整个发送和接收的流程都走通了

四、KafkaTemplate 介绍

不难看出,在Springboot中,使用kafka的关键在于 KafkaTemplate, 它是 Spring 提供的 Kafka 生产者模版,用于向 Kafka 集群发送消息。并且把 Kafka 的生产者客户端封装成了一个 Spring Bean,提供更加方便易用的 API。

它有三个主要属性:

        producerFactory:生产者工厂类,用于创建 KafkaProducer 实例。
        defaultTopic:默认主题名称,如果在发送消息时没有指定主题名称,则使用该默认主题。
        messageConverter:消息转换器,用于将消息对象转换为 Kafka ProducerRecord

它的主要方法:

        send(ProducerRecord<K,V> record):向指定的 Kafka 主题发送一条消息。ProducerRecord 包含了主题名称、分区编号、Key 和 Value 等信息。
        send(String topic, V data):向指定的 Kafka 主题发送一条消息。
        send(String topic, K key, V data):向指定的 Kafka 主题发送一条消息,并指定消息的 Key。
        execute(ProducerCallback<K,V> callback):使用回调方式发送消息,可以自定义消息的创建过程和错误处理过程。
        inTransaction():启用事务,多个 send 方法调用将被包装在一个事务中,保证 Kafka 事务的原子性。

除了上述方法外,KafkaTemplate 还提供了其他方法,如 sendDefault()sendOffsetsToTransaction() 等,可以根据实际需要进行选择和使用。

需要注意的是,在使用 KafkaTemplate 发送消息时应该注意消息的序列化方式、主题和分区的选择以及错误处理等问题,以保证消息的可靠性和正确性。

当然,很多同学可能还注意到一个细节,我们在上面的Demo中,我们直接将其 @Autowired进我们的代码中,这是怎么做到的呢?换句话说,这个 KafkaTemplate 为什么自己就会被spring 容器管理的呢?其实这得益于SpringBoot中对Kafka有了很多自动配置的内容。如下:

如上图,相信对Spring Boot熟悉的同学看到 ConditionalOnClass ConditionalOnMissingBean 应该就明白了。其实Spring Boot 早就贴心的为我们预留了这些自动配置,只要我们引入了 spring-kafka 包,使得项目中出现了 KafkaTemplate 类,那么它就能被自动配置并存入Spring 容器内

总结
        今天我们通过一个Demo讲解了在SpringBoot中如何对接Kafka,也介绍了下关键类 KafkaTemplate ,得益于Spring Boot 的自动配置,开发者要做的配置内容其实并不多,使用也主要是依赖其提供的API,相对简单,相信大家很容易也都学会了,那么在后面的过程中,我们将继续学习其使用,并且会着重讲解 Kafka 的原理与结构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/29088.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JS是什么

JS 是 JavaScript 的缩写&#xff0c;它是一种轻量级的、解释型的或即时编译型的编程语言。JavaScript 主要被用于在网页上实现动态的、交互式的功能。它可以直接嵌入到HTML网页中&#xff0c;也可以通过外部文件来链接。 JavaScript 有以下主要特性和用途&#xff1a; 动态交…

JAVA动态表达式:反向解析表达式

接上面&#xff1a; JAVA动态表达式&#xff1a;Antlr4 G4 模板 读取字符串表达式结构树-CSDN博客 JAVA动态表达式&#xff1a;Antlr4 表达式树解析-CSDN博客 JAVA动态表达式&#xff1a;Antlr4 G4模板-CSDN博客 上面的内容是实现了表达式转行成类。 如&#xff1a;proc…

【保姆级】Linux 基于 Docker 部署 ES7.7.0 elasticsearch7.7.0

一、拉取 ES 镜像 docker pull elasticsearch:7.7.0二、创建挂载目录并授权 # 挂载目录 mkdir ~/elasticsearch mkdir ~/elasticsearch/config mkdir ~/elasticsearch/data mkdir ~/elasticsearch/plugins# 赋予权限 sudo chmod -R 777 &#xff5e;/elasticsearch/ sudo chm…

组件二次封装,通过属性事件透传,插槽使用,组件实例方法的绑定,深入理解 Vue.js 组件扩展与插槽

透传&#xff0c;插槽&#xff0c;组件实例方法的绑定&#xff0c;深入理解 Vue.js 组件扩展与插槽 前言 Vue.js 提供了强大的组件化系统&#xff0c;允许开发者构建可复用、可组合的UI组件。在实际项目中&#xff0c;直接使用第三方库提供的基础组件&#xff08;如Element UI…

Internet Download Manager(IDM6.41)软件下载-详细安装教程视频

Internet Download Manager有一个智能下载逻辑加速器&#xff0c;具有智能动态文件分割和安全的多部分下载技术&#xff0c;可以加速下载。与其他下载加速器和管理器不同&#xff0c;Internet下载管理器在下载开始之前对文件进行分段&#xff0c;而Internet下载管理器在下载过程…

用TensorRT-LLM进行LLama的推理和部署

Deploy an AI Coding Assistant with NVIDIA TensorRT-LLM and NVIDIA Triton | NVIDIA Technical BlogQuick Start Guide — tensorrt_llm documentation (nvidia.github.io) 使用TensorRT-LLM的源码&#xff0c;来下载docker并在docker里编译TensorRT-LLM&#xff1b; 模型…

模拟14位相机输出输入到bram Verilog代码

1 模拟输出代码 `timescale 1ns / 1psmodule simulate_camera_out (input clk,input rest_n,output camera_clk, //像素时钟output [13:0] camera_data, //像素值数据output [19:0] pixel_xy, //此时输出的像素值坐标output reg frame_valid //帧有效信号,1代表帧有效0代…

细说MCU定时器中断的实现方法

目录 一、硬件及工程 二、STM32G4系列MCU的定时器 三、定时器中断的实现过程 1、配置新工程.ioc 2、代码修改 &#xff08;1&#xff09;时钟初始化函数MX_TIM3_Init() &#xff08;2&#xff09;使能定时器中断 &#xff08;3&#xff09;定时器中断服务函数 &#…

从混乱到秩序:数据提取与治理的重要性

&#x1f525;从混乱到秩序&#xff1a;数据提取与治理的重要性&#x1f525; &#x1f4ca; 大家好&#xff0c;今天我们来聊聊一个在我们生活中无处不在&#xff0c;但又经常被忽视的话题——#数据提取与治理#。 &#x1f4ad; 想象一下&#xff0c;你走进一个满是杂物的房…

Android Calculator2源码分析与修改

private CalculatorDisplay mDisplay; private Symbols mSymbols new Symbols(); -41,6 44,7 class Logic { private int mLineLength 0; private static final String INFINITY_UNICODE “\u221e”; private static final String ZMS_NUMBER “55555”; public stat…

Linux构建本地时间同步ntp

环境介绍&#xff1a; 主机名 IP地址 系统发行版 环境 Node01 192.168.100.102 Centos 7.4 可联网、已关闭防火墙selinux Node02 192.168.100.103 Centos 7.4 已关闭防火墙selinux 1.主节点同步阿里云标准时间 在保证连接外网的情况下&#xff0c;同步阿里服务器的…

C#面:构造函数是否能被重写?

构造函数不能被重写。 构造函数是用于创建对象时初始化对象的特殊方法&#xff0c;它的名称与类名相同&#xff0c;并且没有返回类型。在派生类中&#xff0c;可以使用基类的构造函数来初始化继承的成员&#xff0c;但不能重写基类的构造函数。派生类可以定义自己的构造函数来…

安卓gradel下载失败解决方案

安卓gradel下载失败解决方案 直接下载下来放到指定目录下 直接下载下来放到指定目录下 目录&#xff1a;C:\Users\Administrator.gradle\wrapper\dists&#xff08;可以直接在C盘搜索.gradle文件夹&#xff09; 找到后&#xff0c;将gradle*.zip放到与.zip.lck和.zip.ok同级的…

nodejs日志:morgan和winston篇

文章目录 介绍一下morgan和winston的区别morgan功能优势&#xff1a; winston功能优势 选择 代码示例 介绍一下morgan和winston的区别 morgan 功能 morgan 是一个 HTTP 请求日志中间件&#xff0c;专门用于记录 Express 应用的 HTTP 请求日志。它简单易用&#xff0c;提供多…

Spring的SmartLifecycle可以没用过,但没听过就不好了! - 第517篇

历史文章&#xff08;文章累计500&#xff09; 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 《…

three.js开发3D地图记录(一)

关键代码部分&#xff1a; <template><div class"center-map-box" id"contant"></div> </template><script> import * as THREE from "three"; import { OrbitControls } from "three/examples/jsm/control…

springboot小型超市商品展销系统-计算机毕业设计源码01635

摘 要 科技进步的飞速发展引起人们日常生活的巨大变化&#xff0c;电子信息技术的飞速发展使得电子信息技术的各个领域的应用水平得到普及和应用。信息时代的到来已成为不可阻挡的时尚潮流&#xff0c;人类发展的历史正进入一个新时代。在现实运用中&#xff0c;应用软件的工作…

EIQ-ABC 分析法在配送中心储位分配中的应用

配送中心运作效率的高低主要取决于仓储业务流程的作业效率&#xff0c;在配送作业流程中&#xff0c;储位分配的是否合理性成为影响配送运作效率的重要因素。为实现储位的合理分配&#xff0c;提出通过对订单信息的分析&#xff0c;并应用 EIQ-ABC 分析法&#xff0c;以此实现缩…

白酒:茅台镇白酒的品牌合作与跨界营销案例

云仓酒庄豪迈白酒&#xff0c;作为茅台镇的知名品牌&#xff0c;在品牌合作与跨界营销方面也有着杰出的表现。通过与不同领域品牌的合作&#xff0c;豪迈白酒进一步拓宽了市场渠道&#xff0c;提升了品牌曝光度和影响力。 首先&#xff0c;云仓酒庄豪迈白酒与品质餐产品牌的合作…

解析文字示例

import pandas as pdtext f""" “时筱怎么也没想到&#xff0c;一觉睡醒&#xff0c;她竟然回到末世前&#xff0c;回到一切都还没有发生的时候&#xff01;“上辈子&#xff0c;父母在末世一开始就没了&#xff0c;本以为相互扶持的未婚夫早就和表妹搞在一起&…