Kafka怎么发送JAVA对象并在消费者端解析出JAVA对象--示例

1、在pom.xml中加入依赖

        <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.1.6</version></dependency>

2、配置application.yml

加入Kafka的配置

springkafka:#Kafka地址,可以是一个,也可以是Kafka集群的地址,多个地址用逗号分隔bootstrap-servers: 192.168.57.1xx:9093,192.168.57.1xx:9094,192.168.57.1xx:9095producer:# 消息确认模式:0=不等待确认,1=等待leader确认,all=所有副本确认acks: 1# 发送失败时的重试次数,0表示不重试retries: 0# 批量发送时的批次大小(字节)batch-size: 30720000 # 30MB# 生产者的内存缓冲区大小(字节)buffer-memory: 33554432 # 32MB# Key的序列化器类key-serializer: org.apache.kafka.common.serialization.StringSerializer# Value的序列化器类value-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:# 消费者所属的组IDgroup-id: test-kafka# 禁用自动提交offset,改为手动提交enable-auto-commit: false# 偏移量重置策略:# earliest:从最早的记录开始消费# latest:从最新的记录开始消费auto-offset-reset: earliest# Key的反序列化器类key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# Value的反序列化器类value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 每次poll()调用返回的最大消息条数max-poll-records: 2session:# 消费者会话超时时间,超时未发送心跳将被认为失联(毫秒)timeout:ms: 300000 # 5分钟listener:# 如果指定的主题不存在,是否让应用启动失败,false表示不会报错missing-topics-fatal: false# 消费模式:single=单条消息,batch=批量消费type: single# 消费确认模式:# manual_immediate:手动确认消息,立即提交offsetack-mode: manual_immediate

这里的生产者value的序列化器用org.apache.kafka.common.serialization.StringSerializer
 ,消费者value的序列化器用org.apache.kafka.common.serialization.StringDeserializer即可。

(这里不需要自定义序列化器,但在代码需要将JAVA对象转化为JSON字符串发送)

3、config、producer、consumer代码

3.1、User.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {private int id;private String name;
}

3.2、Task.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public
class Task {private int id;private String description;private User assignedUser;
}

模拟嵌套类 

