Kafka 的消息格式:了解消息结构与序列化

Kafka 作为一款高性能的消息中间件系统,其消息格式对于消息的生产、传输和消费起着至关重要的作用。本篇博客将深入讨论 Kafka 的消息格式,包括消息的结构、序列化与反序列化,以及一些常用的消息格式选项。通过更丰富的示例代码和深入的解析,希望能够帮助大家更好地理解 Kafka 消息的内部机制。

1. Kafka 消息结构

Kafka 的消息结构由消息头、消息键、消息值和时间戳等组成。下面是一个典型的 Kafka 消息结构:

----------------------------------------------------------------------------------------------
| Message Header | Key | Value | Timestamp | Optional Headers |
----------------------------------------------------------------------------------------------

1.1 消息头

消息头包含一些元数据信息,例如消息的大小、压缩信息等。消息头的结构可能会根据 Kafka 版本和配置而有所不同。

1.2 消息键与消息值

  • 消息键(Key): 用于标识消息的唯一性,通常用于分区和查找消息。

  • 消息值(Value): 包含实际的消息内容。

1.3 时间戳

时间戳表示消息的产生时间,有两种类型:

  • 创建时间戳: 表示消息被创建的时间。

  • LogAppendTime 时间戳: 表示消息被追加到日志的时间。

2. 消息的序列化与反序列化

Kafka 中的消息在生产者发送和消费者接收时需要进行序列化和反序列化。这是因为 Kafka 是以字节流的形式存储和传输消息的,而实际的消息内容可能是各种不同的数据类型。以下是一些常用的序列化器和反序列化器:

2.1 字符串序列化器

// 生产者端
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "Hello, Kafka!");// 消费者端
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
});

2.2 Avro 序列化器

Avro 是一种高性能且紧凑的二进制序列化格式,适用于复杂数据结构的消息。

// 生产者端
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("field1", "value1");
avroRecord.put("field2", 42);
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("my-topic", "key", avroRecord);// 消费者端
ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {GenericRecord value = record.value();System.out.printf("Consumed record with key %s and value %s%n", record.key(), value);
});

2.3 JSON 序列化器

// 生产者端
JsonNode jsonNode = objectMapper.createObjectNode();
((ObjectNode) jsonNode).put("field1", "value1");
((ObjectNode) jsonNode).put("field2", 42);
ProducerRecord<String, JsonNode> record = new ProducerRecord<>("my-topic", "key", jsonNode);// 消费者端
ConsumerRecords<String, JsonNode> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {JsonNode value = record.value();System.out.printf("Consumed record with key %s and value %s%n", record.key(), value);
});

3. 自定义消息格式

在某些情况下,你可能需要定义自己的消息格式。Kafka 提供了 ByteArraySerializerByteArrayDeserializer,允许你将消息以字节数组的形式发送和接收,从而实现自定义的序列化和反序列化逻辑。

// 生产者端
byte[] customMessageBytes = serializeCustomMessage(customMessage);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("my-topic", "key", customMessageBytes);// 消费者端
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {byte[] value = record.value();CustomMessage customMessage = deserializeCustomMessage(value);System.out.printf("Consumed record with key %s and value %s%n", record.key(), customMessage);
});

4. 消息的压缩与解压

Kafka 支持消息的压缩,以减小网络传输的开销。以下是一些常用的压缩选项:

// 生产者端
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);// 消费者端
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
Consumer<String, String> consumer = new KafkaConsumer<>(props);

5. 消息的版本控制与兼容性

在实际应用中,系统的演进和变化是不可避免的。因此,考虑到消息的版本控制和兼容性是非常重要的。以下是一些相关的注意事项和最佳实践:

5.1 消息的演进

  • 向后兼容性: 新版本的消费者能够处理旧版本的消息。

  • 向前兼容性: 旧版本的消费者能够处理新版本的消息。

5.2 Schema Registry

Schema Registry 是一个用于存储和管理 Avro、JSON 等消息格式的架构的中心化服务。通过使用 Schema Registry,可以更好地管理消息的演进,并确保向前和向后的兼容性。

// 配置 Schema Registry 地址
props.put("schema.registry.url", "http://schema-registry:8081");

6. 消息的认证与加密

Kafka 提供了安全性特性,包括消息的认证和加密。以下是一些相关的配置选项:

6.1 SSL 加密通信

// 生产者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");// 消费者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

6.2 认证配置

// 生产者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");// 消费者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");

7. 消息的追踪与监控

追踪和监控是保障系统稳定性和性能的重要手段。以下是一些常用的追踪和监控工具:

7.1 JMX 监控

Kafka 提供了 JMX 接口,可以通过 JConsole 或其他 JMX 客户端进行监控。

7.2 Kafka Manager

Kafka Manager 是一款开源的 Kafka 集群管理和监控工具,提供了直观的 Web 界面。

