Flink测试利器之DataGen初探 | 京东云技术团队

什么是 Flinksql

Flink SQL 是基于 Apache Calcite 的 SQL 解析器和优化器构建的,支持ANSI SQL 标准,允许使用标准的 SQL 语句来处理流式和批处理数据。通过 Flink SQL,可以以声明式的方式描述数据处理逻辑,而无需编写显式的代码。使用 Flink SQL,可以执行各种数据操作,如过滤、聚合、连接和转换等。它还提供了窗口操作、时间处理和复杂事件处理等功能,以满足流式数据处理的需求。

Flink SQL 提供了许多扩展功能和语法,以适应 Flink 的流式和批处理引擎的特性。他是Flink最高级别的抽象,可以与 DataStream API 和 DataSet API 无缝集成,利用 Flink 的分布式计算能力和容错机制。

使用 Flink SQL处理数据的基本步骤:

  1. 定义输入表:使用 CREATE TABLE 语句定义输入表,指定表的模式(字段和类型)和数据源(如 Kafka、文件等)。

  2. 执行 SQL 查询:使用 SELECT、INSERT INTO 等 SQL 语句来执行数据查询和操作。您可以在 SQL 查询中使用各种内置函数、聚合操作、窗口操作和时间属性等。

  3. 定义输出表:使用 CREATE TABLE 语句定义输出表,指定表的模式和目标数据存储(如 Kafka、文件等)。

  4. 提交作业:将 Flink SQL 查询作为 Flink 作业提交到 Flink 集群中执行。Flink会根据查询的逻辑和配置自动构建执行计划,并将数据处理任务分发到集群中的任务管理器进行执行。

总而言之,我们可以通过Flink SQL 查询和操作来处理流式和批处理数据。它提供了一种简化和加速数据处理开发的方式,尤其适用于熟悉 SQL 的开发人员和数据工程师。

什么是 connector

Flink Connector 是指用于连接外部系统和数据源的组件。它允许 Flink 通过特定的连接器与不同的数据源进行交互,例如数据库、消息队列、文件系统等。它负责处理与外部系统的通信、数据格式转换、数据读取和写入等任务。无论是作为输入数据表还是输出数据表,通过使用适当的连接器,可以在 Flink SQL 中访问和操作外部系统中的数据。目前实时平台提供了很多常用的连接器:

例如:

  1. JDBC :用于与关系型数据库(如 MySQL、PostgreSQL)建立连接,并支持在 Flink SQL 中读取和写入数据库表的数据。

  2. JDQ :用于与 JDQ 集成,可以读取和写入 JDQ 主题中的数据。

  3. Elasticsearch :用于与 Elasticsearch 集成,可以将数据写入 Elasticsearch 索引或从索引中读取数据。

  4. File Connector:用于读取和写入各种文件格式(如 CSV、JSON、Parquet)的数据。

还有如HBase、JMQ4、Doris、Clickhouse,Jimdb,Hive等,用于与不同的数据源进行集成。通过使用 Flink SQL Connector,我们可以轻松地与外部系统进行数据交互,将数据导入到 Flink 进行处理,或将处理结果导出到外部系统。

DataGen Connector

DataGen 是 Flink SQL 提供的一个内置连接器,用于生成模拟的测试数据,以便在开发和测试过程中使用。

使用 DataGen,可以生成具有不同数据类型和分布的数据,例如整数、字符串、日期等。这样可以模拟真实的数据场景,并帮助验证和调试 Flink SQL 查询和操作。

demo

以下是一个使用 DataGen 函数的简单示例:

-- 创建输入表
CREATE TABLE input_table (order_number BIGINT,price DECIMAL(32,2),buyer ROW<first_name STRING, last_name STRING>,order_time TIMESTAMP(3)
) WITH ('connector' = 'datagen',
);

在上面的示例中,我们使用 DataGen 连接器创建了一个名为 `input_table` 的输入表。该表包含了 `order_number`、`price` 和 `buyer` ,`order_time`四个字段。默认是random随机生成对应类型的数据,生产速率是10000条/秒,只要任务不停,就会源源不断的生产数据。当然也可以指定一些参数来定义生成数据的规则,例如每秒生成的行数、字段的数据类型和分布。

生成的数据样例:

