Apache Kafka与Spring整合应用详解

引言

Apache Kafka是一种高吞吐量的分布式消息系统,广泛应用于实时数据处理、日志聚合和事件驱动架构中。Spring作为Java开发的主流框架,通过Spring Kafka项目提供了对Kafka的集成支持。本文将深入探讨如何使用Spring Kafka整合Apache Kafka,并通过详细的代码示例帮助新人理解和掌握这一技术。

环境准备

在开始之前,请确保你已经安装并配置好了以下环境:

  1. Apache Kafka集群
  2. Java JDK 8或更高版本
  3. Maven或Gradle构建工具
  4. Spring Boot 2.3.0或更高版本

项目依赖配置

首先,我们需要在pom.xml中添加Spring Kafka的依赖。

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
</dependencies>

Kafka配置

在Spring Boot应用中,我们需要在application.properties中配置Kafka的相关信息。

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest

生产者配置与实现

生产者用于将消息发送到Kafka主题中。我们首先定义一个配置类来配置Kafka生产者。

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaProducerConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}

接着,我们创建一个生产者服务类,用于发送消息。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {private static final String TOPIC = "my_topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send(TOPIC, message);}
}

消费者配置与实现

消费者用于从Kafka主题中读取消息。我们也需要定义一个配置类来配置Kafka消费者。

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;@EnableKafka
@Configuration
public class KafkaConsumerConfig {@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}
}

接着,我们创建一个消费者服务类,用于接收消息。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumerService {@KafkaListener(topics = "my_topic", groupId = "my-group")public void consume(String message) {System.out.println("Consumed message: " + message);}
}

控制器实现

为了测试我们的Kafka生产者和消费者,我们可以创建一个简单的Spring Boot控制器。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaProducerService producerService;@GetMapping("/send")public String sendMessage(@RequestParam("message") String message) {producerService.sendMessage(message);return "Message sent to Kafka topic: " + message;}
}

运行应用

启动Spring Boot应用,打开浏览器,访问http://localhost:8080/send?message=HelloKafka。你应该会看到控制台输出:

Consumed message: HelloKafka

总结

本文详细介绍了如何使用Spring Kafka整合Apache Kafka,包括项目依赖配置、Kafka配置、生产者与消费者的实现以及简单的测试控制器。通过这些示例代码,新人可以快速上手,并且深入理解Spring与Kafka的集成方式。希望本文对你有所帮助,祝你在Java开发的路上越来越顺利!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/856055.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

8.每日LeetCode-笔试题,交替打印数字和字母

问题描述 ​​​交替打印数字和字母 使用两个 goroutine 交替打印序列&#xff0c;一个 goroutine 打印数字&#xff0c; 另外一个 goroutine 打印字母&#xff0c; 最终效果如下&#xff1a; 12AB34CD56EF78GH910IJ1112KL1314MN1516OP1718QR1920ST2122UV2324WX2526YZ2728 解题…

Apple - Authorization Services Programming Guide

本文翻译整理自&#xff1a;Authorization Services Programming Guide&#xff08;更新日期&#xff1a;2011-10-19 https://developer.apple.com/library/archive/documentation/Security/Conceptual/authorization_concepts/01introduction/introduction.html#//apple_ref/d…

JavaScript基础部分知识点总结(Part3)

函数的概念 1. 函数的概念 在JS 里面&#xff0c;可能会定义非常多的相同代码或者功能相似的代码&#xff0c;这些代码可能需要大量重复使用。虽然for循环语句也能实现一些简单的重复操作&#xff0c;但是比较具有局限性&#xff0c;此时我们就可以使用JS 中的函数。函数&…

.net 框架基础

总目录 C# 语法总目录 .net 框架基础 字符、字符串1.字符1.1 字段1.2 静态方法 2. 字符串2.1 null和空字符串2.2 访问字符串中的字符2.3 搜索字符串内的字段2.4 字符串的修改方法2.5 字符串格式初体验2.6 字符串的顺序比较 3. StringBuilder类 日期和时间1. TimeSpan2. DateTim…

leetcode189 轮转数组

题目 给定一个整数数组 nums&#xff0c;将数组中的元素向右轮转 k 个位置&#xff0c;其中 k 是非负数。 示例 输入: nums [1,2,3,4,5,6,7], k 3 输出: [5,6,7,1,2,3,4] 解释: 向右轮转 1 步: [7,1,2,3,4,5,6] 向右轮转 2 步: [6,7,1,2,3,4,5] 向右轮转 3 步: [5,6,7,1,2…

Java面试——认证与授权

X、常见面试题汇总 1、Shiro与SpringSecutity对比 1&#xff09;Shiro的特点&#xff1a; Shiro 是 Apache 下的项目&#xff0c;相对简单、轻巧&#xff0c;更容易上手使用。 Shiro 权限功能基本都能满足&#xff0c;单点登录都可以实现。且不用与任何的框架或者容器绑定, 可…

Tensorflow入门实战 T05-运动鞋识别

目录 一、完整代码 二、训练过程 &#xff08;1&#xff09;打印2行10列的数据。 &#xff08;2&#xff09;查看数据集中的一张图片 &#xff08;3&#xff09;训练过程&#xff08;训练50个epoch&#xff09; &#xff08;4&#xff09;训练结果的精确度 三、遇到的问…

安装VSCode创建注册表出错,RegCreateKey错误码5

