kafka(五)spring-kafka(2)详解与demo

一、简单的收发消息demo

父工程pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>kafka-demo</artifactId><version>1.0-SNAPSHOT</version><packaging>pom</packaging><modules><module>producer</module><module>consumer-1</module><module>consumer-2</module></modules><!-- springBoot --><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.4.RELEASE</version></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--kafka--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
<!--            <version>3.0.0</version>--></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.78</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies></project>
1、生产者

1.1、配置文件
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializeruser.topic = userTest
school.topic = schoolTest
1.2、dto
package com.example.dto;import lombok.Builder;
import lombok.Data;@Data
@Builder
public class SchoolDTO {private String schoolId;private String schoolName;
}
package com.example.dto;import lombok.Builder;
import lombok.Data;@Data
@Builder
public class UserDTO {private String userId;private String userName;private Integer age;
}
 1.3、service
package com.example.service.impl;import com.alibaba.fastjson.JSON;
import com.example.dto.SchoolDTO;
import com.example.dto.UserDTO;
import com.example.service.SchoolService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service("schoolService")
@Slf4j
public class SchoolServiceImpl implements SchoolService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Value("${school.topic}")private String schoolTopic;@Overridepublic void sendSchoolMsg(SchoolDTO schoolDTO) {String msg = JSON.toJSONString(schoolDTO);ProducerRecord producerRecord = new ProducerRecord(schoolTopic,msg);kafkaTemplate.send(producerRecord);log.info("school消息发送成功");}
}
package com.example.service.impl;import com.alibaba.fastjson.JSON;
import com.example.dto.UserDTO;
import com.example.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service("userService")
@Slf4j
public class UserServiceImpl implements UserService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Value("${user.topic}")private String userTopic;@Overridepublic void sendUserMsg(UserDTO userDTO) {String msg = JSON.toJSONString(userDTO);ProducerRecord producerRecord = new ProducerRecord(userTopic,msg);kafkaTemplate.send(producerRecord);log.info("user消息发送成功");}
}
 1.4、启动类
package com.example;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class, args);}
}
2、消费者