{"order_number":-6353089831284155505,"price":253422671148527900374700392448,"buyer":{"first_name":"6e4df4455bed12c8ad74f03471e5d8e3141d7977bcc5bef88a57102dac71ac9a9dbef00f406ce9bddaf3741f37330e5fb9d2","last_name":"d7d8a39e063fbd2beac91c791dc1024e2b1f0857b85990fbb5c4eac32445951aad0a2bcffd3a29b2a08b057a0b31aa689ed7"},"order_time":"2023-09-21 06:22:29.618"}
{"order_number":1102733628546646982,"price":628524591222898424803263250432,"buyer":{"first_name":"4738f237436b70c80e504b95f0d9ec3d7c01c8745edf21495f17bb4d7044b4950943014f26b5d7fdaed10db37a632849b96c","last_name":"7f9dbdbed581b687989665b97c09dec1a617c830c048446bf31c746898e1bccfe21a5969ee174a1d69845be7163b5e375a09"},"order_time":"2023-09-21 06:23:01.69"}

支持的类型

字段类型数据生成方式
BOOLEANrandom
CHARrandom / sequence
VARCHARrandom / sequence
STRINGrandom / sequence
DECIMALrandom / sequence
TINYINTrandom / sequence
SMALLINTrandom / sequence
INTrandom / sequence
BIGINTrandom / sequence
FLOATrandom / sequence
DOUBLErandom / sequence
DATErandom
TIMErandom
TIMESTAMPrandom
TIMESTAMP_LTZrandom
INTERVAL YEAR TO MONTHrandom
INTERVAL DAY TO MONTHrandom
ROWrandom
ARRAYrandom
MAPrandom
MULTISETrandom

连接器属性

属性是否必填默认值类型描述
connectorrequired(none)String‘datagen’.
rows-per-secondoptional10000Long数据生产速率
number-of-rowsoptional(none)Long指定生产的数据条数,默认是不限制。
fields.#.kindoptionalrandomString指定字段的生产数据的方式 random还是sequence
fields.#.minoptional(Minimum value of type)(Type of field)random生成器 指定字段 # 最小值, 支持数字类型
fields.#.maxoptional(Maximum value of type)(Type of field)random生成器的指定字段 # 最大值, 支持数字类型
fields.#.lengthoptional100Integerchar/varchar/string/array/map/multiset 类型的长度.
fields.#.startoptional(none)(Type of field)sequence生成器的开始值
fields.#.endoptional(none)(Type of field)sequence生成器的结束值

DataGen使用

了解了dategen的基本使用方法,那么下面来结合其他类型的连接器实践一下吧。

场景1 生成一亿条数据到hive表

