kafka 发送文件二进制流及使用header发送附属信息

文章目录

  • 背景
  • 案例
    • 发送方
    • 接收方

背景

需要使用kafka发送文件二进制以及附属信息

案例

发送方

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;public class SendFileToKafka {public static void main(String[] args) {String filePath = "com/example/kafka/file/ConsumerFileByteArrayFromKafka.java";Properties kafkaProps = new Properties();kafkaProps.put("bootstrap.servers", "192.168.56.112:9092");kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");KafkaProducer<String, byte[]> producer = new KafkaProducer<>(kafkaProps);InputStream in = SendFileToKafka.class.getResourceAsStream("/com/example/kafka/file/ConsumerFileByteArrayFromKafka.java");try {byte[] buffer = new byte[in.available()];// 读到buffer字节数组中in.read(buffer);ProducerRecord<String, byte[]> record = new ProducerRecord<>("dataTopic", buffer);String header = "aaa";record.headers().add("test_header", header.getBytes(StandardCharsets.UTF_8));producer.send(record);in.close();producer.close();} catch (Exception e) {e.printStackTrace();}}
}

接收方

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;import java.util.Arrays;
import java.util.Properties;public class ConsumerFileByteArrayFromKafka {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "192.168.56.112:9092");props.put("group.id", "group1");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("dataTopic"));try {while (true) {ConsumerRecords<String, byte[]> records = consumer.poll(100);for (ConsumerRecord<String, byte[]> record : records) {Headers headers = record.headers();Iterable<Header> testHeader = headers.headers("test_header");for (Header header : testHeader) {String recordHeader = new String(header.value(), "UTF-8");System.out.println("recordHeader => " + recordHeader);}byte[] message = record.value();System.out.println(new String(message));}}} catch (Exception e) {e.printStackTrace();} finally {consumer.close();}}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/21285.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Halo DB 魔法之 pg_pcpu_limit

↑ 关注「少安事务所」公众号&#xff0c;欢迎⭐收藏&#xff0c;不错过精彩内容~ 前情回顾 前面已经介绍了“光环”数据库的基本情况和安装办法&#xff0c;今天来介绍一个新话题。 哈喽&#xff0c;国产数据库&#xff01;Halo DB! 三步走&#xff0c;Halo DB 安装指引 ★ Ha…

Java Agent利器

一、JavaAgent技术 1.1 什么是JavaAgent JavaAgent是一种特殊的Java程序&#xff0c;是Instrumentation的客户端。它与普通Java程序通过main方法启动不同&#xff0c;JavaAgent并不是一个可以单独启动的程序&#xff0c;它必须依附在一个Java应用程序&#xff08;JVM&#xf…

java并发常见问题

1.死锁&#xff1a;当两个或多个线程无限期地等待对方释放锁时发生死锁。为了避免这种情况&#xff0c;你应该尽量减少锁定资源的时间&#xff0c;按顺序获取锁&#xff0c;并使用定时锁尝试。 2.竞态条件&#xff1a;当程序的行为依赖于线程的执行顺序或输入数据到达的顺序时…

Lagrange ZK Coprocessor:革新区块链领域的大数据应用

1. 引言 2024年5月11日&#xff0c;Lagrange Labs宣称获得由Founders Fund领投&#xff08;Archetype Ventures, 1kx, Maven11, Fenbushi Capital, Volt Capital, CMT Digital, Mantle Ecosystem Fund和其它天使投资人跟头&#xff09;的1320万美金种子轮融资&#xff0c;致力于…

springboot高校网上选课系统-计算机毕业设计源码85583

摘 要 本论文主要论述了如何使用JAVA语言开发一个高校网上选课系统&#xff0c;本系统将严格按照软件开发流程进行各个阶段的工作&#xff0c;采用B/S架构&#xff0c;面向对象编程思想进行项目开发。在引言中&#xff0c;作者将论述高校网上选课系统的当前背景以及系统开发的目…

typescript --object对象类型

ts中的object const obj new Object()Object 这里的Object是Object类型&#xff0c;而不是JavaScript内置的Object构造函数。 这里的Object是一种类型&#xff0c;而Object()构造函数表示一个值。 Object()构造函数的ts代码 interface ObjectConstructor{readonly prototyp…

C++20 范围(Range):简化集合操作

C20 范围&#xff1a;简化集合操作 一、范围&#xff08;Range&#xff09;的目的二、在模板函数中使用范围概念三、投影四、视图五、结论 一、范围&#xff08;Range&#xff09;的目的 在 C20 中&#xff0c;范围概念要求一个对象同时拥有迭代器和结束哨兵。这在标准集合的上…

YOLOv5改进(五)-- 轻量化模型MobileNetv3

