kafka复习:(3)自定义序列化器和反序列化器

一、实体类定义:


public class Company {private String name;private String address;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}@Overridepublic String toString() {return "Company{" +"name='" + name + '\'' +", address='" + address + '\'' +'}';}public Company(String name, String address) {this.name = name;this.address = address;}public Company() {}
}

二、自定义序列化器和反序列化器


import org.apache.kafka.common.serialization.Serializer;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;public class CompanySerializer implements Serializer<Company> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}//进行字节数组序列化@Overridepublic byte[] serialize(String topic, Company data) {if(data == null){return null;}byte[] name, address;try{if(data.getName() != null){name = data.getName().getBytes("UTF-8");}else {name = new byte[0];}if(data.getAddress() != null){address = data.getAddress().getBytes("UTF-8");}else{address = new byte[0];}ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4+ name.length + address.length);byteBuffer.putInt(name.length);byteBuffer.put(name);byteBuffer.putInt(address.length);byteBuffer.put(address);return byteBuffer.array();}catch (UnsupportedEncodingException e){e.printStackTrace();}return new byte[0];}@Overridepublic void close() {}
}

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;public class CompanyDeserializer implements Deserializer<Company> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic Company deserialize(String topic, byte[] data) {if (data == null) {return null;}ByteBuffer buffer = ByteBuffer.wrap(data);int nameLen, addressLen;String name, address;nameLen = buffer.getInt();byte[] nameBytes = new byte[nameLen];buffer.get(nameBytes);addressLen = buffer.getInt();byte[] addressBytes = new byte[addressLen];buffer.get(addressBytes);try {name = new String(nameBytes, "UTF-8");address = new String(addressBytes, "UTF-8");} catch (UnsupportedEncodingException ex) {throw new SerializationException("Error:"+ex.getMessage());}return new Company(name,address);}@Overridepublic void close() {}
}

三、定义生产者和消费者

