SpringBoot 整合 Avro 与 Kafka 详解

SpringBoot 整合 Avro 与 Kafka 详解

在大数据处理和实时数据流场景中,Apache Kafka 和 Apache Avro 是两个非常重要的工具。Kafka 作为一个分布式流处理平台,能够高效地处理大量数据,而 Avro 则是一个用于序列化数据的紧凑、快速的二进制数据格式。将这两者结合,并通过 Spring Boot 进行整合,可以构建出高效、可扩展的实时数据处理系统。

一、环境准备

在开始整合之前,需要准备好以下环境:

  • Java:确保已经安装了 JDK,推荐使用 JDK 8 或更高版本。
  • Maven:用于管理项目的依赖和构建过程。
  • Spring Boot:作为项目的框架,推荐使用较新的版本,如 Spring Boot 2.x。
  • Kafka:确保 Kafka 已经安装并运行,可以使用 Docker 部署 Kafka 集群。
  • Avro:Avro 依赖 JSON 定义的架构来序列化数据。
二、项目结构

一个典型的 Spring Boot 项目结构可能如下:

spring-boot-kafka-avro
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── example
│   │   │           ├── SpringBootKafkaAvroApplication.java
│   │   │           ├── config
│   │   │           │   └── KafkaConfig.java
│   │   │           ├── producer
│   │   │           │   └── KafkaProducer.java
│   │   │           ├── consumer
│   │   │           │   └── KafkaConsumer.java
│   │   │           └── model
│   │   │               └── ElectronicsPackage.java (由 Avro 自动生成)
│   │   ├── resources
│   │   │   ├── application.properties
│   │   │   └── avro
│   │   │       └── electronicsPackage.avsc (Avro 架构文件)
│   └── test
│       └── java
│           └── com
│               └── example
│                   └── SpringBootKafkaAvroApplicationTests.java
└── pom.xml
三、添加依赖

pom.xml 文件中添加必要的依赖:

<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.9.13</version> <!-- 根据需要选择合适的版本 --></dependency><!-- Avro --><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.11.0</version> <!-- 根据需要选择合适的版本 --></dependency><!-- Avro Maven Plugin --><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>${avro.version}</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory><outputDirectory>${project.build.directory}/generated/avro</outputDirectory></configuration></execution></executions></plugin>
</dependencies>
四、定义 Avro 架构

src/main/resources/avro/ 目录下创建一个 Avro 架构文件 electronicsPackage.avsc

{"namespace": "com.example.model","type": "record","name": "ElectronicsPackage","fields": [{"name": "package_number", "type": ["string", "null"], "default": null},{"name": "frs_site_code", "type": ["string", "null"], "default": null},{"name": "frs_site_code_type", "type": ["string", "null"], "default": null}]
}

这个架构文件定义了 ElectronicsPackage 类,包括三个字段:package_numberfrs_site_codefrs_site_code_type

五、生成 Avro 类

运行 Maven 构建过程,Avro Maven 插件会根据 electronicsPackage.avsc 文件生成相应的 Java 类 ElectronicsPackage.java

六、配置 Kafka

application.properties 文件中配置 Kafka 的相关属性:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.example.config.AvroSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.example.config.AvroDeserializer

注意,这里指定了自定义的 AvroSerializerAvroDeserializer 类。

七、实现 Avro 序列化器和反序列化器

创建 AvroSerializerAvroDeserializer 类,用于 Avro 数据的序列化和反序列化。