今天对VSCode进行做更新安装&#xff0c;谁知道安装到最后弹出下面这么个错误 找到windows下管用的一种解决办法&#xff1a; winR打开运行&#xff0c;输入 regedit找到错误提示中的路径&#xff0c;HKEY_CURRENT_USER\Software\Classes\VSCode.class\open (图中的错误注册表…

音乐圈的颠覆与挑战——创意产业在AI时代的生存之道

最近的一个月&#xff0c;音乐界经历了一场前所未有的变革。一系列音乐大模型轮番上线&#xff0c;它们以惊人的速度和准确性模仿并创作音乐&#xff0c;将素人生产音乐的门槛降到了最低。然而&#xff0c;这种技术的崛起也引发了关于音乐圈是否会被AI彻底颠覆的讨论。  这些…

第 三 方 组 件 e l e m e n t - u i[Vue]

一、组件之间的传值 组件可以由内部的Data提供数据&#xff0c;也可以由父组件通过prop的方式传值。 兄弟组件之间可以通过Vuex等统一数据源提供数据共享 第一种 Movie.vue <template><div><h1>我才不要和你做朋友</h1></div></template&…

嵌入式开发十九:SysTick—系统定时器

在前面实验中我们使用到的延时都是通过SysTick进行延时的。 我们知道&#xff0c;延时有两种方式&#xff1a;软件延时&#xff0c;即CPU 循环等待产生的&#xff0c;这个延时是不精确的。第二种就是滴答定时器延时&#xff0c;本篇博客就来介绍 STM32F4 内部 SysTick 系统定时…

高德地图轨迹回放/轨迹播放

前言 本篇文章主要介绍高德地图的轨迹回放或播放的实现过程&#xff0c;是基于vue2实现的功能&#xff0c;同时做一些改动也是能够适配vue3的。其中播放条是用的是element UI中的el-slider组件&#xff0c;包括使用到的图标也是element UI自带的。可以实现轨迹的播放、暂停、停…

【windows|004】BIOS 介绍及不同品牌电脑和服务器进入BIOS设置的方法

&#x1f341;博主简介&#xff1a; &#x1f3c5;云计算领域优质创作者 &#x1f3c5;2022年CSDN新星计划python赛道第一名 &#x1f3c5;2022年CSDN原力计划优质作者 ​ &#x1f3c5;阿里云ACE认证高级工程师 ​ &#x1f3c5;阿里云开发者社区专家博主 &#x1f48a;交流社…

【ARM】如何通过Keil MDK查看芯片的硬件信息

【更多软件使用问题请点击亿道电子官方网站】 1、文档目标&#xff1a; 解决在开发过程中对于开发项目所使用的的芯片的参数查看的问题 2、问题场景&#xff1a; 在项目开发过程中&#xff0c;经常需要对于芯片的时钟、寄存器或者一些硬件参数需要进行确认。大多数情况下是需…

wps-文档-js宏-批量修改表格格式

目录 前言开启JS宏我的脚本参考API文档 前言 由于需要修改word的表格的格式&#xff0c;一个一个的修改太慢了&#xff0c;所以需要通过宏的方式来修改&#xff0c;需要注意的是低版本可能没有JS宏… 开启JS宏 切换到工具–>点击开发工具 点击之后功能栏会变化成这样 选…

21、架构-持久化存储

1、Kubernetes存储设计 Kubernetes在存储设计上秉承声明式API和资源抽象的理念&#xff0c;用户通过声明存储需求&#xff0c;Kubernetes负责调度和管理实际的存储资源。以下是Kubernetes存储设计中的核心概念和机制。 Mount和Volume 在Kubernetes中&#xff0c;Volume和Moun…

渗透测试基础(二) Linux+Win常用命令介绍

1. Linux常用命令 1.1 解压缩相关 1.1.1 tar命令 解包&#xff1a;tar zxvf FileName.tar 打包&#xff1a;tar czvf FileName.tar DirName1.1.2 gz命令 对于.gz格式的解压1&#xff1a;gunzip FileName.gz解压2&#xff1a;gzip -d FileName.gz压缩&#xff1a;gzip FileN…

HJ39判断两个IP是否属于同一子网

提示&#xff1a;文章 文章目录 前言一、背景二、 2.1 2.2 总结 前言 HJ39判断两个IP是否属于同一子网 一、 代码&#xff1a; 第一版代码没有对掩码网络号进行处理。一开始对非法字段的理解就是value大于255。然后执行示例&#xff0c; 254.255.0.0 85.122.52.249 10.57.…

Dell戴尔灵越Inspiron 16 Plus 7640/7630笔记本电脑原装Windows11下载,恢复出厂开箱状态预装OEM系统

灵越16P-7630系统包: 链接&#xff1a;https://pan.baidu.com/s/1Rve5_PF1VO8kAKnAQwP22g?pwdjyqq 提取码&#xff1a;jyqq 灵越16P-7640系统包: 链接&#xff1a;https://pan.baidu.com/s/1B8LeIEKM8IF1xbpMVjy3qg?pwdy9qj 提取码&#xff1a;y9qj 戴尔原装WIN11系…

如何优雅的一键适配Ubuntu20.04的OpenHarmony环境?请关注【itopen:openharmony_env_init】...

itopen组织&#xff1a;1、提供OpenHarmony优雅实用的小工具2、手把手适配riscv qemu linux的三方库移植3、未来计划riscv qemu ohos的三方库移植 小程序开发4、一切拥抱开源&#xff0c;拥抱国产化 一、概述 本工程的作用主要是基于Ubuntu20.04版本一键自动初始化Ubunt…