2.1、配置文件
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit = true user.topic = userTest
user.group.id = user-group-1school.topic = schoolTest
school.group.id = school-group-1server.port = 2222
2.2、监听
package com.example.listen;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Optional;
import java.util.Properties;@Component
@Slf4j
public class SchoolConsumer {@KafkaListener(topics = "${school.topic}", groupId = "${school.group.id}")public void consumer(ConsumerRecord<?, ?> record) {try {Object message = record.value();if (message != null) {String msg = String.valueOf(message);log.info("接收到:msg={},topic:{},partition={},offset={}",msg,record.topic(),record.partition(),record.offset());}} catch (Exception e) {log.error("topic:{},is consumed error:{}", record.topic(), e.getMessage());} finally {//ack.acknowledge();}}
}
package com.example.listen;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class UserConsumer {@KafkaListener(topics = "${user.topic}", groupId = "${user.group.id}")public void consumer(ConsumerRecord<?, ?> record) {try {Object message = record.value();if (message != null) {String msg = String.valueOf(message);log.info("接收到:msg={},topic:{},partition={},offset={}",msg,record.topic(),record.partition(),record.offset());}} catch (Exception e) {log.error("topic:{},is consumed error:{}", record.topic(), e.getMessage());} finally {//ack.acknowledge();}}
}

不指定group.id会报错,这也验证了kafka consumer必须要有group id。如写:

@KafkaListener(topics = "${user.topic}")
public void consumer(ConsumerRecord<?, ?> record) 启动报错:

Caused by: java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.
    at org.springframework.util.Assert.state(Assert.java:73) ~[spring-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
 

2.3、启动类 
package com.example;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;@SpringBootApplication
//@EnableKafka
public class Consumer1Application {public static void main(String[] args) {SpringApplication.run(Consumer1Application.class, args);}
}
3、测试

启动消费者:

生产者这里通过单元测试来发送消息:

package com.demo.kafka;import com.example.ProducerApplication;
import com.example.dto.SchoolDTO;
import com.example.dto.UserDTO;
import com.example.service.SchoolService;
import com.example.service.UserService;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes = {ProducerApplication.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@RunWith(SpringRunner.class)
public class Test {@Autowiredprivate SchoolService schoolService;@Autowiredprivate UserService userService;@org.junit.Testpublic void sendUserMsg(){UserDTO userDTO = UserDTO.builder().userId("id-1").age(18).userName("zs").build();userService.sendUserMsg(userDTO);}@org.junit.Testpublic void sendSchoolMsg(){SchoolDTO schoolDTO = SchoolDTO.builder().schoolId("schoolId-1").schoolName("mid school").build();schoolService.sendSchoolMsg(schoolDTO);}}

运行单测,观察消费者输出:

修改参数再次运行,观察到消费者都可以正常监听: 

2024-06-22 17:09:06.383  INFO 76104 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 2222 (http) with context path ''
2024-06-22 17:09:06.390  INFO 76104 --- [ntainer#1-0-C-1] org.apache.kafka.clients.Metadata        : Cluster ID: ZdpIAHTjS9GhJlvPP8n0Rw
2024-06-22 17:09:06.392  INFO 76104 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=user-group-1] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-06-22 17:09:06.393  INFO 76104 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=user-group-1] Revoking previously assigned partitions []
2024-06-22 17:09:06.394  INFO 76104 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: []
2024-06-22 17:09:06.394  INFO 76104 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=user-group-1] (Re-)joining group
2024-06-22 17:09:06.401  INFO 76104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=school-group-1] Successfully joined group with generation 11
2024-06-22 17:09:06.403  INFO 76104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=school-group-1] Setting newly assigned partitions [schoolTest-0]
2024-06-22 17:09:06.403  INFO 76104 --- [           main] com.example.Consumer1Application         : Started Consumer1Application in 8.606 seconds (JVM running for 9.621)
2024-06-22 17:09:06.413  INFO 76104 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [schoolTest-0]
2024-06-22 17:09:06.491  INFO 76104 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=user-group-1] Successfully joined group with generation 3
2024-06-22 17:09:06.493  INFO 76104 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=user-group-1] Setting newly assigned partitions [userTest-0]
2024-06-22 17:09:06.611  INFO 76104 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [userTest-0]
2024-06-22 17:16:29.775  INFO 76104 --- [ntainer#1-0-C-1] com.example.listen.UserConsumer          : 接收到:msg={"age":18,"userId":"id-1","userName":"zs"},topic:userTest,partition=0,offset=4
2024-06-22 17:16:48.157  INFO 76104 --- [ntainer#0-0-C-1] com.example.listen.SchoolConsumer        : 接收到:msg={"schoolId":"schoolId-1","schoolName":"mid school"},topic:schoolTest,partition=0,offset=1
2024-06-22 17:17:39.458  INFO 76104 --- [ntainer#1-0-C-1] com.example.listen.UserConsumer          : 接收到:msg={"age":20,"userId":"id-2","userName":"ls"},topic:userTest,partition=0,offset=5
2024-06-22 17:17:59.474  INFO 76104 --- [ntainer#0-0-C-1] com.example.listen.SchoolConsumer        : 接收到:msg={"schoolId":"schoolId-2","schoolName":"primary school"},topic:schoolTest,partition=0,offset=2
4、多个消费者
4.1、同一个groupId

将consumer-1的代码copy到consumer-2,注意端口号修改成不一样的3333,并启动,

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit = true user.topic = userTest
user.group.id = user-group-1school.topic = schoolTest
school.group.id = school-group-1server.port = 3333
2024-06-22 17:23:59.524  INFO 78096 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 3333 (http) with context path ''
2024-06-22 17:23:59.531  INFO 78096 --- [           main] example.Consumer2Application             : Started Consumer2Application in 7.534 seconds (JVM running for 8.506)
2024-06-22 17:24:00.021  INFO 78096 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=school-group-1] Successfully joined group with generation 12
2024-06-22 17:24:00.024  INFO 78096 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=school-group-1] Setting newly assigned partitions []
2024-06-22 17:24:00.025  INFO 78096 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: []
2024-06-22 17:24:00.028  INFO 78096 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=user-group-1] Successfully joined group with generation 4
2024-06-22 17:24:00.028  INFO 78096 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=user-group-1] Setting newly assigned partitions []
2024-06-22 17:24:00.029  INFO 78096 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: []

再执行次生产者的Test,观察两个消费者:

 可以看到consumer-1接收到了,而consumer-2没有接收到。

再次执行,结果相同。school也是同样的结果。

