Spring Boot整合Kafka,实现单条消费和批量消费,示例教程

如何安装Kafka,可以参考docker搭载Kafka集群,一个文件搞定,超简单,亲试可行-CSDN博客

1、在pom.xml中加入依赖

        <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.1.6</version></dependency>

2、配置application.yml文件

在application.yml中加入

springkafka:#Kafka地址,可以是一个,也可以是Kafka集群的地址,多个地址用逗号分隔bootstrap-servers: 192.168.57.1xx:9093,192.168.57.1xx:9094,192.168.57.1xx:9095producer:# 消息确认模式:0=不等待确认,1=等待leader确认,all=所有副本确认acks: 1# 发送失败时的重试次数,0表示不重试retries: 0# 批量发送时的批次大小(字节)batch-size: 30720000 # 30MB# 生产者的内存缓冲区大小(字节)buffer-memory: 33554432 # 32MB# Key的序列化器类key-serializer: org.apache.kafka.common.serialization.StringSerializer# Value的序列化器类value-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:# 消费者所属的组IDgroup-id: test-kafka# 禁用自动提交offset,改为手动提交enable-auto-commit: false# 偏移量重置策略:# earliest:从最早的记录开始消费# latest:从最新的记录开始消费auto-offset-reset: earliest# Key的反序列化器类key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# Value的反序列化器类value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 每次poll()调用返回的最大消息条数max-poll-records: 2session:# 消费者会话超时时间,超时未发送心跳将被认为失联(毫秒)timeout:ms: 300000 # 5分钟listener:# 如果指定的主题不存在,是否让应用启动失败,false表示不会报错missing-topics-fatal: false# 消费模式:single=单条消息,batch=批量消费type: single# 消费确认模式:# manual_immediate:手动确认消息,立即提交offsetack-mode: manual_immediate

3、主要示例代码

创建一个目录和四个java文件,可以做测试

3.1、KafkaConfig.java

Kafka监听器配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;@EnableKafka
@Configuration
public class KafkaConfig {// 单条消费监听器工厂,手动提交offset@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(ConsumerFactory<String, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}// 批量消费监听器工厂,手动提交offset@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(ConsumerFactory<String, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.setBatchListener(true); // 启用批量消费factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}
}

3.2、KafkaProducer.java

