SpringBoot集成Kafka和avro和Schema注册表

Schema注册表

为了提升kafka的性能,减少网络传输和存储的数据大小,可以把数据的schema部分单独存储到外部的schema注册表中,整体架构如下图所示:
在这里插入图片描述
1)把所有数据需要用到的 schema 保存在注册表里,然后在记录里引用 schema ID。
2)消费者使用 ID 从注册表里拉取 schema 来反序列化记录。
3)序列化器和反序列化器分别负责处理 schema 的注册和拉取。
schema注册表并不属于Kafka,现在已经有一些开源的schema 注册表实现,比如Confluent Schema Registry。

安装confluent

参考:安装手册

# confluent的安装包中已经包含了zookeeper和kafka的安装包,无需单独再下载
# 下载
curl -O https://packages.confluent.io/archive/7.7/confluent-7.7.1.tar.gz
# 解压
tar -xzf confluent-7.7.1.tar.gz

解压以后目录结构如下:

文件夹描述
bin可执行文件
etc配置文件
lib服务
libexec多平台的客户端库
sharejar包和license
src源码
# 设置环境变量
vim /etc/profile
export CONFLUENT_HOME=/usr/local/confluent-7.7.1
export PATH=$CONFLUENT_HOME/bin:$PATH
# 加载环境变量
source /etc/profile
# 验证
confluent --help

启动confluent服务

启动zookeeper

cd /usr/local/confluent-7.7.1/etc/kafka
vim zookeeper.properties
# 可以调整zookeeper的端口和数据的存储目录
# 启动zookeeper
./bin/zookeeper-server-start -daemon ./etc/kafka/zookeeper.properties
# 验证
ps -ef | grep zookeeper

启动kafka

cd /usr/local/confluent-7.7.1/etc/kafka
vim server.properties
broker.id=0
# 监听地址
listeners=0.0.0.0://:9092
# 对外暴漏的地址
advertised.listeners=PLAINTEXT://192.168.200.128:9092
# zookeeper的地址
zookeeper.connect=localhost:2181
# 启动./bin/kafka-server-start  -daemon ./etc/kafka/server.properties
# 验证
netstat -nap | grep 9092

启动confluent

cd /usr/local/confluent-7.7.1/etc/schema-registry
# 修改schema-registry.properties
vim schema-registry.properties
# schema-registry的监听地址
listeners=http://0.0.0.0:8081
# kafka的访问地址
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
# 启动
./bin/schema-registry-start -daemon ./etc/schema-registry/schema-registry.properties
# 验证
netstat -nap | grep 8081

新建springboot项目

新建avro的schema文件User.avsc

{"namespace": "com.github.xjs.protocol","type": "record","name": "UserRecord","fields": [{"name": "id","type": "int"},{"name": "name","type": "string"}]
}

pom中添加avro-maven-plugin插件

<!--https://avro.apache.org/docs/1.11.1/getting-started-java/-->
<!-- 命令行执行:mvn generate-sources 把avsc转化成java文件 
-->
<plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.11.1</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory><outputDirectory>${project.basedir}/src/main/java/</outputDirectory></configuration></execution></executions>
</plugin>

添加avro和kafka的依赖

<dependency><groupId>io.confluent</groupId><artifactId>kafka-avro-serializer</artifactId><version>7.7.1</version>
</dependency>
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

添加对应的配置

server:port: 8080
spring:kafka:bootstrap-servers: 192.168.200.128:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer# 重点关注这里的KafkaAvroSerializervalue-serializer: io.confluent.kafka.serializers.KafkaAvroSerializerconsumer:group-id: testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 重点关注这里的.KafkaAvroDeserializervalue-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializerproperties:# confluent的地址schema.registry.url: http://192.168.200.128:8081

消息生产者

public void send(UserRecord record) {if (Objects.isNull(record)) {return;}log.info("send message, value:{}", record.toString());// 跟发送普通消息一样,可以直接发送UserRecordkafkaTemplate.send("demo-topic", record);
}

消息消费者

@KafkaListener(topics = "demo-topic")
public void consume(ConsumerRecord<String, UserRecord> user){// 跟接收普通消息一样,可以直接接收UserRecordlog.info("receive message, topic:{}, key:{}, value:{}", user.topic(), user.key(), user.value());
}

