springboot引入kafka

一. Kafka 简介

什么是 Kafka?
Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,并于 2011 年开源。它用于构建实时数据管道和流应用,能够处理和分析流数据。Kafka 的核心概念
Producer(生产者):发送消息到 Kafka 主题。
Consumer(消费者):从 Kafka 主题中读取消息。
Broker(代理):Kafka 服务器,负责接收和存储消息。
Topic(主题):消息分类的逻辑单元。
Partition(分区):主题的物理分区,便于并行处理。
Offset(偏移量):每条消息在分区中的唯一标识符。

二、Spring Boot 2.7.0 集成 Kafka

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
</dependencies>
spring:kafka:bootstrap-servers: localhost:9092consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: default-group  # 默认的消费者组consumer-group-1:group-id: group1topic: topic1consumer-group-2:group-id: group2topic: topic2producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializertemplate:default-topic: my-topic

Kafka 配置类

 import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import java.util.HashMap;
import java.util.Map;@EnableKafka
@Configuration
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${spring.kafka.consumer.value-deserializer}")private String valueDeserializer;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Bean(name = "kafkaListenerContainerFactoryGroup1")public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryGroup1(@Value("${spring.kafka.consumer-group-1.group-id}") String groupId) {return createFactory(groupId);}@Bean(name = "kafkaListenerContainerFactoryGroup2")public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryGroup2(@Value("${spring.kafka.consumer-group-2.group-id}") String groupId) {return createFactory(groupId);}private ConcurrentKafkaListenerContainerFactory<String, String> createFactory(String groupId) {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();Map<String, Object> props = new HashMap<>(consumerConfigs());props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));return factory;}
}
auto.offset.reset 配置选项
earliest:当没有初始偏移量或者当前偏移量超出范围时,消费者将从最早的可用数据开始读取。这通常用于新消费者加入时,从头开始读取所有历史数据。
latest:当没有初始偏移量或者当前偏移量超出范围时,消费者将从最新的数据开始读取。这通常用于新消费者加入时,只读取从现在开始的数据。
none:如果消费者没有找到当前偏移量或偏移量超出范围,则会抛出异常。这要求消费者必须有有效的偏移量。
anything else:其他值将导致消费者抛出异常。

Kafka 消费者服务

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumerService {@KafkaListener(topics = "${spring.kafka.consumer-group-1.topic}", containerFactory = "kafkaListenerContainerFactoryGroup1")public void listenGroup1(String message) {System.out.println("Received message in group1: " + message);}@KafkaListener(topics = "${spring.kafka.consumer-group-2.topic}", containerFactory = "kafkaListenerContainerFactoryGroup2")public void listenGroup2(String message) {System.out.println("Received message in group2: " + message);}
}

创建 Kafka 生产者

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaProducerController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send")public String sendMessage(@RequestParam("message") String message) {kafkaTemplate.send("my-topic", message);return "Message sent to Kafka topic";}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/47803.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

通俗地理解主动元数据管理

元数据管理&#xff0c;是企业开展数据管理的核心基础&#xff0c;内容涉及元数据的创建&#xff0c;确定需要捕获哪些元数据&#xff0c;通过哪些工具和流程进行创建&#xff0c;继而将元数据妥善存储&#xff0c;保障安全性和可访问性&#xff0c;并不断更新维护&#xff0c;…

git 总结2

记录今日学习内容&#xff1a; 项目组成&#xff1a; 一个master分支QA&#xff1a;开发分支bug修复分支个人开发功能需求分支 常用 从仓库拉取代码&#xff1a; git clone xxx创建并且切换到自己的分支&#xff1a;git checkout -b xxx切换到某分支&#xff1a;git checko…

开发面试算法题求教

在《无尽的拉格朗日》中&#xff0c;有许多不同的星系建筑物。每个星系建筑物的等级不同&#xff0c;带来的影响力也不同。 已知宇宙可以抽象为一个无穷大的平面直角坐标系&#xff0c;现在给定了每个星系建筑物的所在坐标(xi,yi)和它的影响力ri&#xff0c;距离其切比雪夫距离…

C++入门语法总结和STL回顾

目录 iostream和命名空间输入输出流循环输入输出 vector容器string字符串cpp链表操作hashTable哈希表set集合map映射范围for循环 stack栈queue队列list列表类和面向对象 iostream和命名空间 输入输出流 内置库iostream提供了输入和输出功能&#xff0c;允许开发者从键盘读取输…

[渗透测试] 反序列化漏洞

反序列化漏洞 ​ 序列化&#xff1a;将对象的状态信息转换为可以传输或存储的形式的过程。简单的来说&#xff0c;就是将一个抽象的对象转换成可以传输的字符串 &#xff0c;以特定的形式在进行之间实现跨平台的传输。 序列化大多以字节流、字符串、json串的形式来传输。将对…

linux/windows wps node.js插件对PPT状态监听并且通知其他应用

需求背景 公司要求对Window系统&#xff0c;和国产操作系统&#xff08;UOS&#xff09;的wps 软件在 PPT开始播放 结束播放&#xff0c;和播放中翻页 上一页 下一页 等状态进行监听&#xff0c;并通知到我们桌面应用。 技术方案 开发WPS插件&#xff0c;使用node.JS 插件开…

系统架构设计师①:计算机组成与体系结构

系统架构设计师①&#xff1a;计算机组成与体系结构 计算机结构 计算机的组成结构可以概括为以下几个主要部分&#xff1a;中央处理器&#xff08;CPU&#xff09;、存储器&#xff08;包括主存和外存&#xff09;、输入设备、输出设备&#xff0c;以及控制器、运算器、总线和…