7.3 Prometheus 和 Grafana

使用 Prometheus 进行指标采集,结合 Grafana 进行可视化展示,可以更全面地监控 Kafka 集群的性能和健康状况。

总结

在深入探讨Kafka消息格式、版本控制、安全性和监控等关键主题后,对构建高效、灵活的消息系统有了更为全面的认识。了解消息结构、序列化与反序列化、自定义消息格式,以及消息的压缩与解压,是确保消息传递的基础。随后,版本控制与兼容性的重要性得到了强调,Schema Registry成为管理Avro、JSON等消息格式的利器。在保障消息传递安全方面,SSL加密通信和认证配置提供了可靠的手段。最后,通过JMX监控、Kafka Manager、以及Prometheus和Grafana的运用,能够实时追踪和监控Kafka集群的健康状态。

这篇文章旨在为大家提供全方位的Kafka消息系统知识,使其能够在实际应用中根据业务需求构建稳健、高效的消息处理系统。深入理解这些关键概念,将有助于确保消息系统的可维护性、稳定性和安全性,为实际业务场景中的挑战提供可行的解决方案。继续关注更多Kafka相关的技术内容,将使大家能够不断深化对消息系统的认识,应对日益复杂的数据处理需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/203504.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2023年山东省职业院校技能大赛信息安全管理与评估二三阶段样题

2023年山东省职业院校技能大赛信息安全管理与评估二三阶段 样题 第二阶段 模块二 网络安全事件响应、数字取证调查、应用程序安全 一、竞赛内容 Geek极安云科专注技能竞赛技术提升&#xff0c;基于各大赛项提供全面的系统性培训&#xff0c;拥有完整的培训体系。团队拥有曾…

docker部署elasticsearch8.x

docker部署elasticsearch8.x 提示1 注意版本差别1.1 docker修改配置1.1.2 docker使用vim报命令不存在的解决办法1.1.3 docker 容器内报错 E: List directory /var/lib/apt/lists/partial is missing. - Acquire ( : No such file or directory) 或者其他权限 PermissionError: …

【Delphi】一个函数实现ios,android震动功能 Vibrate(包括3D Touch 中 Peek 震动等)

一、前言 我们在开发移动端APP的时候&#xff0c;有时可能需要APP能够提供震动功能&#xff0c;以便提醒操作者&#xff0c;特别是ios提供的3D Touch触感功能&#xff0c;操作者操作时会有触感震动&#xff0c;给操作者的感觉很友好。那么&#xff0c;在Delphi的移动端FMX开发中…

团建策划信息展示服务预约小程序效果如何

团建是中大型企业商家每年举办的员工活动&#xff0c;其形式多样化、具备全部参与的娱乐性。但在实际策划流程及内容时&#xff0c;部分公司便会难以入手&#xff0c;术业有专攻&#xff0c;这个时候团建策划公司便会发挥效果。 如拓展训练、露营、运动会、体育竞技等往往更具…

【算法】算法题-20231207

这里写目录标题 一、共同路径二、数字列表排序三、给定两个整数 n 和 k&#xff0c;返回 1 … n 中所有可能的 k 个数的组合。 一、共同路径 给你一个完整文件名组成的列表&#xff0c;请编写一个函数&#xff0c;返回他们的共同目录路径。 # nums[/hogwarts/assets/style.cs…

算法通关村第十七关-黄金挑战跳跃问题

大家好我是苏麟 , 今天说说跳跃问题 . 跳跃游戏 描述 : 给你一个非负整数数组 nums &#xff0c;你最初位于数组的 第一个下标 。数组中的每个元素代表你在该位置可以跳跃的最大长度。 判断你是否能够到达最后一个下标&#xff0c;如果可以&#xff0c;返回 true &#xff…

HBase-架构与设计

HBase架构与设计 一、背景二、HBase概述1.设计特点2.适用场景2.1 海量数据2.2 稀疏数据2.3 多版本数据2.4 半结构或者非结构化数据 三、数据模型1.RowKey2.Column Family3.TimeStamp 四、HBase架构图1.Client2.Zookeeper3.HMaster4.HRegionServer5.HRegion6.Store7.StoreFile8.…

Elasticsearch:什么是机器学习?

机器学习定义 机器学习 (ML) 是人工智能 (AI) 的一个分支&#xff0c;专注于使用数据和算法来模仿人类的学习方式&#xff0c;并随着时间的推移逐渐提高准确性。 计算机科学家和人工智能创新者 Arthur Samuel 在 20 世纪 50 年代首次将其定义为 “赋予计算机无需明确编程即可学…

【基于openGauss5.0.0简单使用DBMind】