完整的源码下载:https://github.com/xjs1919/learning-demo/tree/master/springboot-kafka-avro

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/887935.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Wireshark 4.4.2:安全更新、错误修复、更新协议支持

流行的网络协议分析器Wireshark已更新至4.4.2版本。它可用于网络故障排除、分析、开发和教育。 已修复以下漏洞&#xff1a; wnpa-sec-2024-14 FiveCo RAP 解剖器无限循环。wnpa-sec-2024-15 ECMP 解析器崩溃。 更新的协议支持&#xff1a; ARTNET、ASN.1 PER、BACapp、B…

《Django 5 By Example》阅读笔记:p339-p358

《Django 5 By Example》学习第12天&#xff0c;p339-p358总结&#xff0c;总计20页。 一、技术总结 1.项目(购物网站) django-admin startproject myshop 虽然这里只是示例&#xff0c;但我觉得这种命名为 myxxx 的习惯非常不好&#xff0c;因为在实际应用中&#xff0c;是…

【简单好抄保姆级教学】javascript调用本地exe程序(谷歌,edge,百度,主流浏览器都可以使用....)

javascript调用本地exe程序 详细操作步骤结果 详细操作步骤 在本地创建一个txt文件依次输入 1.指明所使用注册表编程器版本 Windows Registry Editor Version 5.00这是脚本的第一行&#xff0c;指明了所使用的注册表编辑器版本。这是必需的&#xff0c;以确保脚本能够被正确解…

Milvus 2.5:全文检索上线,标量过滤提速,易用性再突破!

01. 概览 我们很高兴为大家带来 Milvus 2.5 最新版本的介绍。 在 Milvus 2.5 里&#xff0c;最重要的一个更新是我们带来了“全新”的全文检索能力&#xff0c;之所以说“全新”主要是基于以下两点&#xff1a; 第一&#xff0c;对于全文检索基于的 BM25 算法&#xff0c;我们采…

【数据分析】布朗运动(维纳过程)

文章目录 一、概述二、数学布朗运动2.1 数学定义2.2 布朗运动的数学模型2.21 标准布朗运动2.22 布朗运动的路径2.23 布朗运动的方程 三、布朗运动在金融学中的应用四、数学构造&#xff08;以傅里叶级数为例&#xff09;4.1 傅里叶级数的基本思想4.2 构造布朗运动 一、概述 布…

Spring Cloud(Kilburn 2022.0.2版本)系列教程(五) 服务网关(SpringCloud Gateway)

Spring Cloud(Kilburn 2022.0.2版本)系列教程(五) 服务网关(SpringCloud Gateway) 一、服务网关 1.1 什么是网关 在微服务架构中&#xff0c;服务网关是一个至关重要的组件。它作为系统的入口&#xff0c;负责接收客户端的请求&#xff0c;并将这些请求路由到相应的后端服务…

即时通讯| IM+RTC在AI技术加持下的社交体验

即时通讯作为互联网的重要应用之一&#xff0c;见证了中国互联网30年发展的辉煌历程。 它从最初的文字交流&#xff0c;发展到如今的语音、视频通话&#xff0c;甚至是虚拟现实社交&#xff0c;已经渗透到生活的社交、娱乐、商务等方方面面&#xff0c;成为现代社会不可或缺的一…

【Java基础入门篇】一、变量、数据类型和运算符

Java基础入门篇 一、变量、数据类型和运算符 1.1 变量 计算机中的数据表示方式是&#xff1a;“二进制(0/1)”&#xff0c;但是同时也可以兼容其他进制&#xff0c;例如八进制、十进制、十六进制等。 Java变量的本质是&#xff1a;存储在固定空间的内容&#xff0c;变量名是…

【博主推荐】C#的winfrom应用中datagridview常见问题及解决方案汇总

文章目录 1.datagridview绘制出现鼠标悬浮数据变空白2.datagridview在每列前动态添加序号2.1 加载数据集完成后绘制序号2.2 RowPostPaint事件绘制 3.datagridview改变行样式4.datagridview后台修改指定列数据5.datagridview固定某个列宽6.datagridview某个列的显示隐藏7.datagr…

使用经典的Java,还是拥抱新兴的Rust?