如何查看jvm资源占用情况

如何设置jar的内存 java -XX:MetaspaceSize256M -XX:MaxMetaspaceSize256M -XX:AlwaysPreTouch -XX:ReservedCodeCacheSize128m -XX:InitialCodeCacheSize128m -Xss512k -Xmx2g -Xms2g -XX:UseG1GC -XX:G1HeapRegionSize4M -jar your-application.jar以上配置为堆内存4G jar项…

使用puma部署ruby on rails的记录

之前写过一篇《记录一下我的Ruby On Rails的systemd服务脚本》的记录&#xff0c;现在补上一个比较政治正确的Ruby On Rails的生产环境部署记录。使用Puma部署项目。 创建文件 /usr/lib/systemd/system/puma.service [Unit] DescriptionPuma HTTP Server DocumentationRuby O…

C语言中的文件创建、写入与读取

在C语言编程中&#xff0c;文件操作是一项基本且重要的技能。它允许程序与外部数据进行交互&#xff0c;如保存用户输入、读取配置文件或处理日志文件等。下面&#xff0c;我们将详细介绍如何在C语言中创建文件、向文件写入数据以及从文件中读取数据。 点击这里➡️➡️➡️ 锁…

[AWS]MSK调用,报错Access denied

背景&#xff1a;首先MSK就是配置一个AWS的托管 kafka&#xff0c;创建完成之后就交给开发进行使用&#xff0c;开发通常是从代码中&#xff0c;编写AWS的access_key 和secret_key进行调用。 但是开发在进行调用的时候&#xff0c;一直报错连接失败&#xff0c;其实问题很简单&…

Electron 企业级开发通信与本地存储实用解决方案

背景 之前写了一篇Electron通信的方式&#xff0c;讲述了一下三者之间的通信机制&#xff0c;比较恶心&#xff0c;后来发现有个electron/remote&#xff0c; Electron 渲染进程直接调用主进程的API库electron/remote引用讲解-CSDN博客文章浏览阅读58次。remote是个老库&…

蓝队黑名单IP解封提取脚本

应用场景&#xff1a;公司给蓝队人员一个解封IP列表&#xff0c;假如某个IP满足属于某某C段&#xff0c;则对该IP进行解封。该脚本则是进行批量筛选出符合条件的白名单IP 实操如下&#xff1a;公司给了一个已经封禁了的黑名单IP列表如下&#xff08;black&#xff09; 公司要求…

高清视频,无损音频,LDR6023——打造极致视听与高效充电的双重享受!

Type-C PD&#xff08;Power Delivery&#xff09;芯片是一种支持USB Type-C接口规范的电源管理单元&#xff0c;其主要功能包括&#xff1a; 快速充电&#xff1a;Type-C PD芯片支持高功率传输&#xff0c;能够提供更快的充电速度&#xff0c;使电子设备在短时间内充满电&…

微信小程序:多图片显示及图片点击放大,多视频显示

微信小程序&#xff1a;多图片显示及图片点击放大&#xff0c;多视频显示 01 多图片显示及图片点击放大02 多视频03 全部代码 01 多图片显示及图片点击放大 <view><view class"title">图片&#xff1a;</view><block wx:if"{{photoUrlList…

源码搭建国内微短剧系统(APP+小程序)云存储配置流程

国内微短剧系统很多人不知道云存储和配置的操作流程&#xff0c;我整理了一份非常详细的操作文档流程&#xff0c;给大家介绍短剧系统云存储配置的详细操作流程。顺便推荐一下国内微短剧系统。 推荐下他们的开源地址&#xff1a;https://gitee.com/nymaite_com_2878868888/tjg…

CrowdStrike更新致850万Windows设备宕机,微软紧急救火!

7月18日&#xff0c;网络安全公司CrowdStrike发布了一次软件更新&#xff0c;导致全球大范围Windows系统宕机。 预估CrowdStrike的更新影响了将近850万台Windows设备&#xff0c;多行业服务因此停滞&#xff0c;全球打工人原地放假&#xff0c;坐等吃瓜&#xff0c;网络上爆梗…

适用于 Mac 或 MacBook 的最佳数据恢复软件

Apple 设计的电脑可靠且用户友好&#xff0c;但即使是最好的最新款 MacBook硬件也会出现故障。当您的存储出现问题时&#xff0c;数据恢复软件可以帮助您恢复丢失和损坏的文件。 数据丢失的另一个原因是有时会发生令人尴尬的错误。如果您不小心丢弃了所需的文件&#xff0c;然…

【RaspberryPi】树莓派Matlab/Simulink支持包安装与使用

官网支持与兼容性 Raspberry Pi Support from MATLAB - Hardware Support - MATLAB & Simulink Raspberry Pi Support from Simulink - Hardware Support - MATLAB & Simulink Matlab与树莓派兼容性 Simulink与树莓派兼容性 树莓派Matlab&Simulink RaspberryPi支…

本地部署 mistralai/Mistral-Nemo-Instruct-2407

本地部署 mistralai/Mistral-Nemo-Instruct-2407 1. 创建虚拟环境2. 安装 fschat3. 安装 transformers4. 安装 flash-attn5. 安装 pytorch6. 启动 controller7. 启动 mistralai/Mistral-Nemo-Instruct-24078. 启动 api9. 访问 mistralai/Mistral-Nemo-Instruct-2407 1. 创建虚拟…