文章目录 1、MobileNetV3论文2、代码实现2.1、MobileNetV3-small2.2、MobileNetV3-large 3、运行效果4、目标检测系列文章 1、MobileNetV3论文 Searching for MobileNetV3论文 MobileNetV3代码 MobileNetV3 是 Google 提出的一种轻量级神经网络结构&#xff0c;旨在在移动设备上…

官网上线,一款令人惊艳的文本转语音模型:ChatTTS

近日&#xff0c;一个名为 ChatTTS 文本转语音模型的项目在github上横空出世&#xff0c;一经推出便引发极大关注&#xff0c;短短四天时间&#xff0c;已经狂揽了14.2k的Start量。 ChatTTS是一款专为对话场景设计的支持中英文的文本转语音&#xff08;TTS&#xff09;模型&…

未来已来:Spring Boot引领数据库智能化革命

深入探讨了Spring Boot如何与现代数据库技术相结合&#xff0c;预测并塑造未来的数据访问趋势。本书不仅涵盖了Spring Data JPA的使用技巧&#xff0c;还介绍了云原生数据库的概念&#xff0c;微服务架构下的数据访问策略&#xff0c;以及AI在数据访问层的创新应用。旨在帮助开…

XFeat:速度精度远超superpoint的轻量级图像匹配算法

代码地址&#xff1a;https://github.com/verlab/accelerated_features?tabreadme-ov-file 论文地址&#xff1a;2404.19174 (arxiv.org) XFeat (Accelerated Features)重新审视了卷积神经网络中用于检测、提取和匹配局部特征的基本设计选择。该模型满足了对适用于资源有限设备…

在table中获取每一行scope的值

目的 当前有一份如下数据需要展示在表格中&#xff0c;表格的页面元素套了一个折叠面板&#xff0c;需要循环page_elements中的数据展示出来 错误实践 将template放在了折叠面板中&#xff0c;获取到的scope是空数组 <el-table-column label"页面元素" show-o…

【并发程序设计】15.信号灯(信号量)

15.信号灯(信号量) Linux中的信号灯即信号量是一种用于进程间同步或互斥的机制&#xff0c;它主要用于控制对共享资源的访问。 在Linux系统中&#xff0c;信号灯作为一种进程间通信&#xff08;IPC&#xff09;的方式&#xff0c;与其他如管道、FIFO或共享内存等IPC方式不同&…

分析和设计算法

目录 前言 循环不变式 n位二进制整数相加问题 RAM模型 使用RAM模型分析 代码的最坏情况和平均情况分析 插入排序最坏情况分析 插入排序平均情况分析 设计算法 分治法 总结 前言 循环迭代&#xff0c;分析算法和设计算法作为算法中的三个重要的角色&#xff0c;下面…

Java——二进制原码、反码和补码

一、简要介绍 原码、反码和补码只是三种二进制不同的表示形式&#xff0c;每个二进制数都有这三个形式。 1、原码 原码是将一个数的符号位和数值位分别表示的方法。 最高位为符号位&#xff0c;0表示正&#xff0c;1表示负&#xff0c;其余位表示数值的绝对值。 例如&…

如何解决游戏行业DDOS攻击问题

随着网络游戏行业的迅速发展&#xff0c;网络游戏问题也不可忽视&#xff0c;特别是目前网络攻击频发&#xff0c;DDoS攻击的简单化以及普及化&#xff0c;对游戏来说存在非常大的安全威胁。 随着受攻击对象的范围在不断地拓展&#xff0c;网络游戏这种这种新型并且有着丰厚利…

Scala编程基础3 数组、映射、元组、集合

Scala编程基础3 数组、映射、元组、集合 小白的Scala学习笔记 2024/5/23 14:20 文章目录 Scala编程基础3 数组、映射、元组、集合apply方法数组yield 数组的一些方法映射元组数据类型转换求和示例拉链集合flatMap方法 SetHashMap apply方法 可以new&#xff0c;也可以不new&am…

flink Jobmanager metaspace oom 分析

文章目录 现象作业背景分析现象分析类卸载条件MAT 分析 解决办法flink 官方提示 现象 通过flink 页面提交程序&#xff0c;多次提交后&#xff0c;jobmanager 报metaspace oom 作业背景 用户代码是flink 代码Spring nacos 分析 现象分析 从现象来看肯定是因为有的类没有被…

Linux系统-前台任务组,后台任务组

文章目录 前台进程后台进程新命令jobsfg 【后台进程组序号】ctrlz组合键信号 和 bg命令ctrlz组合键信号bg 【后台进程组序号】 session会话此时我们关闭本次的会话&#xff0c;我们的后台进程是否也会退出呢&#xff1f; 总结 前台进程 在我们远程登录Linux服务器后&#xff0…

创建__init__()方法

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 在创建类后&#xff0c;可以手动创建一个__init__()方法。该方法是一个特殊的方法&#xff0c;类似Java语言中的构造方法。每当创建一个类的新实例时…