Flink1.14新版KafkaSource和KafkaSink实践使用(自定义反序列化器、Topic选择器、序列化器、分区器)

前言

在官方文档的描述中,API FlinkKafkaConsumer和FlinkKafkaProducer将在后续版本陆续弃用、移除,所以在未来生产中有版本升级的情况下,新API KafkaSource和KafkaSink还是有必要学会使用的。下面介绍下基于新API的一些自定义类以及主程序的简单实践。

官方案例

官方文档地址:
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/datastream/kafka/

KafkaSource的自定义类

自定义反序列化器

自定义反序列化器可以以指定的格式取到来源Kafka消息中我们想要的元素。该类需要继承 KafkaDeserializationSchema ,这里简单将来源Kafka的topic、key、value以Tuple3[String, String, String]的格式取出来。

MyKafkaDeserializationSchemaTuple3.scala

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecordimport java.nio.charset.StandardCharsets/*** @author hushhhh*/
class MyKafkaDeserializationSchemaTuple3 extends KafkaDeserializationSchema[(String, String, String)] {override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, String) = {new Tuple3[String, String, String](record.topic(),new String(record.key(), StandardCharsets.UTF_8),new String(record.value(), StandardCharsets.UTF_8))}override def isEndOfStream(nextElement: (String, String, String)): Boolean = falseoverride def getProducedType: TypeInformation[(String, String, String)] = {TypeInformation.of(classOf[(String, String, String)])}
}

KafkaSink的自定义类

自定义Topic选择器

自定义一个 TopicSelector 可以将流中多个topic里的数据根据一定逻辑分发到不同的目标topic里。该类需要继承 TopicSelector ,这里简单根据来源Kafka的topic名拼接下。

MyTopicSelector.scala