生产者

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootApplication
public class KafkaProducer {public static void main(String[] args) {SpringApplication.run(KafkaProducer.class, args);}@BeanCommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {return args -> {String topic = "test-topic";for (int i = 1; i <= 10; i++) {String message = "Message " + i;kafkaTemplate.send(topic, message);System.out.println("Sent: " + message);Thread.sleep(500); // 模拟消息发送间隔}};}
}

3.3、SingleConsumer.java

单条消息消费者

autoStartup参数:是是否自动启动;=”true“:自动启动,即生产者启动,该消费者将会开始消费;=”false":不自动启动,不开该模式的消费。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;@Service
public class SingleConsumer {@KafkaListener(topics = "test-topic", groupId = "test-group", containerFactory = "singleFactory", autoStartup = "true")public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {System.out.println("SingleConsumer - Received: " + record.value());// 手动提交offsetacknowledgment.acknowledge();}
}

3.4、BatchConsumer.java

批量消息消费者

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;import java.util.List;@Service
public class BatchConsumer {@KafkaListener(topics = "test-topic", groupId = "test-group", containerFactory = "batchFactory", autoStartup = "false")public void batchListen(List<String> messages, Acknowledgment acknowledgment) {System.out.println("BatchConsumer - Received batch: " + messages);// 手动提交offsetacknowledgment.acknowledge();}
}

4、测试

4.1、单条信息消费模式

在SingleConsumer.java中设置autoStartup = "true",启动KafkaProducer.java

消费成功

4.2、批量信息消费模式

在BatchConsumer.java中设置autoStartup = "true",启动KafkaProducer.java

配置文件中设置了max-poll-records: 2,所有一次只消费两条

消费成功

如果在BatchConsumer.java和SingleConsumer.java中设置autoStartup = "true",Kafka会随机选择消费者组里的一个消费者进行消费,所有可以会导致其中一个消费者没有消费信息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/886919.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

django基于Python的农产品销售系统的设计与实现

摘 要 随着现代人们的快速发展&#xff0c;农产品销售系统已成为农产品的需求。该平台采用Python技术和django搭建系统框架&#xff0c;后台使用MySQL数据库进行信息管理&#xff1b;通过个人中心、用户管理、商家管理、产品类型管理、农产品管理、系统管理、订单管理等功能&a…

项目-摄像

树莓派摄像头使用方法 Camera教程 https://www.raspi.cc/index.php?cread&id53&page1 nanopc-t4 ​https://www.raspi.cc/index.php?cread&id53&page1 摄像头型号 Raspberry Pi Camera Rev 1.3 检测故障 dmesg | grep -i mipi piNanoPC-T4:~$ dmesg | …

Facebook商城号封号的原因是什么?

Facebook商城作为一个重要的销售平台&#xff0c;不仅为商家提供了巨大的市场机会&#xff0c;也带来了一系列需要警惕的风险&#xff0c;其中包括账号被封的风险。本文将从环境异常、频繁操作和违规行为三个主要方面深入探讨&#xff0c;解析导致Facebook商城账号被封禁的具体…

聊一聊Elasticsearch的索引分片的恢复机制

1、什么是索引分片的恢复&#xff1f; 所谓索引分片的恢复指的是在某些条件下&#xff0c;索引分片丢失&#xff0c;ES会把某索引的分片复制一份来得到该分片副本的过程。 2、触发分片恢复的场景有哪些&#xff1f; 分片的分配 当集群中节点的数量发生变化&#xff0c;或者配…

字符串的基本操作(C语言版)

一、实验内容&#xff1a; 采用顺序结构存储串&#xff0c;编写一个函数substring(strl,str2)&#xff0c;用于判定str2是否为strl的子串&#xff1b;编写一个函数&#xff0c;实现在两个已知字符串中找出所有非空最长公共子串的长度和最长公共子串的个数&#xff1b; ①字符…

一些任务调度的概念杂谈

任务调度 1.什么是调度任务 依赖&#xff1a;依赖管理是整个DAG调度的核心。调度依赖包括依赖策略和依赖区间。 依赖分为任务依赖和作业依赖&#xff0c;任务依赖是DAG任务本身的依赖关系&#xff0c;作业依赖是根据任务依赖每天的作业产生的。两者在数据存储模型上有所不同…

解决 npm xxx was blocked, reason: xx bad guy, steal env and delete files

问题复现 今天一位朋友说&#xff0c;vue2的老项目安装不老依赖&#xff0c;报错内容如下&#xff1a; npm install 451 Unavailable For Legal Reasons - GET https://registry.npmmirror.com/vab-count - [UNAVAILABLE_FOR_LEGAL_REASONS] vab-count was blocked, reas…

o1的风又吹到多模态,直接吹翻了GPT-4o-mini

开源LLaVA-o1&#xff1a;一个设计用于进行自主多阶段推理的新型VLM。与思维链提示不同&#xff0c;LLaVA-o1独立地参与到总结、视觉解释、逻辑推理和结论生成的顺序阶段。 LLaVA-o1超过了一些更大甚至是闭源模型的性能&#xff0c;例如Gemini-1.5-pro、GPT-4o-mini和Llama-3.…

共建智能软件开发联合实验室,怿星科技助力东风柳汽加速智能化技术创新

11月14日&#xff0c;以“奋进70载&#xff0c;智创新纪元”为主题的2024东风柳汽第二届科技周在柳州盛大开幕&#xff0c;吸引了来自全国的汽车行业嘉宾、技术专家齐聚一堂&#xff0c;共襄盛举&#xff0c;一同探寻如何凭借 “新技术、新实力” 这一关键契机&#xff0c;为新…

Qt桌面应用开发 第四天(对话框 界面布局)

目录 1.对话框 1.1模拟对话框 1.2非模拟对话框 1.3消息对话框 1.3.1询问对话框 1.3.2严重错误对话框 1.3.3信息提示对话框 1.3.4警告对话框 1.4其他对话框 1.4.1颜色对话框 1.4.2文件对话框 1.4.3字体对话框 1.5界面布局 1.对话框 1.1模拟对话框 会阻塞同一应用…

一文带你快速初步了解云计算与大数据

目录 &#x1f50d;一、云计算基础 1、云计算的概念、特点、关键技术 2、云计算的分类 3、云计算的部署模式 4、云计算的服务模式&#xff1a;IaaS、PaaS、SaaS分别是什么&#xff0c;具体含义要清楚 5、物联网的概念 6、物联网和云计算、大数据的关系 7、了解云计算的…

【新人系列】Python 入门(十一):控制结构

✍ 个人博客&#xff1a;https://blog.csdn.net/Newin2020?typeblog &#x1f4dd; 专栏地址&#xff1a;https://blog.csdn.net/newin2020/category_12801353.html &#x1f4e3; 专栏定位&#xff1a;为 0 基础刚入门 Python 的小伙伴提供详细的讲解&#xff0c;也欢迎大佬们…

VideoCrafter模型部署教程

一、介绍 VideoCrafter是一个功能强大的AI视频编辑和生成工具&#xff0c;它结合了深度学习和机器学习技术&#xff0c;为用户提供了便捷的视频制作和编辑体验。 系统&#xff1a;Ubuntu22.04系统&#xff0c;显卡&#xff1a;4090&#xff0c;显存&#xff1a;24G 二、基础…

机器翻译基础与模型 之二: 基于CNN的模型

一、CNN网络 相比于全连接网络&#xff0c;卷积神经网络最大的特点在于具有局部连接&#xff08;Locally Connected&#xff09;和权值共享&#xff08;Weight Sharing&#xff09;的特性。 1.1 卷积核与卷积操作 1.2 步长与填充 1.3 池化 以上关于CNN的基础概念和技术就不…

Vue 3集成海康Web插件实现视频监控

​&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;组件封装篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来组件封装篇专栏内容:Vue 3集成海康Web插件实现视频监控 引言 最近在项目中使用了 Vue 3 结合海康Web插件来实…

【Maven】nexus 配置私有仓库配置【转】

介绍&#xff1a;【Maven】Nexus几个仓库的介绍-CSDN博客 一、仓库类型 proxy 远程仓库的代理&#xff0c;比如说nexus配置了一个central repository的proxy,当用户向这个proxy请求一个artifact的时候&#xff0c;会现在本地查找&#xff0c;如果找不到&#xff0c;则会从远程…

Python学习------第十天

数据容器-----元组 定义格式&#xff0c;特点&#xff0c;相关操作 元组一旦定义&#xff0c;就无法修改 元组内只有一个数据&#xff0c;后面必须加逗号 """ #元组 (1,"hello",True) #定义元组 t1 (1,"hello") t2 () t3 tuple() prin…

Spring Web入门练习

加法计算器 约定前后端交互接⼝ 约定 "前后端交互接⼝" 是进⾏ Web 开发中的关键环节. 接⼝⼜叫 API&#xff08;Application Programming Interface), 我们⼀般讲到接⼝或者 API&#xff0c;指的都是同⼀个东西. 是指应⽤程序对外提供的服务的描述, ⽤于交换信息…

uniapp微信小程序接入airkiss插件进行WIFI配网

本文可参考uniapp小程序插件 一.申请插件 微信公众平台设置页链接&#xff1a;微信公众平台 登录您的小程序微信公众平台&#xff0c;进入设置页&#xff0c;在第三方设置->插件管理->添加插件中申请AiThinkerAirkissforWXMini插件&#xff0c;申请的插件appId为【wx6…

蓝队技能-应急响应篇日志自动采集日志自动查看日志自动化分析Web安全内网攻防工具项目

知识点&#xff1a; 1、应急响应-系统日志收集-项目工具 2、应急响应-系统日志查看-项目工具 3、应急响应-日志自动分析-项目工具 演示案例-蓝队技能-工具项目-自动日志采集&自动日志查看&自动日志分析 系统日志自动采集-观星应急工具(Windows系统日志) SglabIr_Co…