kafka-消费者组(SpringBoot整合Kafka)

文章目录

  • 1、消费者组
    • 1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本
    • 1.2、创建生产者发送消息
    • 1.3、application.yml配置
    • 1.4、创建消费者监听器
    • 1.5、创建SpringBoot启动类
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、消费者控制台:

1、消费者组

1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本

在这里插入图片描述

1.2、创建生产者发送消息

package com.atguigu.spring.kafka.consumer;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootTest
class SpringKafkaConsumerApplicationTests {@ResourceKafkaTemplate kafkaTemplate;@Testvoid contextLoads() {for (int i = 0; i < 10; i++) {kafkaTemplate.send("my_topic1",i%6,"", "消费者组"+i);}}
}

在这里插入图片描述
在这里插入图片描述

1.3、application.yml配置

server:port: 8120# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量enable-auto-commit: true # 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始auto-offset-reset: earliest  #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

1.4、创建消费者监听器

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaListener {@KafkaListener(topics ={"my_topic1"},groupId = "my_group1")public void onMessage1(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}@KafkaListener(topics ={"my_topic1"},groupId = "my_group1")public void onMessage2(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者2获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}@KafkaListener(topics ={"my_topic1"},groupId = "my_group2")public void onMessage3(ConsumerRecord<String, String> record) {System.out.println("my_group2消费者获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}}

1.5、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}

1.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.8、消费者控制台:

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6
my_group2消费者获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
my_group2消费者获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
my_group1消费者1获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
my_group1消费者1获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/23910.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HTML5拖放(Drag and Drop)全面指南:让网页互动起来

HTML5 引入了许多令人兴奋的新特性&#xff0c;其中之一便是原生支持的拖放&#xff08;Drag and Drop&#xff09;功能&#xff0c;它极大地丰富了网页的交互体验&#xff0c;允许用户直接在页面上通过鼠标操作来移动元素。本文将深入浅出地介绍如何在你的网页中实现拖放功能&…

IT闲谈-WEB前端主流三大框架

目录 一、Angular二、React三、Vue.js小结 前言 这里给大家简单介绍一下web前端框架&#xff1b;随着互联网技术的飞速发展&#xff0c;Web前端技术也在不断地演进和更新。目前&#xff0c;前端比较多的三大主流前端框架Angular、React和Vue.js&#xff0c;成为前端开发者的得…

问题:棕色试剂瓶用于盛装见光易分解的试剂或溶剂。 #其他#学习方法#微信

问题&#xff1a;棕色试剂瓶用于盛装见光易分解的试剂或溶剂。 A、正确 B、错误 参考答案如图所示

响应式流规范解析

在互联网应用构建过程中&#xff0c;我们知道可以采用异步非阻塞的编程模型来提高服务的响应能力。而为了实现异步非阻塞&#xff0c;我们可以引入数据流&#xff0c;并对数据的流量进行控制。我们来考虑一个场景&#xff0c;如果数据消费的速度跟不上数据发出的速度&#xff0…

基于spring boot的超市管理系统【附:资料➕文档】

前言&#xff1a;我是源码分享交流Coding&#xff0c;专注JavaVue领域&#xff0c;专业提供程序设计开发、源码分享、 技术指导讲解、各类项目免费分享&#xff0c;定制和毕业设计服务&#xff01; 免费获取方式--->>文章末尾处&#xff01; 项目介绍&#xff1a; 网址 …

JWT及单点登录实现

JWT发展简史 JWT Token JSON Web Token (JWT&#xff0c;RFC 7519 (opens new window))&#xff0c;是为了在网络应用环境间传递声明而执行的一种基于 JSON 的开放标准&#xff08;(RFC 7519)。 ID Token OIDC (OpenID Connect) 协议 (opens new window)对 OAuth 2.0 协议 …

DEA统计代码行数插件Statistic

1.安装Statistic插件 直接在idea里面搜索Statistic即可 2.重启idea 3.查看代码行数 它可以统计各类文件的行数总和

Mysql基础进阶速成版

一&#xff1a;sql语句&#xff1a; 1.创建一张表&#xff1a;写成公式&#xff1a;创建函数(create table)表名(配置字段)。配置字段公式:字段名称字段类型&#xff0c;常用的类型有&#xff1a;整数类型int(8),int(16),int(32).....&#xff0c;小数类型float(8),float(16).…

【Linux】进程(7):地址空间

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家了解Linux进程&#xff08;7&#xff09;&#xff1a;地址空间&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 目录 &#xff08;A&#xff09; 直接看代码&…

go语言接口之sort.Interface接口

排序操作和字符串格式化一样是很多程序经常使用的操作。尽管一个最短的快排程序只要15 行就可以搞定&#xff0c;但是一个健壮的实现需要更多的代码&#xff0c;并且我们不希望每次我们需要的时候 都重写或者拷贝这些代码。 幸运的是&#xff0c;sort包内置的提供了根据一些排序…

PostgreSQL和MySQL架构模型的区别

PostgreSQL - 多进程架构 PostgreSQL采用了一种多进程架构。在这种模型下&#xff0c;每个数据库连接被分配给一个新的服务器进程&#xff08;或者说是数据库进程&#xff09;。这种方式为每个连接提供了独立的内存空间和执行环境。这样做的优点是提高了系统的稳定性和隔离性&…

法国人工智能初创公司 Mistral 正在推出新的人工智能模型定制选项服务和 SDK

Mistral AI是一家成立于2023年的法国人工智能初创公司&#xff0c;由Artur Mensch、Timothe Lacroix和Guillaume Lample三位前Meta和Google DeepMind的研究人员创立。该公司专注于生成式AI技术&#xff0c;特别是用于构建在线聊天机器人、搜索引擎等应用。 Mistral AI在成立之…

python之List记录

1. 列表&#xff08;List&#xff09;基础 Python中的列表是一种可变、有序的元素集合&#xff0c;元素之间通过逗号分隔并包含在一对方括号内。列表的元素可以是不同类型的数据。 创建列表 # 创建一个包含不同类型元素的列表 my_list [1, 2.5, hello, True, [1, 2, 3]]访问…

[数据集][图像分类]城市异常情况路边倒树火灾水灾交通事故分类数据集15223张8类别

数据集类型&#xff1a;图像分类用&#xff0c;不可用于目标检测无标注文件 数据集格式&#xff1a;仅仅包含jpg图片&#xff0c;每个类别文件夹下面存放着对应图片 图片数量(jpg文件个数)&#xff1a;15223 分类类别数&#xff1a;8 类别名称:[“badroad”,“fallentree”,“f…

Android-Q升级-Camera记录

目录 代码环境 建立Android Q使用的camera仓 Camera底层适配 camx 原生接口变化 其他编译问题 chi-cdk 数据类型不匹配 case未加break的报错 libalRnBRT_GL_GBWRAPPER链接问题 vidhance编译错误 libarcsat链接问题 vendor/qcom/proprietary prebuilt_HY11 调试cam…

floor函数

添加链接描述\ #include<bits/stdc.h>using namespace std;const int N 10;int main() {int n;cin>>n;for(int i1;i<50000;i){if(floor(i*1.08)n){cout<<i;return 0;}}cout<<":(";return 0; }floor函数是向下取整 ceil是向上取整 round…

CarSim车辆运动轨迹绘制

CarSim车辆运动轨迹绘制 CarSim中与车辆位置有关的信息分别为Xo和Yo 输出到Simulink中 导入到工作空间中保存&#xff0c;low_carsim_path.mat &#xff0c;绘制结果曲线&#xff0c;low_carsim_path_comp.m data csvread(low_two_path.csv,1,0); low_two_path_x data(:,1)…

【CentOS】手动编译安装make、cmake、gcc、git

摘要 Centos7升级make和gcc版本到最新——CSDN make make 各个版本下载地址 http://ftp.gnu.org/pub/gnu/make 以4.4为例安装&#xff1a; # 下载 wget https://ftp.gnu.org/pub/gnu/make/make-4.4.tar.gz # 解压配置 tar zxf make-4.4.tar.gz cd make-4.4 ./configure --p…

分享我的新版FMEA培训心得

近日&#xff0c;我有幸参加了深圳天行健企业管理咨询公司举办的新版FMEA培训&#xff0c;这次学习不仅让我对FMEA有了更深入的理解&#xff0c;更使我在实际工作中找到了提升产品质量的新路径。 新版FMEA相较于传统版本&#xff0c;更加注重风险识别与预防&#xff0c;强调在…

Java 开发面试题精选:分布式锁相关一篇全搞定

面试路上&#xff0c;分布式锁始终是绕不开的坎&#xff1f;别怕&#xff0c;这篇精心准备的文章正是您的通关秘籍&#xff01;这篇文章聚焦面试官最青睐的提问点&#xff1a;从分布式锁基础概念到其实现机理&#xff0c;再到它在多场景下的应用智慧&#xff1b;深入剖析性能优…