import org.apache.flink.connector.kafka.sink.TopicSelector/*** @author hushhhh*/
class MyTopicSelector extends TopicSelector[(String, String, String)] {override def apply(t: (String, String, String)): String = {// t: 来源kafka的topic、key、value"TOPIC_" + t._1.toUpperCase()}
}

自定义序列化器

自定义序列化器可以将数据根据自己的业务格式写到目标Kafka的key和value里,这里将来源Kafka里的key和value直接写出去,这两个类都需要继承 SerializationSchema 。

ProducerRecord Key的序列化器

MyKeySerializationSchema.scala

import org.apache.flink.api.common.serialization.SerializationSchema/*** @author hushhhh*/
class MyKeySerializationSchema extends SerializationSchema[(String, String, String)] {override def serialize(element: (String, String, String)): Array[Byte] = {// element: 来源kafka的topic、key、valueelement._2.getBytes()}
}

ProducerRecord Value的序列化器

MyValueSerializationSchema.scala

import org.apache.flink.api.common.serialization.SerializationSchema/*** @author hushhhh*/
class MyValueSerializationSchema extends SerializationSchema[(String, String, String)] {override def serialize(element: (String, String, String)): Array[Byte] = {// element: 来源kafka的topic、key、valueelement._3.getBytes()}
}

自定义分区器

自定义分区器可以根据具体逻辑对要写到目标Kafka 里的数据进行partition分配。该类需要继承 FlinkKafkaPartitioner ,这里根据key的hash分配到不同的partition里(如果目标topic有多个partition的话)。

MyPartitioner.scala

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner/*** @author hushhhh*/
class MyPartitioner extends FlinkKafkaPartitioner[(String, String, String)] {override def partition(record: (String, String, String), key: Array[Byte], value: Array[Byte], targetTopic: String, partitions: Array[Int]): Int = {// record: 来源kafka的topic、key、valueMath.abs(new String(record._2).hashCode % partitions.length)}
}

主类

Main.scala

import format.{MyKafkaDeserializationSchemaTuple3, MyKeySerializationSchema, MyPartitioner, MyTopicSelector, MyValueSerializationSchema}
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.kafka.clients.consumer.OffsetResetStrategyimport java.util.Properties
import scala.collection.JavaConverters._/*** @author hushhhh*/
object Main {def main(args: Array[String]): Unit = {/*** env*/// stream环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment/*** source*/// 定义 KafkaSourcelazy val kafkaSource: KafkaSource[(String, String, String)] = KafkaSource.builder()// Kafka消费者的各种配置文件,此处省略配置.setProperties(new Properties())// 配置消费的一个或多个topic.setTopics("sourceTopic1,sourceTopic2,...".split(",", -1).toList.asJava)// 开始消费位置,从已提交的offset开始消费,没有的话从最新的消息开始消费.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))// 反序列化,使用之前我们自定义的反序列化器.setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserializationSchemaTuple3)).build()// 添加 kafka sourceval inputDS: DataStream[(String, String, String)] = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"MyKafkaSource").setParallelism(1)/*** transformation*/// 数据加工处理,此处省略/*** sink*/// 定义 KafkaSinklazy val kafkaSink: KafkaSink[(String, String, String)] =KafkaSink.builder[(String, String, String)]()// 目标集群地址.setBootstrapServers("bootstrap.servers")// Kafka生产者的各种配置文件,此处省略配置.setKafkaProducerConfig(new Properties())// 定义消息的序列化模式.setRecordSerializer(KafkaRecordSerializationSchema.builder()// Topic选择器,使用之前我们自定义的Topic选择器.setTopicSelector(new MyTopicSelector)// Key的序列化器,使用之前我们自定义的Key序列化器.setKeySerializationSchema(new MyKeySerializationSchema)// Value的序列化器,使用之前我们自定义的Value序列化器.setValueSerializationSchema(new MyValueSerializationSchema)// 自定义分区器,使用之前我们自定义的自定义分区器.setPartitioner(new MyPartitioner).build())// 语义保证,保证至少一次.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build()// 添加 kafka sinkinputDS.sinkTo(kafkaSink).name("MyKafkaSink").setParallelism(1)/*** execute*/env.execute("myJob")}}

以上就是KafkaSource和KafkaSink API的简单使用。大佬们感觉有用的话点个赞吧~😉

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/661851.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解析Excel文件内容,按每列首行元素名打印出某个字符串的统计占比(超详细)

1.示例: 开发需求:读取Excel文件,统计第3列到第5列中每列的"False"字段占比,统计第6列中的"Pass"字段占比,并按每列首行元素名打印出统计占比 1.1 实现代码1:列数为常量 请确保替换y…

测试access和trunk口的区别(华为)

思科设备参考:测试access和trunk口的区别(思科) 一,实验目的 实现同一 Vlan 内的主机互通,不同 Vlan 间的主机隔离。 二,配置前测试 PC1分别ping PC2、PC3、PC4都能通,因为四台PC默认同处于v…

一文掌握SpringBoot注解之@Configuration知识文集(2)

🏆作者简介,普修罗双战士,一直追求不断学习和成长,在技术的道路上持续探索和实践。 🏆多年互联网行业从业经验,历任核心研发工程师,项目技术负责人。 🎉欢迎 👍点赞✍评论…

【JAVA】单例模式的线程安全性

🍎个人博客:个人主页 🏆个人专栏:JAVA ⛳️ 功不唐捐,玉汝于成 目录 正文 我的其他博客 正文 老生常谈的问题了,首先要说的是单例模式的线程安全意味着:某个类的实例在多线程环境 下只会被…

main函数中参数argc和argv用法解析

1 基础 argc 是 argument count 的缩写,表示传入main函数的参数个数; argv 是 argument vector 的缩写,表示传入main函数的参数序列或指针,并且第一个参数argv[0]一定是程序的名称,并且包含了程序所在的完整路径&…

深度解读NVMe计算存储协议-2

近日,NVME协议组织为了解决这些性能问题并为供应商提供标准化机制,在其架构中集成优化的计算功能,开发了NVM Express (NVMe) 计算存储特性。 计算存储的核心特性包括两个命令集:计算程序集和子系统本地内存。 其中,计算…

python-分享篇-Turtle海龟-画图

文章目录 背景颜色画圆太阳花树椭圆 背景颜色 import turtlepen turtle.Turtle() turtle.Screen().bgcolor("blue") pen.color("cyan") for i in range(10):for i in range(2):pen.forward(100)pen.right(60)pen.forward(100)pen.right(120)pen.right(36…

供应商规模成倍增长,医疗器械制造商如何让采购效率更进一步|创新场景50...

ITValue 随着企业的快速发展,采购供应链网络日益庞大,企业在供应商管理上面临着管理体系分散、风险难以管控,采购过程环节多等问题,供应商内外协同亟待解决。 作者|秦聪慧 专题|创新场景50 ITValue 制造企业…

Node.js之内存限制理解_对处理前端打包内存溢出有所帮助

Node.js内存限制理解_对处理前端打包内存溢出有所帮助 文章目录 Node.js内存限制理解_对处理前端打包内存溢出有所帮助Node.js内存限制1. 查看Node.js默认内存限制1. Ndos.js_V20.10.02. Node.js_V18.16.0 2. V8引擎垃圾回收相关Heap organization堆组织 Node.js内存限制 默认情…

Lazysysadmin

信息收集 # nmap -sn 192.168.1.0/24 -oN live.port Starting Nmap 7.94 ( https://nmap.org ) at 2024-01-30 21:10 CST Nmap scan report for 192.168.1.1 (192.168.1.1) Host is up (0.00075s latency). MAC Address: 00:50:56:C0:00:08 (VMware) Nma…

Docker容器化安装SonarQube9.9

文章目录 1.环境准备1.1 版本信息1.2 系统设置 2.Docker环境安装2.1 卸载旧版本2.2 设置源2.3 安装Docker2.4 设置阿里仓库2.5 启动Docker 3.Docker Compose4.登录4.1 首页4.2 安装插件 5.制作镜像离线安装 1.环境准备 1.1 版本信息 名称版本备注Docker25.0.1当前2024-01-01最…

《C程序设计》上机实验报告(五)之一维数组二维数组与字符数组

实验内容&#xff1a; 1.运行程序 #include <stdio.h> void main( ) { int i,j,iRow0,iCol0,m; int x[3][4]{{1,11,22,33},{2,28,98,38},{3,85,20,89}}; mx[0][0]; for(i0;i<3;i) for(j0;j<4;j) if (x[i][j]>m) { mx[i][j]; iRowi…

Elasticsearch:将文档级安全性 (DLS) 添加到你的内部知识搜索

作者&#xff1a;来自 Elastic Sean Story 你的企业很可能淹没在内部数据中。 你拥有问题跟踪、笔记记录、会议记录、维基页面、视频录制、聊天以及即时消息和私信。 并且不要忘记电子邮件&#xff01; 难怪如此多的企业都在尝试创造工作场所搜索体验 - 为员工提供集中、一站…

react 之 UseReducer

UseReducer作用: 让 React 管理多个相对关联的状态数据 import { useReducer } from react// 1. 定义reducer函数&#xff0c;根据不同的action返回不同的新状态 function reducer(state, action) {switch (action.type) {case INC:return state 1case DEC:return state - 1de…

【飞书小技巧】——飞书文档转 markdown 详细教程

飞书文档转 markdown 详细教程 基于项目:https://github.com/Wsine/feishu2md 如何使用 在线版 访问 https://feishu2md.onrender.com/ 粘贴文档链接即可&#xff0c;文档链接可以通过 分享 > 开启链接分享 > 复制链接 获得。 点击下载之后,会提示 Please wait. It ma…

2024/2/1学习记录

echarts 为柱条添加背景色&#xff1a; 若想设置折线图的点的样式&#xff0c;设置 series.itemStyle 指定填充颜色就好了&#xff0c;设置线的样式设置 lineStyle 就好了。 在折线图中倘若要设置空数据&#xff0c;用 - 表示即可&#xff0c;这对于其他系列的数据也是 适用的…

【C/C++】C/C++编程——整型(二)

在 C 中&#xff0c;整型数据可以分为有符号数&#xff08;Signed&#xff09;和无符号数&#xff08;Unsigned&#xff09;&#xff0c;这两种类型主要用于表示整数值&#xff0c;但它们在表示范围和用途方面有所不同。默认情况下&#xff0c;整数类型如 int、short、long 都是…

中科大计网学习记录笔记(三):接入网和物理媒体

前言&#xff1a; 学习视频&#xff1a;中科大郑烇、杨坚全套《计算机网络&#xff08;自顶向下方法 第7版&#xff0c;James F.Kurose&#xff0c;Keith W.Ross&#xff09;》课程 该视频是B站非常著名的计网学习视频&#xff0c;但相信很多朋友和我一样在听完前面的部分发现信…

基于tidevice实现iOS app自动化使用详解

目录 1、IOS自动化工具概述 2、tidevice工具的原理和使用 2.1、tidevice的原理 2.2、tidevice实现的功能 2.3、tidevice的安装 2.4、tidevice的使用 2.4.1、设备管理 1、查看已连接的设备的列表 2、检测设备连接状态 3、等待设备连接&#xff0c;只要有就连接就结束监…

2024-01-06-AI 大模型全栈工程师 - 如何训练百亿参数大模型

摘要 2024-01-06 周六 杭州 晴 本节内容: 讲座模式&#xff0c;学习大模型训练的相关流程。 课程内容 1. Transformer 回顾 2. 模型架构-生成式 3. 预训练数据的构建 4. 中文字典的构建 4. 预训练目标的构建 5. 预训练相关-预训练策略 6. 预训练相关-并行化训练 7. 预训练…