SpringBoot集成Kafka和avro和Schema注册表

Schema注册表

为了提升kafka的性能,减少网络传输和存储的数据大小,可以把数据的schema部分单独存储到外部的schema注册表中,整体架构如下图所示:
在这里插入图片描述
1)把所有数据需要用到的 schema 保存在注册表里,然后在记录里引用 schema ID。
2)消费者使用 ID 从注册表里拉取 schema 来反序列化记录。
3)序列化器和反序列化器分别负责处理 schema 的注册和拉取。
schema注册表并不属于Kafka,现在已经有一些开源的schema 注册表实现,比如Confluent Schema Registry。

安装confluent

参考:安装手册

# confluent的安装包中已经包含了zookeeper和kafka的安装包,无需单独再下载
# 下载
curl -O https://packages.confluent.io/archive/7.7/confluent-7.7.1.tar.gz
# 解压
tar -xzf confluent-7.7.1.tar.gz

解压以后目录结构如下:

文件夹描述
bin可执行文件
etc配置文件
lib服务
libexec多平台的客户端库
sharejar包和license
src源码
# 设置环境变量
vim /etc/profile
export CONFLUENT_HOME=/usr/local/confluent-7.7.1
export PATH=$CONFLUENT_HOME/bin:$PATH
# 加载环境变量
source /etc/profile
# 验证
confluent --help

启动confluent服务

启动zookeeper

cd /usr/local/confluent-7.7.1/etc/kafka
vim zookeeper.properties
# 可以调整zookeeper的端口和数据的存储目录
# 启动zookeeper
./bin/zookeeper-server-start -daemon ./etc/kafka/zookeeper.properties
# 验证
ps -ef | grep zookeeper

启动kafka

cd /usr/local/confluent-7.7.1/etc/kafka
vim server.properties
broker.id=0
# 监听地址
listeners=0.0.0.0://:9092
# 对外暴漏的地址
advertised.listeners=PLAINTEXT://192.168.200.128:9092
# zookeeper的地址
zookeeper.connect=localhost:2181
# 启动./bin/kafka-server-start  -daemon ./etc/kafka/server.properties
# 验证
netstat -nap | grep 9092

启动confluent

cd /usr/local/confluent-7.7.1/etc/schema-registry
# 修改schema-registry.properties
vim schema-registry.properties
# schema-registry的监听地址
listeners=http://0.0.0.0:8081
# kafka的访问地址
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
# 启动
./bin/schema-registry-start -daemon ./etc/schema-registry/schema-registry.properties
# 验证
netstat -nap | grep 8081

新建springboot项目

新建avro的schema文件User.avsc

{"namespace": "com.github.xjs.protocol","type": "record","name": "UserRecord","fields": [{"name": "id","type": "int"},{"name": "name","type": "string"}]
}

pom中添加avro-maven-plugin插件

<!--https://avro.apache.org/docs/1.11.1/getting-started-java/-->
<!-- 命令行执行:mvn generate-sources 把avsc转化成java文件 
-->
<plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.11.1</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory><outputDirectory>${project.basedir}/src/main/java/</outputDirectory></configuration></execution></executions>
</plugin>

添加avro和kafka的依赖

<dependency><groupId>io.confluent</groupId><artifactId>kafka-avro-serializer</artifactId><version>7.7.1</version>
</dependency>
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

添加对应的配置

server:port: 8080
spring:kafka:bootstrap-servers: 192.168.200.128:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer# 重点关注这里的KafkaAvroSerializervalue-serializer: io.confluent.kafka.serializers.KafkaAvroSerializerconsumer:group-id: testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 重点关注这里的.KafkaAvroDeserializervalue-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializerproperties:# confluent的地址schema.registry.url: http://192.168.200.128:8081

消息生产者

public void send(UserRecord record) {if (Objects.isNull(record)) {return;}log.info("send message, value:{}", record.toString());// 跟发送普通消息一样,可以直接发送UserRecordkafkaTemplate.send("demo-topic", record);
}

消息消费者

@KafkaListener(topics = "demo-topic")
public void consume(ConsumerRecord<String, UserRecord> user){// 跟接收普通消息一样,可以直接接收UserRecordlog.info("receive message, topic:{}, key:{}, value:{}", user.topic(), user.key(), user.value());
}