基于openGauss5.0.0简单使用DBMind 一、环境说明二、初始化tpch测试数据三、使用DBMind索引推荐功能四、使用DBMind实现SQL优化功能 一、环境说明 虚拟机&#xff1a;virtualbox操作系统&#xff1a;openEuler 20.03 TLS数据库&#xff1a;openGauss-5.0.0DBMind&#xff1a;d…

2022年第十一届数学建模国际赛小美赛A题翼龙如何飞行解题全过程文档及程序

2022年第十一届数学建模国际赛小美赛 A题 翼龙如何飞行 原题再现&#xff1a; 翼龙是翼龙目中一个已灭绝的飞行爬行动物分支。它们存在于中生代的大部分时期&#xff1a;从三叠纪晚期到白垩纪末期。翼龙是已知最早进化出动力飞行的脊椎动物。它们的翅膀是由皮肤、肌肉和其他组…

云服务器与nas实现在冷热资源访问,nginx代理

在实际项目中&#xff0c;我们的文件存储是一个必不可少的环节&#xff0c;本博主了解到现在的存储方案有 购买纯系统的云服务器&#xff0c;自己安装个mino,再使用nginx代理给web使用购买OSS服务&#xff0c;现在有云厂商都有提供&#xff0c;储存价格也挺便宜的&#xff0c;…

13款趣味性不错(炫酷)的前端动画特效及源码(预览获取)分享(附源码)

文字激光打印特效 基于canvas实现的动画特效&#xff0c;你既可以设置初始的打印文字也可以在下方输入文字可实现激光字体打印&#xff0c;精简易用。 预览获取 核心代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8&q…

生物动力葡萄酒的快速指南

虽然我们大多数人都熟悉有机酿酒和农业&#xff0c;但围绕生物动力学仍有许多困惑和神秘。无论你是否完全陌生&#xff0c;或者你已经听到一些小道消息&#xff0c;我们在这里揭开这种独特的葡萄酒生产方法的神秘面纱。 生物动力葡萄酒就是一个更全面的有机酿酒过程&#xff0c…

Ros智行mini,opencv,Gmapping建图,自主导航auto_slam,人脸识别,语音控制

功能 一、Gmapping建图 二、自主导航 起始点 、终点 三、人脸识别 四、语音控制 完成任务: 机器人先建图 建完图后给出目标点&#xff0c;机器人就可以完成调用自主导航走到目标点&#xff0c;期间会调用激光雷达扫描局部环境来进行自主避障&#xff0c;到达终点后进行语音…

HCIP考试实验

实验更新中&#xff0c;部分配置解析与分析正在完善中........... 实验拓扑图 实验要求 要求 1、该拓扑为公司网络&#xff0c;其中包括公司总部、公司分部以及公司骨干网&#xff0c;不包含运营商公网部分。 2、设备名称均使用拓扑上名称改名&#xff0c;并且区分大小写。 3…

持续集成交付CICD:Jenkins使用GitLab共享库实现自动更新前后端项目质量配置

目录 一、实验 1.Jenkins使用GitLab共享库实现自动更新后端项目质量配置 2.Jenkins使用GitLab共享库实现自动更新前端项目质量配置 二、问题 1.Sonarqube如何添加自定义质量阈 一、实验 1.Jenkins使用GitLab共享库实现自动更新后端项目质量配置 (1)修改GitLab的Sonar.gr…

bert其他内容个人记录

Pre-training a seq2seq model BERT只是一个预训练Encoder&#xff0c;有没有办法预训练Seq2Seq模型的Decoder&#xff1f; 在一个transformer的模型中&#xff0c;将输入的序列损坏&#xff0c;然后Decoder输出句子被破坏前的结果&#xff0c;训练这个模型实际上是预训练一个…

【LeetCode刷题】-- 79.单词搜索

79.单词搜索 方法&#xff1a;使用回溯 使用dfs函数表示判断以网格的(i.j)位置出发&#xff0c;能否搜索到word(k)&#xff0c;其中word(k)表示字符串word从第k个字符开始的后缀子串&#xff0c;如果能搜索到&#xff0c;返回true,反之返回false 如果board[i][j]≠word[k]&am…

Netty线程模型

Netty线程模型 Netty中两个线程池, 分别是BossGroup和WorkGroup, 线程模型如下图所示&#xff1a; 模型解释&#xff1a; Netty 抽象出两组线程池BossGroup和WorkerGroup&#xff0c;BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写BossGroup和WorkerGr…

vue2 echarts饼状图,柱状图,折线图,简单封装以及使用

vue2 echarts饼状图&#xff0c;柱状图&#xff0c;折线图&#xff0c;简单封装以及使用 1. 直接上代码&#xff08;复制可直接用&#xff0c;请根据自己的文件修改引用地址&#xff0c;图表只是简单封装&#xff0c;可根据自身功能&#xff0c;进行进一步配置。&#xff09; …