// AvroSerializer.java
package com.example.config;import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serializer;import java.io.ByteArrayOutputStream;
import java.io.IOException;public class AvroSerializer<T extends SpecificRecord> implements Serializer<T> {private final DatumWriter<T> writer;public AvroSerializer(Class<T> type) {this.writer = new SpecificDatumWriter<>(type);}@Overridepublic byte[] serialize(String topic, T data) {if (data == null) {return null;}ByteArrayOutputStream out = new ByteArrayOutputStream();Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);try {writer.write(data, encoder);encoder.flush();out.close();} catch (IOException e) {throw new RuntimeException(e);}return out.toByteArray();}@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// No-op}@Overridepublic void close() {// No-op}
}// AvroDeserializer.java
package com.example.config;import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Deserializer;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;public class AvroDeserializer<T extends SpecificRecord> implements Deserializer<T> {private final Class<T> type;private final DatumReader<T> reader;public AvroDeserializer(Class<T> type) {this.type = type;this.reader = new SpecificDatumReader<>(type);}@Overridepublic T deserialize(String topic, byte[] data) {if (data == null) {return null;}ByteArrayInputStream in = new ByteArrayInputStream(data);Decoder decoder = DecoderFactory.get().binaryDecoder(in, null);try {return reader.read(null, decoder);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// No-op

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/63042.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2021数学分析【南昌大学】

2021 数学分析 求极限 lim ⁡ n → ∞ 1 n ( n + 1 ) ( n + 2 ) ⋯ ( n + n ) n \lim_{n \to \infty} \frac{1}{n} \sqrt [n]{(n+1)(n+2) \cdots (n+n)} n→∞lim​n1​n(n+1)(n+2)⋯(n+n) ​ lim ⁡ n → ∞ 1 n ( n + 1 ) ( n + 2 ) ⋯ ( n + n ) n = lim ⁡ n → ∞ ( n + …

vue+mars3d点击图层展示炫酷的popup弹窗

展示效果 目录 一&#xff1a;叠加数据图层到地图上&#xff0c;此时需要使用bindPopup绑定popup 二、封装自定义的popup&#xff0c;样式可以自行调整 一&#xff1a;叠加数据图层到地图上&#xff0c;此时需要使用bindPopup绑定popup 这里我根据数据不同&#xff0c;展示的…

【Python】用Python和Paramiko实现远程服务器自动化管理

《Python OpenCV从菜鸟到高手》带你进入图像处理与计算机视觉的大门! 解锁Python编程的无限可能:《奇妙的Python》带你漫游代码世界 在现代IT环境中,远程服务器管理已成为运维工作的常态。随着自动化运维的需求不断增加,如何高效地管理远程服务器,提升操作的灵活性和效率…

框架建设实战2——创建frame-bom和out-bom

根frame-parent的创建一样&#xff0c;分别创建两个工程。主要管理公司内部的二方包和外部的三方包。 1.frame-bom GAV定义为&#xff1a; <groupId>com.test</groupId> <artifactId>frame-bom</artifactId> <name>frame-bom</name> …

【AIGC】如何使用高价值提示词Prompt提升ChatGPT响应质量

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: AIGC | 提示词Prompt应用实例 文章目录 &#x1f4af;前言&#x1f4af;提示词英文模板&#x1f4af;提示词中文解析1. 明确需求2. 建议额外角色3. 角色确认与修改4. 逐步完善提示5. 确定参考资料6. 生成和优化提示7.…

FPGA存在的意义:为什么adc连续采样需要fpga来做,而不会直接用iic来实现

FPGA存在的意义&#xff1a;为什么adc连续采样需要fpga来做&#xff0c;而不会直接用iic来实现 原因ADS111x连续采样实现连续采样功能说明iic读取adc的数据速率 VS adc连续采样的速率adc连续采样的速率iic读取adc的数据速率结论分析 FPGA读取adc数据问题一&#xff1a;读取adc数…

千益畅行,旅游卡有些什么优势?

千益畅行共享旅游卡是一种创新的旅游服务模式&#xff0c;旨在通过整合各类旅游资源&#xff0c;为用户提供一站式的旅游解决方案。这张旅游卡支持2至6人同行&#xff0c;涵盖了接机、酒店、用餐、大巴、导游、景区门票等服务&#xff0c;用户只需自行承担往返交通费用即可享受…

LobeChat-46.6k星!顶级AI工具集,一键部署,界面美观易用,ApiSmart 是你肉身体验学习LLM 最好IDEA 工具

LobeChat LobeChat的开源&#xff0c;把AI功能集合到一起&#xff0c;真的太爽了。 我第一次发现LobeChat的时候&#xff0c;就是看到那炫酷的页面&#xff0c;这么强的前端真的是在秀肌肉啊&#xff01; 看下它的官网&#xff0c;整个网站的动效简直闪瞎我&#xff01; GitH…

[报错] Error: PostCSS plugin autoprefixer requires PostCSS 8 问题解决办法

报错&#xff1a;Error: PostCSS plugin autoprefixer requires PostCSS 8 原因&#xff1a;autoprefixer版本过高 解决方案&#xff1a; 降低autoprefixer版本 执行&#xff1a;npm i postcss-loader autoprefixer8.0.0 参考&#xff1a; Error: PostCSS plugin autoprefix…

基于STM32的Wi-Fi无人机项目

引言 随着无人机技术的快速发展&#xff0c;基于微控制器的DIY无人机变得越来越流行。本项目将介绍如何使用STM32微控制器制作一架简单的Wi-Fi无人机。通过本项目&#xff0c;您将了解到无人机的基本组成部分&#xff0c;如何进行硬件连接&#xff0c;代码编写&#xff0c;以及…

IDEA注释格式、匹配补全调整

1.注释格式调整 目前重新捡起一部分Java&#xff0c;写代码时候发现注释快捷键总是放在第一列&#xff0c;看起来很难受&#xff0c;故寻找方法如下&#xff1a; 分别点击 编辑器-代码样式-Java 修改注释代码选项如下 2.大小写匹配补全问题 还发现在写代码过程中&#xff0c…

麒麟 V10(ky10.x86_64)无网环境下 openssl - 3.2.2 与 openssh - 9.8p1 升级【最全教程】

目录 背景 安装包下载 上传解压安装包 安装zlib 安装OpenSSL 安装OpenSSH 验证 背景 近期&#xff0c;项目上线已进入倒计时阶段&#xff0c;然而在至关重要的安全检查环节中&#xff0c;却惊现现有的 OpenSSH 存在一系列令人担忧的漏洞&#xff1a; OpenSSH 资源管理错…

Android EventBus最全面试题及参考答案

目录 什么是 EventBus? 请解释 EventBus 是什么,以及它的工作原理。 简述 EventBus 的工作原理。 EventBus 的主要组成部分有哪些? EventBus 是如何实现发布订阅模式的? EventBus 与观察者模式有什么区别? EventBus 的优点在哪、不用 EventBus 怎么解决? EventBu…

高级架构二 Git基础到高级

一 Git仓库的基本概念和流程 什么是版本库&#xff1f;版本库又名仓库&#xff0c;英文名repository,你可以简单的理解一个目录&#xff0c;这个目录里面的所有文件都可以被Git管理起来&#xff0c;每个文件的修改&#xff0c;删除&#xff0c;Git都能跟踪&#xff0c;以便任何…

从excel数据导入到sqlsever遇到的问题

1、格式问题时间格式&#xff0c;excel中将日期列改为日期未生效&#xff0c;改完后&#xff0c;必须手动单击这个单元格才能生效&#xff0c;那不可能一个一个去双击。解决方案如下 2、导入之后表字段格式问题&#xff0c;数据类型的用navicat导入之后默认是nvarchar类型的&a…

21天掌握javaweb--->第4天:MyBatis-Plus基础与进阶

21天掌握JavaWeb--->第4天&#xff1a;MyBatis-Plus基础与进阶 MyBatis-Plus简介 MyBatis-Plus&#xff08;简称MP&#xff09;是一个MyBatis的增强工具&#xff0c;在MyBatis的基础上只做增强不做改变&#xff0c;为简化开发、提高效率而生。它具有以下核心优势&#xff…

FREERTOS二值信号量实验

代码&#xff1a; 主程序 #include "./SYSTEM/sys/sys.h" #include "./SYSTEM/usart/usart.h" #include "./SYSTEM/delay/delay.h" #include "./BSP/LED/led.h" #include "./BSP/LCD/lcd.h" #include "./BSP/KEY/key…

vue3中 axios 发送请求 刷新token 封装axios

service.js 页面 import axios from axios // 创建axios实例 const instance axios.create({baseURL: http://gcm-test.jhzhkj.cn:8600/h5card/,timeout: 5000, // 请求超时时间headers: {get: {Content-Type: application/x-www-form-urlencoded},post: {Content-Type: appl…

对于Oracle来说,土地管理是非核心域吗

思雨喵 2022-1-4 14:13 您在课上说&#xff0c;对于土地管理系统来说oracle&#xff0c;arcgis&#xff0c;java是非核心域&#xff0c;因为它们可有可无。我想请教对于oracle来说&#xff0c;土地管理好像也是可有可无&#xff0c;那么土地管理是非核心域吗 UMLChina潘加宇 …

一次Kafka启动失败引出的问题

背景 Some time&#xff0c;有个现场童鞋说咱的Kafka实例有个broker一直crash&#xff0c;还截图给我看了&#xff0c;大致是Kafka启动加载topic分区日志文件的时候&#xff0c;然后就没了&#xff0c;连个WARN都没有。当然&#xff0c;光看这个截图咱啥都不知道&#xff0c;因…