package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CompanyProducer {public static void main(String[] args) throws Exception{Properties properties = new Properties();properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//设置value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());properties.put("bootstrap.servers", "xxx.xxx.xxx.xxx:9092");KafkaProducer<String, Company> producer = new KafkaProducer<>(properties);Company company = new Company();company.setAddress("Beijing");company.setName("Connection");ProducerRecord<String, Company> record = new ProducerRecord<>("companyTopic", company);producer.send(record).get();}
}
package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class CompanyConsumer {public static void main(String[] args) {Properties properties=new Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xxx.xxx.xxx.xxx:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"my");KafkaConsumer<String,Company> kafkaConsumer=new KafkaConsumer<>(properties);kafkaConsumer.subscribe(Collections.singletonList("companyTopic"));while(true){ConsumerRecords<String,Company> consumerRecords=kafkaConsumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,Company> consumerRecord: consumerRecords){System.out.println(consumerRecord.value());}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/59586.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【算法刷题之哈希表篇(1)】

目录 1.哈希表基础理论2.leetcode-242. 有效的字母异位词&#xff08;1&#xff09;方法一&#xff1a;排序&#xff08;2&#xff09;方法二&#xff1a;哈希表 3.leetcode-349. 两个数组的交集&#xff08;1&#xff09;方法一&#xff1a;哈希表&#xff08;2&#xff09;方…

会员管理系统实战开发教程06-会员充值

我们上篇讲解了会员开卡的操作&#xff0c;有了会员卡之后日常就是给会员进行充值&#xff0c;充值的逻辑是对余额进行累加&#xff0c;而且要记录充值的情况。 1 创建充值记录表 打开控制台&#xff0c;点击号创建数据源 输入数据源名称充值记录 点击编辑添加字段 先添加…

【C进阶】深度剖析数据在内存中的存储

目录 一、数据类型的介绍 1.类型的意义&#xff1a; 2.类型的基本分类 二、整形在内存中的存储 1.原码 反码 补码 2.大小端介绍 3.练习 三、浮点型在内存中的存储 1.一个例子 2.浮点数存储规则 一、数据类型的介绍 前面我们已经学习了基本的内置类型以及他们所占存储…

WebRTC之FEC前向纠错协议

FEC前向纠错用于丢包恢复&#xff0c;对媒体包进行异或或其他算法生成冗余包进行发送。如果接收端出现丢包&#xff0c;可以通过冗余包恢复出原始的媒体包。FEC的代价是增加码率带宽&#xff0c;所以一般会根据网络状况、丢包率来动态调整FEC冗余系数&#xff0c;也会结合NACK/…

无涯教程-Android - 环境设置

您可以从Oracle的Java网站下载最新版本的Java JDK-Java SE下载&#xff0c;您将在下载的文件中找到有关安装JDK的说明,按照给定的说明安装和配置安装程序。最后,将PATH和JAVA_HOME环境变量设置为引用包含 java 和 javac 的目录,通常分别是java_install_dir/bin和java_install_d…

D-Link DCS 密码泄露漏洞

0x01 前言 本次测试仅供学习使用&#xff0c;如若非法他用&#xff0c;与本文作者无关&#xff0c;需自行负责&#xff01;&#xff01;&#xff01; 0x02 漏洞描述 D-link DCS是一款成像色彩为彩色 是一款网络摄像机。D-link DCS系统存在密码泄露漏洞&#xff0c;攻击者通过…

C语言每日一练-----Day(4)

本专栏为c语言练习专栏&#xff0c;适合刚刚学完c语言的初学者。本专栏每天会不定时更新&#xff0c;通过每天练习&#xff0c;进一步对c语言的重难点知识进行更深入的学习。 今日练习题关键字&#xff1a;记负均正    旋转数组的最小数字    二分查找 &#x1f493;博主…

“精准时空”赋能制造业智能化发展

作者&#xff1a;邓中亮 高达动态厘米级的高精度定位服务&#xff0c;不仅是北斗卫星导航系统的一大独门绝技&#xff0c;其在产业化应用层面也已逐步向普适化、标配化演进&#xff0c;并延展出时空智能新兴产业。 5月17日&#xff0c;当长征三号乙运载火箭成功发射北斗系统的…

ubuntu触摸板自然滚动设置无效

natural scrolling does not work in gnome - Unix & Linux Stack Exchange settings->Mouse & Touchpad 里设置触摸板滚动方向为Natural无效&#xff0c;可能的原因是xserver-xorg-input-synaptics捕获了触摸板行为&#xff0c;把它xserver-xorg-input-synaptics …

Apache Spark 的基本概念和在大数据分析中的应用

Apache Spark是一个开源的大数据分析框架&#xff0c;可以快速高效地处理大规模的数据集。Spark具有以下特点&#xff1a; 快速性&#xff1a; Spark使用内存计算&#xff0c;能够在迭代算法、交互式数据挖掘和实时流处理等场景中表现出色。 灵活性&#xff1a; Spark支持多种…

Next.js基础语法

Next.js 目录结构 入口App组件&#xff08;_app.tsx&#xff09; _app.tsx是项目的入口组件&#xff0c;主要作用&#xff1a; 可以扩展自定义的布局&#xff08;Layout&#xff09;引入全局的样式文件引入Redux状态管理引入主题组件等等全局监听客户端路由的切换 ts.config…

驱动版本问题:connect error, url mysql.db.url:jdbc:mysql://IP地址:3306/数据库名

MySQL升级版本 5.7.43 必须升级驱动版本 8.0 以上版本 启动项目报错 使用Druid数据库连接池获取Mysql常见的错误--不兼容问题 后来百度搜索发现原来是Mysql版本和mysql-connector-java版本不一致造成的。 查询Mysql版本号&#xff1a; SELECT version(); 然后去官网找对应版…

iOS开发Swift-2-图片视图、App图标-赏月App

1.创建新项目 点击File - New - Project。 选择Single View App&#xff0c;点击Next。 填写文件信息&#xff0c;点击Next。 选择文件位置&#xff0c;点击Create。 修改App显示名称为 “赏月”。 2.设置背景色 选择Main&#xff0c;点击View界面&#xff0c;选择右边属性&…

用docker-compose搭建LNMP

docker-compose搭建LNMP 一、compose 的部署1.Docker Compose 环境安装 二、编写Docker Compose1.准备依赖文件,配置nginx2.配置mysql3.配置php4.编写docker-compose.yml5.执行6.查看 一、compose 的部署 &#xff08;1&#xff09;公司在实际的生产环境中&#xff0c;需要使用…

Cesium 加载 geojson 文件并对文件中的属性值进行颜色设置

文章目录 需求分析解决 需求 Cesium 加载 geojson 文件并对文件中的属性值进行颜色设置 分析 在搜寻多种解决方案后&#xff0c;最后总结出 自己的解决方案 方案一&#xff0c;没看懂 var geojsonOptions {clampToGround : true //使数据贴地};var entities;promise Cesium…

uniapp 支持图片放大

<view class"list" v-for"(item, index) in urls" :key"index"><image :src"item" click"viewImg(item, index)" disabled></image></view> js // 预览大图 viewImg(data, index) {uni.previewImag…

Java【手撕滑动窗口】LeetCode 209. “长度最小子数组“, 图文详解思路分析 + 代码

文章目录 前言一、长度最小子数组1, 题目2, 思路分析3, 代码 前言 各位读者好, 我是小陈, 这是我的个人主页, 希望我的专栏能够帮助到你: &#x1f4d5; JavaSE基础: 基础语法, 类和对象, 封装继承多态, 接口, 综合小练习图书管理系统等 &#x1f4d7; Java数据结构: 顺序表, 链…

React入门 组件学习笔记

项目页面以组件形式层层搭起来&#xff0c;组件提高复用性&#xff0c;可维护性 目录 一、函数组件 二、类组件 三、 组件的事件绑定 四、获取事件对象 五、事件绑定传递额外参数 六、组件状态 初始化状态 读取状态 修改状态 七、组件-状态修改counter案例 八、this问…

如何安装chromadb

下载最新版本的python3.10 因为chromadb需要sqlite3的最小版本是3.35.0 使用如下命令安装 pip install chromadb 安装完毕后在python3的命令行窗口输入 import chromadb 如果不报错代表成功&#xff0c;如果报错sqlite3的最小版本是3.35.0&#xff0c;使用如下方式解决 …

校园照明控制系统的基本组成部分

1.1 主控中心 系统使用通用计算机作为主控中心&#xff0c;通过 RLINK 通讯装置与网络实现通 讯。计算机上可直接实现编程&#xff0c;监控&#xff0c;故障报警等功能。局域网上的计算机也可以 通过主控中心实现监控功能。 1.2 RLINK 通讯装置 主控中心与系统网络各单元的…