验证了:同一个topic下的某个分区只能被消费者组中的一个消费者消费。

4.2、不同group

现修改cosumer-2中groupId并重启

user.group.id = user-group-2
school.group.id = school-group-2

启动后自动接收了之前发送的所有消息(因为这是一个新的消费者组):

再次发送新的消息:

可以看到consumer-1和2同时都接收到了:

 

验证了:同一个topic可以被不同的消费者组消费。

二、生产者分区partition

先观察上面步骤产生的数据文件:

上面只有一个patition, 所有两个topic各自只有一个数据目录。现将userTest这个topic分成多个partition,结合四种分区策略看下:

1、指定分区
2、轮询
3、key哈希分区策略
4、自定义分区策略(即自定义Partitioner)

三、消费者分配策略

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/32306.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

谷歌手机刷机教学

注意&#xff1a;手机已经解开了oem锁和bl 1、adb基础命令 连接设备adb devices&#xff1a;列出当前连接的所有设备。 adb connect <设备IP>&#xff1a;通过IP地址连接设备&#xff08;用于无线连接&#xff09;。 设备信息adb shell getprop&#xff1a;获取设备的所…

Docker部署MySQL8.3.0(保姆级图文教程)

系列文章目录 Docker部署Nginx1.21.5&#xff08;保姆级图文教程&#xff09; Docker部署MySQL8.3.0&#xff08;保姆级图文教程&#xff09; 文章目录 一、环境二、拉取镜像2.1 查找 Docker Hub 上的 MySQL 镜像2.2 拉取MySQL镜像2.3 查看MySQL镜像 三、在宿主机创建目录3.1 创…

无痛接入图像生成风格迁移能力:GAN生成对抗网络

AI应用开发相关目录 本专栏包括AI应用开发相关内容分享&#xff0c;包括不限于AI算法部署实施细节、AI应用后端分析服务相关概念及开发技巧、AI应用后端应用服务相关概念及开发技巧、AI应用前端实现路径及开发技巧 适用于具备一定算法及Python使用基础的人群 AI应用开发流程概…

【一】【算法】经典树状数组和并查集,详细解析,初步认识,【模板】树状数组 1,树状数组并查集

【模板】树状数组 1 题目描述 如题&#xff0c;已知一个数列&#xff0c;你需要进行下面两种操作&#xff1a; 将某一个数加上 x x x 求出某区间每一个数的和 输入格式 第一行包含两个正整数 n , m n,m n,m&#xff0c;分别表示该数列数字的个数和操作的总个数。 第二…

java: Annotation processing is not supported for module cycles.

java: Annotation processing is not supported for module cycles. 查了半天是造成了循环依赖 解决步骤 1.打开project structure 2.找到循环依赖的两个或多个模块&#xff0c;在dependencies中找到对应的模块并删除 仅记录我遇到这个问题的解决方法&#xff0c;并不适合所…

七层和四层的区别

OSI七层模型的结构如下&#xff1a; 物理层&#xff08;Physical Layer&#xff09;&#xff1a;负责传输原始比特流&#xff0c;实现数据在物理媒介上的传输&#xff1b; 数据链路层&#xff08;Data Link Layer&#xff09;&#xff1a;负责在相邻节点之间传输数据帧&#…

Git简单使用和理解

workspace: 本地的工作目录。 index/stage&#xff1a;暂存区域&#xff0c;临时保存本地改动。 local repository: 本地仓库&#xff0c;只想最后一次提交HEAD。 remote repository&#xff1a;远程仓库。 对于Git,首先应该明白第一git是一种分布式版本控制系统&#xff0c;最…

后仿真中 module path polarity 问题

目录 一 未知极性 二 正极性 三 负极性 不知道大家有没有遇到这个问题:什么?我们知道的module path delay 指的是定义在specify...endspecify block 中的语句,指示输入-输出的延迟信息。 这里的module path 竟然还有极性问题,今天,来学习一下。 模块路径的极性是一…

用RNN构建人名分类器

目录 项目综述1.导入必备的工具包2.处理数据&#xff0c;满足训练要求2.1 统计常用的字符2.2 进行规范化处理,去除重音符号2.3 将文件读取到内存中2.4 构建人名国家和具体人名的对应关系2.5 one-hot编码 3.构建RNN模型3.1 构建传统RNN模型3.2 构建传统LSTM模型3.3 构建传统GRU模…