完整的源码下载:https://github.com/xjs1919/learning-demo/tree/master/springboot-kafka-avro

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/887935.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Nodemailer使用教程:在Node.js中发送电子邮件

目录 1. 简介 2. 安装 3. 基本配置 3.1 创建传输器 3.2 配置说明 4. 发送邮件 4.1 基本发送示例 4.2 发送验证码示例 5. 常见问题解决 5.1 "Greeting never received" 错误 5.2 安全建议 SMTP与邮件加密协议详解 1. SMTP简介 1.1 基本特点 2. 加密协…

Cause: java.sql.SQLException: No value specified for parameter 4

问题 执行更新sql时报错&#xff0c;异常栈如下 org.springframework.jdbc.BadSqlGrammarException: ### Error updating database. Cause: java.sql.SQLException: No value specified for parameter 4 ### The error may exist in com/my/mapper/MyMapper.java (best gue…

Wireshark 4.4.2:安全更新、错误修复、更新协议支持

流行的网络协议分析器Wireshark已更新至4.4.2版本。它可用于网络故障排除、分析、开发和教育。 已修复以下漏洞&#xff1a; wnpa-sec-2024-14 FiveCo RAP 解剖器无限循环。wnpa-sec-2024-15 ECMP 解析器崩溃。 更新的协议支持&#xff1a; ARTNET、ASN.1 PER、BACapp、B…

【一维DP】【三种解法】力扣983. 最低票价

在一个火车旅行很受欢迎的国度&#xff0c;你提前一年计划了一些火车旅行。在接下来的一年里&#xff0c;你要旅行的日子将以一个名为 days 的数组给出。每一项是一个从 1 到 365 的整数。 火车票有 三种不同的销售方式 &#xff1a; 一张 为期一天 的通行证售价为 costs[0] …

C语言中数据类型对应的打印格式

提示&#xff1a;文章 文章目录 前言一、背景二、C语言中数据类型对应的打印格式三、3.1 总结 前言 前期疑问&#xff1a; 本文目标&#xff1a; 一、背景 最近 二、C语言中数据类型对应的打印格式 int 对应 %d long 对应 %ld char 对应 %c float 对应 %f long long 对应 %…

《Django 5 By Example》阅读笔记:p339-p358

《Django 5 By Example》学习第12天&#xff0c;p339-p358总结&#xff0c;总计20页。 一、技术总结 1.项目(购物网站) django-admin startproject myshop 虽然这里只是示例&#xff0c;但我觉得这种命名为 myxxx 的习惯非常不好&#xff0c;因为在实际应用中&#xff0c;是…

Vue 项目开发常用知识点

一、基础语法与指令 1. 插值表达式 插值表达式是 Vue 中最基础的数据绑定方式&#xff0c;使用双大括号{{ }}将数据包裹起来&#xff0c;例如{{ message }}&#xff0c;它会将 Vue 实例中的message属性的值渲染到页面相应位置。这种方式可以方便地在页面中展示动态数据&#x…

【Web开发基础学习——corsheaders 应用的理解】

Web开发基础学习系列文章目录 第一章 基础知识学习之corsheaders 应用的理解 文章目录 Web开发基础学习系列文章目录前言一、使用1.1 安装1.2 配置 二、功能总结 前言 corsheaders 是一个 Django 第三方应用&#xff0c;用于处理跨域资源共享 (CORS)。CORS 是一种机制&#x…

【简单好抄保姆级教学】javascript调用本地exe程序(谷歌,edge,百度,主流浏览器都可以使用....)

javascript调用本地exe程序 详细操作步骤结果 详细操作步骤 在本地创建一个txt文件依次输入 1.指明所使用注册表编程器版本 Windows Registry Editor Version 5.00这是脚本的第一行&#xff0c;指明了所使用的注册表编辑器版本。这是必需的&#xff0c;以确保脚本能够被正确解…

Milvus 2.5:全文检索上线,标量过滤提速,易用性再突破!

01. 概览 我们很高兴为大家带来 Milvus 2.5 最新版本的介绍。 在 Milvus 2.5 里&#xff0c;最重要的一个更新是我们带来了“全新”的全文检索能力&#xff0c;之所以说“全新”主要是基于以下两点&#xff1a; 第一&#xff0c;对于全文检索基于的 BM25 算法&#xff0c;我们采…