3.3、KafkaConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;@EnableKafka
@Configuration
public class KafkaConfig {// 单条消费监听器工厂,手动提交offset@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(ConsumerFactory<String, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}}

3.4、KafkaProducer.java

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootApplication
public class KafkaProducer {public static void main(String[] args) {SpringApplication.run(KafkaProducer.class, args);}@BeanCommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {return args -> {String topic = "task-topic";ObjectMapper objectMapper = new ObjectMapper();for (int i = 1; i <= 5; i++) {// 定义一个对象实例User user = User.builder().id(1).name("Alice").build();Task task = Task.builder().id(101).description("Complete report").assignedUser(user).build();//JAVA对象转化为JSON字符串String message =  objectMapper.writeValueAsString(task);kafkaTemplate.send(topic, message);System.out.println("Sent: " + message);Thread.sleep(500); // 模拟消息发送间隔}};}
}

序列化:使用 Jackson 的 ObjectMapperTask 对象转化为 JSON 字符串,方法 writeValueAsString() 将 Java 对象转为 JSON 字符串。

3.5、SingleConsumer.java

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;@Service
public class SingleConsumer {@KafkaListener(topics = "task-topic", groupId = "test-group", containerFactory = "singleFactory", autoStartup = "true")public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws JsonProcessingException {String message = record.value();ObjectMapper objectMapper = new ObjectMapper();Task task = objectMapper.readValue(message,Task.class);// 取出System.out.println("User - Received: " + task.getAssignedUser());// 手动提交offsetacknowledgment.acknowledge();}
}

反序列化: 使用 ObjectMapper 将 JSON 字符串 message 转换回 Task 对象,方法 readValue() 可以将 JSON 字符串解析为指定的 Java 对象类型。

4、测试

启动KafkaProducer.java

可以解析出JAVA对象中User

 

成功!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/63392.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JS中的原型链与继承

原型链的类比 JS中原型链&#xff0c;本质上就是对象之间的关系&#xff0c;通过protoype和[[Prototype]]属性建立起来的连接。这种链条是动态的&#xff0c;可以随时变更。 这个就跟C/C中通过指针建立的关系很相似&#xff0c;比如&#xff0c;通过指针建立一个链表&#xf…

hive分区分桶、数据倾斜总结

一、hive的基本概念 hive是一个构建在hadoop上的数据仓库工具&#xff0c;可以将结构化的数据文件映射为一张数据库表并提供数据查询功能 二、hive的特点 &#xff08;1&#xff09;数据是存储在hdfs上 &#xff08;2&#xff09;底层是将sql转换为MapReduce任务进行计算 …

CSS学习记录04

CSS边框 CSS border 属性指定元素边框的样式、宽度和颜色。border-style 属性指定要显示的边框类型。dotted - 定义点线边框dashed - 定义虚线边框solid - 定义实线边框double - 定义双边框groove - 定义3D坡口边框&#xff0c;效果取决于border-color值ridge - 定义3D脊线边框…

一文了解模式识别顶会ICPR 2024的研究热点与最新趋势

简介 对模式识别研究领域前沿方向的跟踪是提高科研能力和制定科研战略的关键。本文通过图文并茂的方式介绍了ICPR 2024的研究热点与最新趋势&#xff0c;帮助读者了解和跟踪模式识别的前沿研究方向。本推文的作者是黄星宇&#xff0c;审校为邱雪和许东舟。 一、会议介绍 ICPR…

服务器挖矿

文章目录 一、确定挖矿进程并停止二、查找并清除挖矿相关文件三、检查并修复系统漏洞四、加强安全防护 一、确定挖矿进程并停止 查找挖矿进程 在Linux系统中&#xff0c;可以使用命令如top或htop来查看系统资源占用情况。挖矿程序通常会占用大量的CPU或GPU资源。例如&#xff…

福昕PDF低代码平台

福昕PDF低代码平台简介 福昕PDF 低代码平台是一款创新的工具&#xff0c;旨在简化PDF处理和管理的流程。通过这个平台&#xff0c;用户可以通过简单的拖拽界面上的按钮&#xff0c;轻松完成对Cloud API的调用工作流&#xff0c;而无需编写复杂的代码。这使得即使没有编程经验的…

oracle 11g中如何快速设置表分区的自动增加

在很多业务系统中&#xff0c;一些大表一般通过分区表的形式来实现数据的分离管理&#xff0c;进而加快数据查询的速度。分区表运维管理的时候&#xff0c;由于人为操作容易忘记添加分区&#xff0c;导致业务数据写入报错。所以我们一般通过配置脚本或者利用oracle内置功能实现…

Antd X : 迅速搭建 AI 页面的解决方案

前言 随着 AI 热度的水涨船高&#xff0c;越来越多的 AI 应用如井喷式爆发&#xff0c;那么如何迅速搭建一个 AI 应用的美观高质量 Web 前端页面呢&#xff0c; Antd 团队给出了一个解决方案。 X Ant DesIgn XAI 体验新秩序Ant Design 团队匠心呈现 RICH 设计范式&#xff0…

SD Express 卡漏洞导致笔记本电脑和游戏机遭受内存攻击

Positive Technologies 最近发布的一份报告揭示了一个名为 DaMAgeCard 的新漏洞&#xff0c;攻击者可以利用该漏洞利用 SD Express 内存卡直接访问系统内存。 该漏洞利用了 SD Express 中引入的直接内存访问 (DMA) 功能来加速数据传输速度&#xff0c;但也为对支持该标准的设备…

Java的Stream流:文件处理、排序与串并行流的全面指南

Java的Stream流&#xff1a;文件处理、排序与串并行流的全面指南 Java 8 引入了 Stream API&#xff0c;这是一个用于处理集合数据的强大工具&#xff0c;它提供了一种声明式的方式来进行聚合操作。Stream 不是一个数据结构&#xff0c;而是一种对数据进行操作的抽象&#xff…

运维工程师.云计算工程师指令集锦

LINUX简介与安装 一、Linux基础认知知识&#xff1a; 多使用者、多任务、多层次 Linux&#xff1a;开源、免费、安全、稳定 Linux中一切皆文件 Linux严格区分大小写 Linux文件命名规则&#xff1a; ①除了/之外&#xff0c;所有字符都合法&#xff1b; ②有些字符最好不用&…

波特图方法

在电路设计中&#xff0c;波特图为最常用的稳定性余量判断方法&#xff0c;波特图的根源是如何来的&#xff0c;却鲜有人知。 本章节串联了奈奎斯特和波特图的渊源&#xff0c;给出了其对应关系和波特图相应的稳定性余量。 理论贯通&#xff0c;不在于精确绘…

React 组件中 State 的定义、使用及正确更新方式

​&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;React篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来React篇专栏内容React 组件中 State 的定义、使用及正确更新方式 前言 在 React 应用开发中&#xff0c;state …

C++(十二)

前言&#xff1a; 本文将进一步讲解C中&#xff0c;条件判断语句以及它是如何运行的以及内部逻辑。 一&#xff0c;if-else,if-else语句。 在if语句中&#xff0c;只能判断两个条件的变量&#xff0c;若想实现判断两个以上条件的变体&#xff0c;就需要使用if-else,if-else语…

查询产品所涉及的表有(product、product_admin_mapping)

文章目录 1、ProductController2、AdminCommonService3、ProductApiService4、ProductCommonService5、ProductSqlService1. 完整SQL分析可选部分&#xff08;条件筛选&#xff09;&#xff1a; 2. 涉及的表3. 总结4. 功能概述 查询指定管理员下所有产品所涉及的表&#xff1f;…

Java应用服务器JVM配置与优化全面指南

Java应用服务器JVM配置与优化全面指南 1. JVM基础知识 Java虚拟机&#xff08;JVM&#xff09;是Java平台的核心&#xff0c;负责执行Java字节码。理解JVM的基本组成和工作原理是进行有效优化的基础。 1.1 JVM主要组件 类加载器&#xff1a;负责加载、链接和初始化类文件。…

游戏引擎学习第36天

仓库 :https://gitee.com/mrxiao_com/2d_game 回顾之前的内容 在这个程序中&#xff0c;目标是通过手动编写代码来从头开始制作一个完整的游戏。整个过程不使用任何库或现成的游戏引擎&#xff0c;这样做的目的是为了能够全面了解游戏执行的每一个细节。开发过程中&#xff0…

【SpringMVC】用户登录器项目,加法计算器项目的实现

阿华代码&#xff0c;不是逆风&#xff0c;就是我疯 你们的点赞收藏是我前进最大的动力&#xff01;&#xff01; 希望本文内容能够帮助到你&#xff01;&#xff01; 目录 一&#xff1a;用户登录项目实现 1&#xff1a;需求 2&#xff1a;准备工作 &#xff08;1&#xf…

3.5 认识决策树

3.5 认识决策树 3.5.1 认识决策树 如何高效的进行决策&#xff1f; 特征的先后顺序 3.5.2 决策树分类原理详解 已知有四个特征&#xff0c;预测 是否贷款给某个人。 先看房子&#xff0c;再看工作&#xff0c;是否贷款。 年龄&#xff0c;信贷情况&#xff0c;工作&#…

MyBatis-Plus分页查询方式

分页查询基本方式 SpringBootTest(classes LearningApplication.class) public class MPTest {AutowiredILearningLessonService lessonService;Testpublic void test(){/*** Page<LearningLesson>&#xff1a;MyBatisPlus提供的分页对象* 1&#xff1a;当前页数* 2&am…