在当代互联网时代的企业级开发中&#xff0c;技术栈的选择往往牵动着每个团队的神经。随着Rust语言的崛起&#xff0c;许多开发团队开始重新思考&#xff1a;是继续坚持使用经典的Java&#xff0c;还是拥抱新兴的Rust&#xff1f;这个问题背后&#xff0c;折射出的是对技术演进…

Java项目运行报错“java: -source 1.5 中不支持 diamond 运算符“解决办法windows/linux系统踩坑实录

文章目录 一、问题描述二、解决方案 一、问题描述 在接手同事的Java项目时&#xff0c;依赖和打包都能正常操作&#xff0c;但一点击运行项目&#xff0c;就会报错&#xff1a; java: -source 1.5 中不支持 diamond 运算符 (请使用 -source 7 或更高版本以启用 diamond 运算符…

SQL基础入门 —— SQL概述

目录 1. 什么是SQL及其应用场景 SQL的应用场景 2. SQL数据库与NoSQL数据库的区别 2.1 数据模型 2.2 查询语言 2.3 扩展性 2.4 一致性与事务 2.5 使用场景 2.6 性能与扩展性 总结 3. 常见的SQL数据库管理系统&#xff08;MySQL, PostgreSQL, SQLite等&#xff09; 3.…

开源项目:纯Python构建的中后台管理系统

来源&#xff1a;Python大数据分析 费弗里 大家好我是费老师&#xff0c;目前市面上有很多开源的「中后台管理系统」解决方案&#xff0c;复杂如「若依」那种前端基于Vue&#xff0c;后端基于Java的框架&#xff0c;虽然其提供了较为完善的一整套前后端分离权限管理系统解决方…

视频video鼠标移入移除展示隐藏(自定义控件)

效果图 代码 <template><div class"video-container" mouseover"showControls" mouseleave"hideControlsAfterDelay"><videoref"video"loadedmetadata"initializePlayer"timeupdate"updateProgress&qu…

【连接池】.NET开源 ORM 框架 SqlSugar 系列

.NET开源 ORM 框架 SqlSugar 系列 【开篇】.NET开源 ORM 框架 SqlSugar 系列【入门必看】.NET开源 ORM 框架 SqlSugar 系列【实体配置】.NET开源 ORM 框架 SqlSugar 系列【Db First】.NET开源 ORM 框架 SqlSugar 系列【Code First】.NET开源 ORM 框架 SqlSugar 系列【数据事务…

ubuntu24.04安装Kubernetes1.31.0(k8s1.30.0)高可用集群

ubuntu24.04安装Kubernetes1.30.0(kubernetes1.30.0)高可用集群 一、总体概览 目前最新版的K8S版本应该是1.31.0,我们安装的是第二新的版本1.30.0,因为有大神XiaoHH Superme指路,所以基本上没踩坑,很顺利就搭建完成了。所有的机器都采用的最新版Ubuntu-Server-24.04长期支…

微软要求 Windows Insider 用户试用备受争议的召回功能

拥有搭载 Qualcomm Snapdragon 处理器的 Copilot PC 的 Windows Insider 计划参与者现在可以试用 Recall&#xff0c;这是一项臭名昭著的快照拍摄 AI 功能&#xff0c;在今年早些时候推出时受到了很多批评。 Windows 营销高级总监 Melissa Grant 上周表示&#xff1a;“我们听…

脉冲动画效果

js实现脉冲动画效果&#xff1a; 鼠标点击时&#xff0c;添加动画类&#xff0c;进而实现动画效果&#xff0c;鼠标离开时&#xff0c;移除动画类&#xff0c;回归静态图效果。 <!DOCTYPE html> <html lang"en"> <head><meta charset"UT…

Linux—进程学习—04(进程地址空间学习)

目录 Linux—进程学习—41.程序地址空间1.1虚拟地址空间的现象1.2虚拟地址空间的理解(感性) 2.进程地址空间2.0 mm_struct结构体2.1 mm_struct结构体的源代码2.2分页&虚拟地址空间解释前面的实验现象 2.3进程地址空间存在的原因2.3.1第一个原因2.3.2第二个原因2.3.3第三个原…

图论入门编程

卡码网刷题链接&#xff1a;98. 所有可达路径 一、题目简述 二、编程demo 方法①邻接矩阵 from collections import defaultdict #简历邻接矩阵 def build_graph(): n, m map(int,input().split()) graph [[0 for _ in range(n1)] for _ in range(n1)]for _ in range(m): …