kafka 发送文件二进制流及使用header发送附属信息

文章目录

  • 背景
  • 案例
    • 发送方
    • 接收方

背景

需要使用kafka发送文件二进制以及附属信息

案例

发送方

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;public class SendFileToKafka {public static void main(String[] args) {String filePath = "com/example/kafka/file/ConsumerFileByteArrayFromKafka.java";Properties kafkaProps = new Properties();kafkaProps.put("bootstrap.servers", "192.168.56.112:9092");kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");KafkaProducer<String, byte[]> producer = new KafkaProducer<>(kafkaProps);InputStream in = SendFileToKafka.class.getResourceAsStream("/com/example/kafka/file/ConsumerFileByteArrayFromKafka.java");try {byte[] buffer = new byte[in.available()];// 读到buffer字节数组中in.read(buffer);ProducerRecord<String, byte[]> record = new ProducerRecord<>("dataTopic", buffer);String header = "aaa";record.headers().add("test_header", header.getBytes(StandardCharsets.UTF_8));producer.send(record);in.close();producer.close();} catch (Exception e) {e.printStackTrace();}}
}

接收方

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;import java.util.Arrays;
import java.util.Properties;public class ConsumerFileByteArrayFromKafka {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "192.168.56.112:9092");props.put("group.id", "group1");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("dataTopic"));try {while (true) {ConsumerRecords<String, byte[]> records = consumer.poll(100);for (ConsumerRecord<String, byte[]> record : records) {Headers headers = record.headers();Iterable<Header> testHeader = headers.headers("test_header");for (Header header : testHeader) {String recordHeader = new String(header.value(), "UTF-8");System.out.println("recordHeader => " + recordHeader);}byte[] message = record.value();System.out.println(new String(message));}}} catch (Exception e) {e.printStackTrace();} finally {consumer.close();}}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/21285.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Halo DB 魔法之 pg_pcpu_limit

↑ 关注「少安事务所」公众号&#xff0c;欢迎⭐收藏&#xff0c;不错过精彩内容~ 前情回顾 前面已经介绍了“光环”数据库的基本情况和安装办法&#xff0c;今天来介绍一个新话题。 哈喽&#xff0c;国产数据库&#xff01;Halo DB! 三步走&#xff0c;Halo DB 安装指引 ★ Ha…

Java Agent利器

一、JavaAgent技术 1.1 什么是JavaAgent JavaAgent是一种特殊的Java程序&#xff0c;是Instrumentation的客户端。它与普通Java程序通过main方法启动不同&#xff0c;JavaAgent并不是一个可以单独启动的程序&#xff0c;它必须依附在一个Java应用程序&#xff08;JVM&#xf…

java并发常见问题

1.死锁&#xff1a;当两个或多个线程无限期地等待对方释放锁时发生死锁。为了避免这种情况&#xff0c;你应该尽量减少锁定资源的时间&#xff0c;按顺序获取锁&#xff0c;并使用定时锁尝试。 2.竞态条件&#xff1a;当程序的行为依赖于线程的执行顺序或输入数据到达的顺序时…

Lagrange ZK Coprocessor:革新区块链领域的大数据应用

1. 引言 2024年5月11日&#xff0c;Lagrange Labs宣称获得由Founders Fund领投&#xff08;Archetype Ventures, 1kx, Maven11, Fenbushi Capital, Volt Capital, CMT Digital, Mantle Ecosystem Fund和其它天使投资人跟头&#xff09;的1320万美金种子轮融资&#xff0c;致力于…

springboot高校网上选课系统-计算机毕业设计源码85583

摘 要 本论文主要论述了如何使用JAVA语言开发一个高校网上选课系统&#xff0c;本系统将严格按照软件开发流程进行各个阶段的工作&#xff0c;采用B/S架构&#xff0c;面向对象编程思想进行项目开发。在引言中&#xff0c;作者将论述高校网上选课系统的当前背景以及系统开发的目…

typescript --object对象类型

ts中的object const obj new Object()Object 这里的Object是Object类型&#xff0c;而不是JavaScript内置的Object构造函数。 这里的Object是一种类型&#xff0c;而Object()构造函数表示一个值。 Object()构造函数的ts代码 interface ObjectConstructor{readonly prototyp…

实验报告6-SSM框架整合

资料下载链接 实验报告6-SSM框架整合(1验证码) 实验报告6-SSM框架整合(2管理员登录) 实验报告6-SSM框架整合(3商品的分页查询) 实验报告6-SSM框架整合(4权限拦截器) 一、需求分析 使用普通整合方式实现SSM&#xff08;SpringMVC、Spring和MyBatis&#xff09;整合&#xff0c;…

C++20 范围(Range):简化集合操作

C20 范围&#xff1a;简化集合操作 一、范围&#xff08;Range&#xff09;的目的二、在模板函数中使用范围概念三、投影四、视图五、结论 一、范围&#xff08;Range&#xff09;的目的 在 C20 中&#xff0c;范围概念要求一个对象同时拥有迭代器和结束哨兵。这在标准集合的上…

