kafka-消费者组(SpringBoot整合Kafka)

文章目录

  • 1、消费者组
    • 1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本
    • 1.2、创建生产者发送消息
    • 1.3、application.yml配置
    • 1.4、创建消费者监听器
    • 1.5、创建SpringBoot启动类
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、消费者控制台:

1、消费者组

1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本

在这里插入图片描述

1.2、创建生产者发送消息

package com.atguigu.spring.kafka.consumer;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootTest
class SpringKafkaConsumerApplicationTests {@ResourceKafkaTemplate kafkaTemplate;@Testvoid contextLoads() {for (int i = 0; i < 10; i++) {kafkaTemplate.send("my_topic1",i%6,"", "消费者组"+i);}}
}

在这里插入图片描述
在这里插入图片描述

1.3、application.yml配置

server:port: 8120# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量enable-auto-commit: true # 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始auto-offset-reset: earliest  #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

1.4、创建消费者监听器

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaListener {@KafkaListener(topics ={"my_topic1"},groupId = "my_group1")public void onMessage1(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}@KafkaListener(topics ={"my_topic1"},groupId = "my_group1")public void onMessage2(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者2获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}@KafkaListener(topics ={"my_topic1"},groupId = "my_group2")public void onMessage3(ConsumerRecord<String, String> record) {System.out.println("my_group2消费者获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}}

1.5、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}

1.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.8、消费者控制台:

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6
my_group2消费者获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
my_group2消费者获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
my_group1消费者1获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
my_group1消费者1获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/23910.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IT闲谈-WEB前端主流三大框架

目录 一、Angular二、React三、Vue.js小结 前言 这里给大家简单介绍一下web前端框架&#xff1b;随着互联网技术的飞速发展&#xff0c;Web前端技术也在不断地演进和更新。目前&#xff0c;前端比较多的三大主流前端框架Angular、React和Vue.js&#xff0c;成为前端开发者的得…

问题:棕色试剂瓶用于盛装见光易分解的试剂或溶剂。 #其他#学习方法#微信

问题&#xff1a;棕色试剂瓶用于盛装见光易分解的试剂或溶剂。 A、正确 B、错误 参考答案如图所示

响应式流规范解析

在互联网应用构建过程中&#xff0c;我们知道可以采用异步非阻塞的编程模型来提高服务的响应能力。而为了实现异步非阻塞&#xff0c;我们可以引入数据流&#xff0c;并对数据的流量进行控制。我们来考虑一个场景&#xff0c;如果数据消费的速度跟不上数据发出的速度&#xff0…

基于spring boot的超市管理系统【附:资料➕文档】

前言&#xff1a;我是源码分享交流Coding&#xff0c;专注JavaVue领域&#xff0c;专业提供程序设计开发、源码分享、 技术指导讲解、各类项目免费分享&#xff0c;定制和毕业设计服务&#xff01; 免费获取方式--->>文章末尾处&#xff01; 项目介绍&#xff1a; 网址 …

JWT及单点登录实现

JWT发展简史 JWT Token JSON Web Token (JWT&#xff0c;RFC 7519 (opens new window))&#xff0c;是为了在网络应用环境间传递声明而执行的一种基于 JSON 的开放标准&#xff08;(RFC 7519)。 ID Token OIDC (OpenID Connect) 协议 (opens new window)对 OAuth 2.0 协议 …

DEA统计代码行数插件Statistic

1.安装Statistic插件 直接在idea里面搜索Statistic即可 2.重启idea 3.查看代码行数 它可以统计各类文件的行数总和

【Linux】进程(7):地址空间

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家了解Linux进程&#xff08;7&#xff09;&#xff1a;地址空间&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 目录 &#xff08;A&#xff09; 直接看代码&…

法国人工智能初创公司 Mistral 正在推出新的人工智能模型定制选项服务和 SDK

Mistral AI是一家成立于2023年的法国人工智能初创公司&#xff0c;由Artur Mensch、Timothe Lacroix和Guillaume Lample三位前Meta和Google DeepMind的研究人员创立。该公司专注于生成式AI技术&#xff0c;特别是用于构建在线聊天机器人、搜索引擎等应用。 Mistral AI在成立之…

[数据集][图像分类]城市异常情况路边倒树火灾水灾交通事故分类数据集15223张8类别

数据集类型&#xff1a;图像分类用&#xff0c;不可用于目标检测无标注文件 数据集格式&#xff1a;仅仅包含jpg图片&#xff0c;每个类别文件夹下面存放着对应图片 图片数量(jpg文件个数)&#xff1a;15223 分类类别数&#xff1a;8 类别名称:[“badroad”,“fallentree”,“f…

CarSim车辆运动轨迹绘制

CarSim车辆运动轨迹绘制 CarSim中与车辆位置有关的信息分别为Xo和Yo 输出到Simulink中 导入到工作空间中保存&#xff0c;low_carsim_path.mat &#xff0c;绘制结果曲线&#xff0c;low_carsim_path_comp.m data csvread(low_two_path.csv,1,0); low_two_path_x data(:,1)…

分享我的新版FMEA培训心得

近日&#xff0c;我有幸参加了深圳天行健企业管理咨询公司举办的新版FMEA培训&#xff0c;这次学习不仅让我对FMEA有了更深入的理解&#xff0c;更使我在实际工作中找到了提升产品质量的新路径。 新版FMEA相较于传统版本&#xff0c;更加注重风险识别与预防&#xff0c;强调在…

【递归、搜索与回溯】递归算法

一、经验总结 递归 VS 迭代&#xff08;循环&#xff09; 递归和迭代都解决的是重复的子问题&#xff0c;因此两者是可以相互转化的。利用栈结构可以将递归算法转化为迭代算法。 递归和迭代各有其优缺点&#xff0c;选择时需根据具体场景和需求来决定。 递归的优点包括&#…

苹果眼镜(Vision Pro)专业咨询服务模式优化方案

一、精准定位&#xff1a; 专注于为Apple Vision Pro应用开发者提供一站式、全方位的专业咨询服务&#xff0c;致力于成为开发者在空间计算时代中不可或缺的合作伙伴&#xff0c;共同打造“下一个大事件”。 二、核心业务优化&#xff1a; visionOS策略咨询&#xff1a; 深入…

【氵】Archlinux+KDE Plasma+Wayland 安装nvidia驱动 / 开启HDR

参考: NVIDIA - Arch Linux 中文维基 &#xff08;其实就是把 wiki 简化了一下 注&#xff1a;本教程适用 GeForce 930 起、10 系至 20 系、 Quadro / Tesla / Tegra K-系列以及更新的显卡&#xff08;NV110 以及更新的显卡家族&#xff09;&#xff0c;此处以 RTX3060 为例 …

LlamIndex二 RAG应用开发

在AutoGen)系列后&#xff0c;我又开始了LlamIndex 系列。欢迎查询LlamaIndex 一 简单文档查询 - 掘金 (juejin.cn)了解LlamIndex&#xff0c;今天我们来看看LlamIndex的拿手戏&#xff0c;RAG应用开发。 何为RAG&#xff1f; RAG全称"Retrieval-Augmented Generation&q…

vue处理json数据

背景&#xff1a;后端返回的数据不是我想要的&#xff0c;现在需要把 name 替换为title&#xff08;小声蛐蛐&#xff1a;又让我处理数据&#xff09; 后端返回数据格式 修改字段操作&#xff1a;&#xff08;使用递归遍历的方式将title属性赋了name的值&#xff09; renderT…

详细分析Mysql临时变量的基本知识(附Demo)

目录 前言1. 用户变量2. 会话变量 前言 临时变量主要分为用户变量和会话变量 1. 用户变量 用户变量是特定于会话的&#xff0c;在单个会话内可以在多个语句中共享 以 符号开头在 SQL 语句中使用 SET 语句或直接在查询中赋值 声明和赋值 SET var_name value; -- 或者 SE…

构建Vue3项目的几种方式,如何简化setup写法

1、说明 在vue2版本中&#xff0c;我们使用vue-cli脚手架进行构建&#xff0c;而切换到Vue3之后&#xff0c;依然可以使用vue-cli脚手架进行构建&#xff0c;但是官方推荐使用vite工具进行构建&#xff0c;下面将介绍几种方式构建vue3项目。 2、使用vue-cli脚手架构建Vue3项目…

【前端面试高频手写题】

# 面试高频手写题 建议优先掌握&#xff1a; instanceof - 考察对原型链的理解 new - 对创建对象实例过程的理解 call/apply/bind - 对this指向的理解 手写promise - 对异步的理解手写原生ajax - 对ajax原理和http请求方式的理解&#xff0c;重点是get和post请求的实现 # 1…

高考填报志愿,怎么分析自己适合什么专业?

高考结束后&#xff0c;很多考生不知道自己的分数段适合什么学校&#xff0c;缺乏目标感&#xff0c;有些专业名称很大&#xff0c;听起来光鲜亮丽&#xff0c;但是是否适合自己&#xff0c;学什么课程&#xff0c;将来就业去向&#xff0c;这些都是需要细致了解的。 专业选择…