永久免费设备日志采集工具

免费试用下载: Gitee下载 最新版本 优势: A. 开箱即用. 解压直接运行.不需额外安装. B. 批管理设备. 设备配置均在后台管理. C. 无人值守 客户端自启动,自更新. D. 稳定安全. 架构简单,内存占用小,通过授权访问.

openeuler一个服务异常占用cpu的排查过程

1 环境 硬件环境&#xff1a;LS1046A arm64 系统环境&#xff1a;openEuler release 22.03 (LTS-SP1) Linux kernel 4.19.26 2 问题说明 我的硬件平台需要适配一下 openEuler release 22.03 (LTS-SP1) 但是目前只能使用原来硬件平台的内核&#xff0c;在适配的过程中…

LLM大语言模型应用方案之RAG检索增强生成的实现步骤。

0.我理解的RAG 什么是RAG&#xff1f; RAG的全称是“检索增强生成模型”&#xff08;Retrieval-Augmented Generation&#xff09;。这是一种特别聪明的大语言模型。 RAG是怎么工作的呢&#xff1f; 1.检索&#xff1a;当你问RAG一个问题时&#xff0c;它会先去“图书…

aardio - 【库】lock 跨进程读写锁

import win.ui; /*DSG{{*/ var winform win.form(text"aardio form";right272;bottom203;topmost1) winform.add( button{cls"button";text"无锁演示";left27;top132;right120;bottom184;z2}; button2{cls"button";text"有锁演示…

Redis的实战常用一、验证码登录(解决session共享问题)(思路、意识)

一、基于session实现登录功能 第一步&#xff1a;发送验证码&#xff1a; 用户在提交手机号后&#xff0c;会校验手机号是否合法&#xff1a; 如果不合法&#xff0c;则要求用户重新输入手机号如果手机号合法&#xff0c;后台此时生成对应的验证码&#xff0c;同时将验证码进行…

前端路线指导(4):前端春招秋招经验分享

春招/秋招经验分享(前端) 哈喽大家好&#xff0c;我是小粉&#xff0c;双一流本科&#xff0c;自学前端一年&#xff0c;收获腾讯&#xff0c;字节等多家大厂offer&#xff0c;一半以上ssp~ 今天给大家分享一下我的春招&#xff08;暑期实习&#xff09;、秋招经历&#xff0c;…

【Gradio】如何设置 Gradio 数据框的样式

简介 数据可视化是数据分析和机器学习的关键方面。Gradio DataFrame 组件是一种流行的方式&#xff0c;在网络应用程序中显示表格数据&#xff08;特别是以 pandas DataFrame 对象的形式&#xff09;。 本文将探讨 Gradio 的最新增强功能&#xff0c;这些功能允许用户整合 pand…

Spring的启动扩展点机制详解

在Java的世界中&#xff0c;我们知道Spring是当下最主流的开发框架&#xff0c;没有之一。而在使用Dubbo、Mybatis等开源框架时&#xff0c;我们发现可以采用和Spring完全一样的使用方式来使用它们。 可能你在平时的使用过程中并没有意识到这一点&#xff0c;但仔细想一想&…

解决js打开新页面百度网盘显示不存在方法:啊哦,你所访问的页面不存在了。

用js打开新页面open或window.location.href打开百度网盘后都显示&#xff1a;啊哦&#xff0c;你所访问的页面不存在了。 window.open(baidu_url); window.location.href baidu_url;在浏览器上&#xff0c;回车后网盘资源是可以打开的&#xff0c;刷新也是打开的。这是很奇怪…

深入分析并可视化城市轨道数据

介绍 中国城市化进程加速中&#xff0c;城市轨道交通的迅速扩张成为提升城市运行效率和居民生活品质的关键。这一网络从少数大城市延伸至众多大中型城市&#xff0c;映射了经济飞跃和城市管理现代化。深入分析并可视化城市轨道数据&#xff0c;对于揭示网络特性、评估效率、理…

进程、线程的区别

进程、线程的关系 开工厂生产手机&#xff0c;制作一条生产线&#xff0c;这个生产线上有很多的器件以及材料。一条生产线就是一个进程。 只有生产线是不够的&#xff0c;使用找五个工人来进行生产&#xff0c;这个工人能够利用这些材料最终一步步的将手机做出来&#xff0c;这…