YOLOv5改进(五)-- 轻量化模型MobileNetv3

文章目录 1、MobileNetV3论文2、代码实现2.1、MobileNetV3-small2.2、MobileNetV3-large 3、运行效果4、目标检测系列文章 1、MobileNetV3论文 Searching for MobileNetV3论文 MobileNetV3代码 MobileNetV3 是 Google 提出的一种轻量级神经网络结构&#xff0c;旨在在移动设备上…

官网上线,一款令人惊艳的文本转语音模型:ChatTTS

近日&#xff0c;一个名为 ChatTTS 文本转语音模型的项目在github上横空出世&#xff0c;一经推出便引发极大关注&#xff0c;短短四天时间&#xff0c;已经狂揽了14.2k的Start量。 ChatTTS是一款专为对话场景设计的支持中英文的文本转语音&#xff08;TTS&#xff09;模型&…

Codeforces Global Round 17 C. Keshi Is Throwing a Party 题解 二分答案

Keshi Is Throwing a Party 题目描述 Keshi is throwing a party and he wants everybody in the party to be happy. He has n n n friends. His i i i-th friend has i i i dollars. If you invite the i i i-th friend to the party, he will be happy only if at m…

未来已来:Spring Boot引领数据库智能化革命

深入探讨了Spring Boot如何与现代数据库技术相结合&#xff0c;预测并塑造未来的数据访问趋势。本书不仅涵盖了Spring Data JPA的使用技巧&#xff0c;还介绍了云原生数据库的概念&#xff0c;微服务架构下的数据访问策略&#xff0c;以及AI在数据访问层的创新应用。旨在帮助开…

微信小程序如何进行页面跳转

微信小程序中的页面跳转可以通过多种方式实现&#xff0c;以下是几种主要的跳转方式及其详细解释&#xff1a; wx.navigateTo 功能&#xff1a;保留当前页面&#xff0c;跳转到应用内的某个页面。特点&#xff1a; 可以在新页面使用wx.navigateBack返回原页面。每跳转一个新页…

XFeat:速度精度远超superpoint的轻量级图像匹配算法

代码地址&#xff1a;https://github.com/verlab/accelerated_features?tabreadme-ov-file 论文地址&#xff1a;2404.19174 (arxiv.org) XFeat (Accelerated Features)重新审视了卷积神经网络中用于检测、提取和匹配局部特征的基本设计选择。该模型满足了对适用于资源有限设备…

在table中获取每一行scope的值

目的 当前有一份如下数据需要展示在表格中&#xff0c;表格的页面元素套了一个折叠面板&#xff0c;需要循环page_elements中的数据展示出来 错误实践 将template放在了折叠面板中&#xff0c;获取到的scope是空数组 <el-table-column label"页面元素" show-o…

【并发程序设计】15.信号灯(信号量)

15.信号灯(信号量) Linux中的信号灯即信号量是一种用于进程间同步或互斥的机制&#xff0c;它主要用于控制对共享资源的访问。 在Linux系统中&#xff0c;信号灯作为一种进程间通信&#xff08;IPC&#xff09;的方式&#xff0c;与其他如管道、FIFO或共享内存等IPC方式不同&…

分析和设计算法

目录 前言 循环不变式 n位二进制整数相加问题 RAM模型 使用RAM模型分析 代码的最坏情况和平均情况分析 插入排序最坏情况分析 插入排序平均情况分析 设计算法 分治法 总结 前言 循环迭代&#xff0c;分析算法和设计算法作为算法中的三个重要的角色&#xff0c;下面…

C/C++|基于回调函数实现异步操作

首先&#xff0c;要搞懂一点&#xff0c;异步操作本质上也是并发&#xff0c;我们想要在线程级别实现异步并发基本就靠三种方式&#xff1a; 多线程并发回调函数协程 今天我们讨论的是回调函数&#xff0c;我们如何通过回调函数来实现异步操作呢&#xff1f; 非阻塞I/O操作回…

Java——二进制原码、反码和补码

一、简要介绍 原码、反码和补码只是三种二进制不同的表示形式&#xff0c;每个二进制数都有这三个形式。 1、原码 原码是将一个数的符号位和数值位分别表示的方法。 最高位为符号位&#xff0c;0表示正&#xff0c;1表示负&#xff0c;其余位表示数值的绝对值。 例如&…

力扣刷题--LCP 66. 最小展台数量【简单】

题目描述&#x1f357; 力扣嘉年华将举办一系列展览活动&#xff0c;后勤部将负责为每场展览提供所需要的展台。 已知后勤部得到了一份需求清单&#xff0c;记录了近期展览所需要的展台类型&#xff0c; demand[i][j] 表示第 i 天展览时第 j 个展台的类型。 在满足每一天展台需…