CREATE TABLE dataGenSourceTable(order_number BIGINT,price DECIMAL(10, 2),buyer STRING,order_time TIMESTAMP(3))
WITH( 'connector'='datagen', 'number-of-rows'='100000000','rows-per-second' = '100000') ;CREATECATALOG myhive
WITH ('type'='hive','default-database'='default'
);
USECATALOG myhive;
USE dev;
SETtable.sql-dialect=hive;
CREATETABLEifnotexists shipu3_test_0932 (order_number BIGINT,price DECIMAL(10, 2),buyer STRING,order_time TIMESTAMP(3)
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES ('partition.time-extractor.timestamp-pattern'='$dt','sink.partition-commit.trigger'='partition-time','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='metastore,success-file'
);
SETtable.sql-dialect=default;
insert into myhive.dev.shipu3_test_0932
select order_number,price,buyer,order_time, cast( CURRENT_DATE as varchar)
from default_catalog.default_database.dataGenSourceTable;

当每秒生产10万条数据的时候,17分钟左右就可以完成,当然我们可以通过增加Flink任务的计算节点、并行度、提高生产速率’rows-per-second’的值等来更快速的完成大数据量的生产。

场景2 持续每秒生产10万条数到消息队列

CREATE TABLE dataGenSourceTable (order_number BIGINT,price INT,buyer ROW< first_name STRING, last_name STRING >,order_time TIMESTAMP(3),col_array ARRAY < STRING >,col_map map < STRING, STRING >)
WITH( 'connector'='datagen', --连接器类型'rows-per-second'='100000', --生产速率'fields.order_number.kind'='random', --字段order_number的生产方式'fields.order_number.min'='1', --字段order_number最小值'fields.order_number.max'='1000', --字段order_number最大值'fields.price.kind'='sequence', --字段price的生产方式'fields.price.start'='1', --字段price开始值'fields.price.end'='1000', --字段price最大值'fields.col_array.element.length'='5', --每个元素的长度'fields.col_map.key.length'='5', --map key的长度'fields.col_map.value.length'='5' --map value的长度) ;
CREATE TABLE jdqsink1(order_number BIGINT,price DECIMAL(32, 2),buyer ROW< first_name STRING, last_name STRING >,order_time TIMESTAMP(3),col_ARRAY ARRAY < STRING >,col_map map < STRING, STRING >)
WITH('connector'='jdq','topic'='jrdw-fk-area_info__1','jdq.client.id'='xxxxx','jdq.password'='xxxxxxx','jdq.domain'='db.test.group.com','format'='json') ;
INSERTINTO jdqsink1
SELECT*FROM dataGenSourceTable;

思考

通过以上案例可以看到,通过Datagen结合其他连接器可以模拟各种场景的数据

  • 性能测试:我们可以利用Flink的高处理性能,来调试任务的外部依赖的阈值(超时,限流等)到一个合适的水位,避免自己的任务有过多的外部依赖出现木桶效应;
  • 边界条件测试:我们通过使用 Flink DataGen 生成特殊的测试数据,如最小值、最大值、空值、重复值等来验证 Flink 任务在边界条件下的正确性和鲁棒性;
  • 数据完整性测试:我们通过Flink DataGen 可以生成包含错误或异常数据的数据集,如无效的数据格式、缺失的字段、重复的数据等。从而可以测试 Flink 任务对异常情况的处理能力,验证 Flink任务在处理数据时是否能够正确地保持数据的完整性。

总之,Flink DataGen 是一个强大的工具,可以帮助测试人员构造各种类型的测试数据。通过合理的使用 ,测试人员可以更有效地进行测试,并发现潜在的问题和缺陷。

作者:京东零售 石朴

来源:京东云开发者社区 转载请注明来源

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/109785.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

React 路由总结 react-router-dom6+react-router-dom5

开题 单页面应用和多页面应用 SPA&#xff1a;单页面应用程序&#xff0c;整个应用中只有一个页面(index.html) MPA&#xff1a;多页面应用程序&#xff0c;整个应用中有很多页面(*.html) react路由 现在的前端应用大多都是SPA单页面应用程序&#xff0c;也就是一个HTML页面的…

百度OCR识别图片文本字符串——物联网上位机软件

一、开发背景 根据项目需求&#xff0c;我们需要完成LED显示屏实时显示歌词的效果。最优的方法是调用歌曲播放器的API获取歌词&#xff0c;但是由于这个开发资格不是很好申请&#xff0c;因此我们采用其他方案&#xff0c;即通过OCR识别获取歌词&#xff0c;并投射到LED显示屏上…

ExposureDiffusion: Learning to Expose for Low-light Image Enhancement论文阅读笔记

南洋理工大学、鹏城实验室、香港理工大学在ICCV2023发表的暗图增强论文。用diffusion模型来进行raw图像暗图增强&#xff0c;同时提出了一个自适应的残差层用来对具有不同信噪比的不同区域采取不同的去噪策略。 方法的框图如下所示&#xff1a; 一张raw图片可以由信号和噪声…

Maven 基础教程系列

Maven是一个项目开发管理和理解工具。基于项目对象模型的概念&#xff1a;构建、依赖关系管理、文档创建、站点发布和分发发布都由pom.xml声明性文件控制。Maven可以通过插件进行扩展&#xff0c;以使用许多其他开发工具来报告或构建过程。 一、Maven 使用教程-CSDN博客 二、…

Eclipse Xtext 实现PLC ST 语言到C的转换

Eclipse Xtext 是开发领域专用语言&#xff08;DSL&#xff09;的工具。例如数据库的SQL 语言&#xff0c;PLC 的ST 语言都是一种领域专用语言。在开放自动化领域&#xff0c;提倡基于模型的设计方法。DSL 是描述模型的强有力工具。 在开发PLC 程序IDE时&#xff0c;开发ST编译…

网络安全内网渗透之信息收集--systeminfo查看电脑有无加域

systeminfo输出的内容很多&#xff0c;包括主机名、OS名称、OS版本、域信息、打的补丁程序等。 其中&#xff0c;查看电脑有无加域可以快速搜索&#xff1a; systeminfo|findstr "域:" 输出结果为WORKGROUP&#xff0c;可见该机器没有加域&#xff1a; systeminfo…

LeetCode【17】电话号码的字母组合

题目&#xff1a; 思路&#xff1a; 参考&#xff1a;https://blog.csdn.net/weixin_46429290/article/details/121888154 和上一个题《子集》的思路一样&#xff0c;先画出树结构&#xff0c;看树的深度&#xff08;遍历层级&#xff09;&#xff0c;树的宽度&#xff08;横向…

10.18作业

使用手动连接&#xff0c;将登录框中的取消按钮使用qt4版本的连接到自定义的槽函数中&#xff0c;在自定义的槽函数中调用关闭函数 将登录按钮使用qt5版本的连接到自定义的槽函数中&#xff0c;在槽函数中判断ui界面上输入的账号是否为"admin"&#xff0c;密码是否为…

C++11——包装器与lambda表达式

目录 一.背景 二.lambda 1.见一见lambda 2.lambda表达式语法 3.lambda捕捉列表说明 三.函数对象与lambda表达式 四.包装器 1.function包装器 2.包装类的成员函数 五.bind 1.调整参数位置 2.减少函数参数 一.背景 在C98中&#xff0c;如果想要对一个数据集合中的元素…

成都瀚网科技:如何有效运营抖店来客呢?

随着电子商务的快速发展和移动互联网的普及&#xff0c;越来越多的企业开始将目光转向线上销售渠道。其中&#xff0c;抖音成为备受关注的平台。作为中国最大的短视频社交平台之一&#xff0c;抖音每天吸引数亿用户&#xff0c;这也为企业提供了巨大的商机。那么&#xff0c;如…

F5.5G落进现实:目标网带来的光之路

数字化与智能化的世界将走向何方&#xff1f;这个问题有着非常复杂的答案&#xff0c;但其中有一个答案已经十分清晰。那就是智能化的下一步&#xff0c;必将走向泛在万兆的世界。 网络是算力联接的底座&#xff0c;是智能演化的基础。纵观每一代数字化升级&#xff0c;都可以发…

代码随想录Day22 LeetCode T39 组合总和 T40 组合总和II T131 分割回文串

LeetCode T39 组合总和 题目链接:39. 组合总和 - 力扣&#xff08;LeetCode&#xff09; 树形图 题目思路: 这我们会发现和昨天的题目很像,只是这里的元素并不是只能选取一次了,我们可以根据代码画出树形图来解决问题,下面我们开始递归三部曲 首先我们先定义出result和path数…

亲,手撸图文博文太累了?试试这个神器!

这一篇博客有关如何使用[InternLM-XComposer]来写图文并茂的博文。InternLM-XComposer是一个基于人工智能的创作工具&#xff0c;它可以根据你的输入生成不同类型的内容&#xff0c;例如文章、诗歌、歌词、代码等。你可以使用它来创作有趣和有创意的博客&#xff0c;同时也可以…

C# OpenCvSharp 利用Lab空间把春天的场景改为秋天

效果 项目 代码 using OpenCvSharp; using System; using System.Diagnostics; using System.Drawing; using System.Drawing.Imaging; using System.Windows.Forms;namespace OpenCvSharp_Demo {public partial class Form1 : Form{public Form1(){InitializeComponent();}st…

免费:实时 AI 编程助手 Amazon CodeWhisperer

点 &#xff0c;一起程序员弯道超车之路 现已正式推出实时 AI 编程助手 Amazon CodeWhisperer&#xff0c;包括 CodeWhisperer 个人套餐&#xff0c;所有开发人员均可免费使用。最初于去年推出的预览版 CodeWhisperer 让开发人员能够保持专注、高效&#xff0c;帮助他们快速、安…

如何管理前端状态?

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

计算机网络中的CSMA/CD算法的操作流程(《自顶向下》里的提炼总结)

具有碰撞检测的载波侦听多路访问&#xff08;CSMA/CD算法&#xff09; 以下内容总结&#xff0c;对应《计算机网络自顶向下第七版》第六章链路层和局域网P299 操作流程&#xff1a; NIC&#xff08;适配器&#xff0c;即网络接口&#xff09;从网络层接收数据报&#xff0c;…

OneDrive打不开了,怎么办?使用管理员身份也无效,分享解决办法如下

文章目录 1、问题描述2、解决办法2.1 修改注册表信息2.2 修改本地组策略 1、问题描述 电脑自带的 OneDrive 突然打不开了&#xff0c;双击也没有任何反应&#xff0c;以管理员身份打开也不行。去看了好多资料才解决这个问题&#xff0c;现分享如下&#xff1b; 2、解决办法 …

用友GRP-U8 SQL注入漏洞复现

0x01 产品简介 用友GRP-U8R10行政事业财务管理软件是用友公司专注于国家电子政务事业&#xff0c;基于云计算技术所推出的新一代产品&#xff0c;是我国行政事业财务领域最专业的政府财务管理软件。 0x02 漏洞概述 用友GRP-U8的bx_historyDataCheck jsp、slbmbygr.jsp等接口存…

视频批量加水印:保护版权,提升效率

在当今的自媒体时代&#xff0c;视频制作已经成为许多人的一项必备技能。然而&#xff0c;在视频制作过程中&#xff0c;如何为自己的视频添加独特的水印以保护知识产权&#xff0c;常常让许多制作者感到困扰。本文将为你揭示如何通过固乔剪辑助手软件&#xff0c;简单几步批量…