【数据分析】布朗运动(维纳过程)

文章目录 一、概述二、数学布朗运动2.1 数学定义2.2 布朗运动的数学模型2.21 标准布朗运动2.22 布朗运动的路径2.23 布朗运动的方程 三、布朗运动在金融学中的应用四、数学构造&#xff08;以傅里叶级数为例&#xff09;4.1 傅里叶级数的基本思想4.2 构造布朗运动 一、概述 布…

Spring Cloud(Kilburn 2022.0.2版本)系列教程(五) 服务网关(SpringCloud Gateway)

Spring Cloud(Kilburn 2022.0.2版本)系列教程(五) 服务网关(SpringCloud Gateway) 一、服务网关 1.1 什么是网关 在微服务架构中&#xff0c;服务网关是一个至关重要的组件。它作为系统的入口&#xff0c;负责接收客户端的请求&#xff0c;并将这些请求路由到相应的后端服务…

即时通讯| IM+RTC在AI技术加持下的社交体验

即时通讯作为互联网的重要应用之一&#xff0c;见证了中国互联网30年发展的辉煌历程。 它从最初的文字交流&#xff0c;发展到如今的语音、视频通话&#xff0c;甚至是虚拟现实社交&#xff0c;已经渗透到生活的社交、娱乐、商务等方方面面&#xff0c;成为现代社会不可或缺的一…

【docker】docker常用命令汇总

1.Docker 常用命令总结表格 Docker 环境信息命令 命令作用docker version查看 Docker 客户端和服务端的版本信息docker info查看 Docker 系统的详细信息&#xff0c;包括存储驱动、内核版本等docker inspect <name or id>获取容器或镜像的详细配置信息docker system df…

【Java基础入门篇】一、变量、数据类型和运算符

Java基础入门篇 一、变量、数据类型和运算符 1.1 变量 计算机中的数据表示方式是&#xff1a;“二进制(0/1)”&#xff0c;但是同时也可以兼容其他进制&#xff0c;例如八进制、十进制、十六进制等。 Java变量的本质是&#xff1a;存储在固定空间的内容&#xff0c;变量名是…

鸿蒙技术分享:Navigation页面管理-鸿蒙@fw/router框架源码解析(二)

theme: smartblue 本文是系列文章&#xff0c;其他文章见&#xff1a;鸿蒙fw/router框架源码解析&#xff08;一&#xff09;-Router页面管理 鸿蒙fw/router框架源码解析 介绍 fw/router是在HarmonyOS鸿蒙系统中开发应用所使用的开源模块化路由框架。该路由框架基于模块化开…

【博主推荐】C#的winfrom应用中datagridview常见问题及解决方案汇总

文章目录 1.datagridview绘制出现鼠标悬浮数据变空白2.datagridview在每列前动态添加序号2.1 加载数据集完成后绘制序号2.2 RowPostPaint事件绘制 3.datagridview改变行样式4.datagridview后台修改指定列数据5.datagridview固定某个列宽6.datagridview某个列的显示隐藏7.datagr…

宠物领养平台构建:SpringBoot技术路线图

摘 要 如今社会上各行各业&#xff0c;都在用属于自己专用的软件来进行工作&#xff0c;互联网发展到这个时候&#xff0c;人们已经发现离不开了互联网。互联网的发展&#xff0c;离不开一些新的技术&#xff0c;而新技术的产生往往是为了解决现有问题而产生的。针对于宠物领养…

使用经典的Java,还是拥抱新兴的Rust?

在当代互联网时代的企业级开发中&#xff0c;技术栈的选择往往牵动着每个团队的神经。随着Rust语言的崛起&#xff0c;许多开发团队开始重新思考&#xff1a;是继续坚持使用经典的Java&#xff0c;还是拥抱新兴的Rust&#xff1f;这个问题背后&#xff0c;折射出的是对技术演进…

springboot学习-jdbc,jdbc-client,spring-data-jdbc

晚上又看了Dan Vega的视频&#xff0c;讲解了jdbc在spring 的发展史。 jdbc: sql语句&#xff0c;手工梳理result&#xff0c;并循环。最原始的JAVA API&#xff0c;从1998年JAVA1.0就有了。jdbc template: sql语句&#xff0c;手工处理result ,不用循环了。--从spring诞生就有…