【大数据】Flink 测试利器:DataGen

Flink 测试利器:DataGen

  • 1.什么是 FlinkSQL ?
  • 2.什么是 Connector ?
  • 3.DataGen Connector
    • 3.1 Demo
    • 3.2 支持的类型
    • 3.3 连接器属性
  • 4.DataGen 使用案例
    • 4.1 场景一:生成一亿条数据到 Hive 表
    • 4.2 场景二:持续每秒生产 10 万条数到消息队列
  • 5.思考

1.什么是 FlinkSQL ?

Flink SQL 是基于 Apache Calcite 的 SQL 解析器和优化器构建的,支持 ANSI SQL 标准,允许使用标准的 SQL 语句来处理流式和批处理数据。通过 Flink SQL,可以以声明式的方式描述数据处理逻辑,而无需编写显式的代码。使用 Flink SQL,可以执行各种数据操作,如 过滤聚合连接转换 等。它还提供了 窗口操作时间处理复杂事件处理 等功能,以满足流式数据处理的需求。

Flink SQL 提供了许多扩展功能和语法,以适应 Flink 的流式和批处理引擎的特性。它是 Flink 最高级别的抽象,可以与 DataStream API 和 DataSet API 无缝集成,利用 Flink 的分布式计算能力和容错机制。

在这里插入图片描述
使用 Flink SQL 处理数据的基本步骤:

  • 定义输入表:使用 CREATE TABLE 语句定义输入表,指定表的模式(字段和类型)和数据源(如 Kafka、文件等)。
  • 执行 SQL 查询:使用 SELECT、INSERT INTO 等 SQL 语句来执行数据查询和操作。您可以在 SQL 查询中使用各种内置函数、聚合操作、窗口操作和时间属性等。
  • 定义输出表:使用 CREATE TABLE 语句定义输出表,指定表的模式和目标数据存储(如 Kafka、文件等)。
  • 提交作业:将 Flink SQL 查询作为 Flink 作业提交到 Flink 集群中执行。Flink 会根据查询的逻辑和配置自动构建执行计划,并将数据处理任务分发到集群中的任务管理器进行执行。

总而言之,我们可以通过 Flink SQL 查询和操作来处理流式和批处理数据。它提供了一种简化和加速数据处理开发的方式,尤其适用于熟悉 SQL 的开发人员和数据工程师。

2.什么是 Connector ?

Flink Connector 是指 用于连接外部系统和数据源的组件。它允许 Flink 通过特定的连接器与不同的数据源进行交互,例如数据库、消息队列、文件系统等。它负责处理与外部系统的通信、数据格式转换、数据读取和写入等任务。无论是作为输入数据表还是输出数据表,通过使用适当的连接器,可以在 Flink SQL 中访问和操作外部系统中的数据。目前实时平台提供了很多常用的连接器:

例如:

  • JDBC:用于与关系型数据库(如 MySQL、PostgreSQL)建立连接,并支持在 Flink SQL 中读取和写入数据库表的数据。
  • JDQ:用于与 JDQ 集成,可以读取和写入 JDQ 主题中的数据。
  • Elasticsearch:用于与 Elasticsearch 集成,可以将数据写入 Elasticsearch 索引或从索引中读取数据。
  • File Connector:用于读取和写入各种文件格式(如 CSV、JSON、Parquet)的数据。
  • ……

还有如 HBase、JMQ4、Doris、Clickhouse,Jimdb,Hive 等,用于与不同的数据源进行集成。通过使用 Flink SQL Connector,我们可以轻松地与外部系统进行数据交互,将数据导入到 Flink 进行处理,或 将处理结果导出到外部系统

在这里插入图片描述

3.DataGen Connector

DataGen 是 Flink SQL 提供的一个内置连接器,用于生成模拟的测试数据,以便在开发和测试过程中使用。

使用 DataGen,可以生成具有不同数据类型和分布的数据,例如整数、字符串、日期等。这样可以模拟真实的数据场景,并帮助验证和调试 Flink SQL 查询和操作。

3.1 Demo

以下是一个使用 DataGen 函数的简单示例:

-- 创建输入表
CREATE TABLE input_table (order_number BIGINT,price DECIMAL(32,2),buyer ROW <first_name STRING,last_name STRING>,order_time TIMESTAMP(3)
) WITH ('connector' = 'datagen',
);

在上面的示例中,我们使用 DataGen 连接器创建了一个名为 input_table 的输入表。该表包含了 order_numberpricebuyerorder_time 四个字段。默认是 Random 随机生成对应类型的数据,生产速率是 10000 10000 10000 条/秒,只要任务不停,就会源源不断的生产数据。当然也可以指定一些参数来定义生成数据的规则,例如每秒生成的行数、字段的数据类型和分布。

生成的数据样例:

{"order_number": -6353089831284155505,"price": 253422671148527900374700392448,"buyer": {"first_name": "6e4df4455bed12c8ad74f03471e5d8e3141d7977bcc5bef88a57102dac71ac9a9dbef00f406ce9bddaf3741f37330e5fb9d2","last_name": "d7d8a39e063fbd2beac91c791dc1024e2b1f0857b85990fbb5c4eac32445951aad0a2bcffd3a29b2a08b057a0b31aa689ed7"},"order_time": "2023-09-21 06:22:29.618"
}
{"order_number": 1102733628546646982,"price": 628524591222898424803263250432,"buyer": {"first_name": "4738f237436b70c80e504b95f0d9ec3d7c01c8745edf21495f17bb4d7044b4950943014f26b5d7fdaed10db37a632849b96c","last_name": "7f9dbdbed581b687989665b97c09dec1a617c830c048446bf31c746898e1bccfe21a5969ee174a1d69845be7163b5e375a09"},"order_time": "2023-09-21 06:23:01.69"
}

3.2 支持的类型

字段类型数据生成方式
BOOLEANrandom
CHARrandom / sequence
VARCHARrandom / sequence
STRINGrandom / sequence
DECIMALrandom / sequence
TINYINTrandom / sequence
SMALLINTrandom / sequence
INTrandom / sequence
BIGINTrandom / sequence
FLOATrandom / sequence
DOUBLErandom / sequence
DATErandom
TIMErandom
TIMESTAMPrandom
TIMESTAMP_LTZrandom
INTERVAL YEAR TO MONTHrandom
INTERVAL DAY TO MONTHrandom
ROWrandom
ARRAYrandom
MAPrandom
MULTISETrandom

3.3 连接器属性

属性是否必填默认值类型
描述
connectorrequired(none)String‘datagen’
rows-per-secondoptional 10000 10000 10000Long数据生产速率
number-of-rowsoptional(none)Long指定生产的数据条数,默认是不限制
fields.#.kindoptionalrandomString指定字段的生产数据的方式 random 还是 sequence
fields.#.minoptional(Minimum value of type)(Type of field)random 生成器的指定字段 # 最小值,支持数字类型
fields.#.maxoptional(Maximum value of type)(Type of field)random 生成器的指定字段 # 最大值,支持数字类型
fields.#.lengthoptional 100 100 100Integerchar / varchar / string / array / map / multiset 类型的长度
fields.#.startoptional(none)(Type of field)sequence 生成器的开始值
fields.#.endoptional(none)(Type of field)sequence 生成器的结束值

4.DataGen 使用案例

4.1 场景一:生成一亿条数据到 Hive 表

CREATE TABLE dataGenSourceTable (order_number BIGINT,price DECIMAL(10, 2),buyer STRING,order_time TIMESTAMP(3)
) WITH ( 'connector'='datagen', 'number-of-rows'='100000000','rows-per-second' = '100000'
);CREATE CATALOG myhive
WITH ('type'='hive','default-database'='default'
);
USE CATALOG myhive;
USE dev;
SET table.sql-dialect=hive;
CREATE TABLE if not exists shipu3_test_0932 (order_number BIGINT,price DECIMAL(10, 2),buyer STRING,order_time TIMESTAMP(3)
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES ('partition.time-extractor.timestamp-pattern'='$dt','sink.partition-commit.trigger'='partition-time','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='metastore,success-file'
);
SET table.sql-dialect=default;
insert into myhive.dev.shipu3_test_0932
select order_number, price, buyer, order_time, cast(CURRENT_DATE as varchar)
from default_catalog.default_database.dataGenSourceTable;

当每秒生产 10 万条数据的时候,17 分钟左右就可以完成,当然我们可以通过增加 Flink 任务的计算节点、并行度、提高生产速率 rows-per-second 的值等来更快速的完成大数据量的生产。

4.2 场景二:持续每秒生产 10 万条数到消息队列

CREATE TABLE dataGenSourceTable (order_number BIGINT,price INT,buyer ROW <first_name STRING,last_name STRING>,order_time TIMESTAMP(3),col_array ARRAY <STRING>,col_map map <STRING,STRING>
) WITH ( 'connector'='datagen', --连接器类型'rows-per-second'='100000', --生产速率'fields.order_number.kind'='random', --字段order_number的生产方式'fields.order_number.min'='1', --字段order_number最小值'fields.order_number.max'='1000', --字段order_number最大值'fields.price.kind'='sequence', --字段price的生产方式'fields.price.start'='1', --字段price开始值'fields.price.end'='1000', --字段price最大值'fields.col_array.element.length'='5', --每个元素的长度'fields.col_map.key.length'='5', --map key的长度'fields.col_map.value.length'='5' --map value的长度
);CREATE TABLE jdqsink1 (order_number BIGINT,price DECIMAL(32, 2),buyer ROW <first_name STRING,last_name STRING>,order_time TIMESTAMP(3),col_ARRAY ARRAY <STRING>,col_map map <STRING,STRING>
) WITH ('connector'='jdq','topic'='jrdw-fk-area_info__1','jdq.client.id'='xxxxx','jdq.password'='xxxxxxx','jdq.domain'='db.test.group.com','format'='json'
);INSERTINTO jdqsink1
SELECT * FROM dataGenSourceTable;

5.思考

通过以上案例可以看到,通过 Datagen 结合其他连接器可以模拟各种场景的数据。

  • 性能测试:我们可以利用 Flink 的高处理性能,来调试任务的外部依赖的阈值(超时,限流等)到一个合适的水位,避免自己的任务有过多的外部依赖出现木桶效应。
  • 边界条件测试:我们通过使用 Flink DataGen 生成特殊的测试数据,如最小值、最大值、空值、重复值等来验证 Flink 任务在边界条件下的正确性和鲁棒性。
  • 数据完整性测试:我们通过 Flink DataGen 可以生成包含错误或异常数据的数据集,如无效的数据格式、缺失的字段、重复的数据等。从而可以测试 Flink 任务对异常情况的处理能力,验证 Flink 任务在处理数据时是否能够正确地保持数据的完整性。

总之,Flink DataGen 是一个强大的工具,可以帮助测试人员构造各种类型的测试数据。通过合理的使用 ,测试人员可以更有效地进行测试,并发现潜在的问题和缺陷。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/632537.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

精选100 GPTs深度解析专题

精选100 GPTs深度解析专题 背景 1月10日&#xff0c;GPT应用商店&#xff08;GPT Store&#xff09;的正式上线&#xff0c;GPT技术的应用已经呈现爆炸性增长。目前&#xff0c;市场上已经出现了超过300万种GPTs&#xff0c;应用领域涵盖图像生成、写作、效率提升、研究分析、编…

【论文阅读】Latent Consistency Models (LDMs)、LCM-LoRa

文章目录 IntroductionPreliminariesDiffusion ModelsConsistency Models Latent Consistency ModelsConsistency Distillation in the Latent SpaceOne-Stage Guided Distillation by Solving Augmented PF-ODEAccelerating Distillation with Skipping Time StepsLatent Cons…

企业网盘的价值:为什么企业需要它?

企业网盘因其主打的文件管理协作功能&#xff0c;正好符合信息时代高速发展下企业的需要&#xff0c;能够帮助企业集中管理文件数据&#xff0c;提供便捷的文件协作服务&#xff0c;一跃成为近两年企业服务类产品榜单中的一匹黑马。 企业网盘真的这么好用吗&#xff1f;企业真…

Tessy—嵌入式软件单元测试/集成测试工具

产品概述 Tessy源自戴姆勒—奔驰公司的软件技术实验室&#xff0c;由德国Hitex公司负责销售及技术的支持服务&#xff0c;是一款专门针对嵌入式软件进行单元/集成测试的工具。它可以对C/C代码进行单元、集成测试&#xff0c;可以自动化搭建测试环境、执行测试、评估测试结果并生…

HarmonyOS—开发环境诊断的功能

为了大家开发应用/服务的良好体验&#xff0c;DevEco Studio提供了开发环境诊断的功能&#xff0c;帮助大家识别开发环境是否完备。可以在欢迎界面单击Help > Diagnose Development Environment进行诊断。如果已经打开了工程开发界面&#xff0c;也可以在菜单栏单击Help >…

YOLOv8全网首发:DCNv4更快收敛、更高速度、更高性能,效果秒杀DCNv3、DCNv2等 ,助力检测

💡💡💡本文独家改进:DCNv4更快收敛、更高速度、更高性能,完美和YOLOv8结合,助力涨点 DCNv4优势:(1) 去除空间聚合中的softmax归一化,以增强其动态性和表达能力;(2) 优化存储器访问以最小化冗余操作以加速。这些改进显著加快了收敛速度,并大幅提高了处理速度,DCN…

嵌入式学习二

目录 常见元器件&#xff1a; 要求&#xff1a; 理解基本原理和功能&#xff1a; 熟悉型号、特性和参数&#xff1a; 电阻器&#xff1a; 作用&#xff1a; 电阻器的分类&#xff1a; 基于功能和制作工艺划分&#xff1a; 电阻的数据使用手册 电容器&#xff1a; 电…

LabVIEW交变配流泵性能测试系统

利用LabVIEW软件与高级硬件结合&#xff0c;开发交变配流泵性能测试系统。该系统不仅提高了测试精度&#xff0c;还优化了工业自动化流程&#xff0c;代表了液压系统测试技术的进步。 开发了一种高精度的测试系统&#xff0c;该系统能够综合评估交变配流泵的性能&#xff0c;包…

JAVA基础----String类型的简单介绍

文章目录 1. String类的重要性2. 常用方法2.1 字符串构造2.2 String对象的比较2.3 字符串查找2.4 转化2.5 字符串替换2.6 字符串拆分2.7 字符串截取2.8 其他操作方法2.9 字符串的不可变性2.10 字符串修改2.11 借助StringBuffer 和 StringBuilder 观察String进行修改的效率 3. S…

性能监控工具nmon使用

性能监控工具nmon使用 nmon (Nigel’s Monitor)是由IBM 提供、免费监控 AIX 系统与 Linux 系统资源的工具。该工具可将服务器系统资源耗 用情况收集起来并输出一个特定的文件,并可利用 excel 分析工具&#xff08;nmon analyser&#xff09;进行数据的统计分析。 1.nmon安装 …

加解密算法整理(对称加密、非堆成加密、散列函数)

加解密算法是现代密码学核心技术&#xff0c;从设计理念和应用场景上可以分为两大基本类型&#xff0c;如下表所示。 算法类型特点优势缺陷代表算法对称加密加解密的密钥相同计算效率高&#xff0c;加密强度高需提前共享密钥&#xff0c;易泄露DES、3DES、AES、IDEA非对称加密…

DotNET 8 新特性 - AoT 编译、 MinimalAPI、Json源生成器

AoT编译方式特性 裁剪减小体积&#xff0c;取消JIT编译&#xff0c;不使用反射技术。直接产生目标机器二进制代码&#xff0c;目前支持x86&#xff0c;解决被反编译问题。 使用本机 AOT 发布的应用&#xff1a;最大程度减少了磁盘占用空间缩短了启动时间减少了内存需求 | 功能…

Unity中四元数常用的方法

单位四元数 #region 单位四元数print(Quaternion.identity);testObj.rotation Quaternion.identity;//初始化对象时可能会用来赋值Instantiate(testObj,Vector3.zero,Quaternion.identity);#endregion 插值运算 #region 插值运算 //四元数中也提供了如同Vector3的插值运算 /…

Linux学习记录——사십삼 高级IO(4)--- Epoll型服务器(1)

文章目录 1、理解Epoll和对应接口2、简单实现 1、理解Epoll和对应接口 poll依然需要OS去遍历所有fd。一个进程去多个特定的文件中等待&#xff0c;只要有一个就绪&#xff0c;就使用select/poll系统调用&#xff0c;让操作系统把所有文件遍历一遍&#xff0c;哪些就绪就加上哪…

SQL-窗口函数

什么是窗口函数 可以像聚合函数一样对一组数据进行分析并返回结果&#xff0c;二者的不同之处在于&#xff0c;窗口函数不是将一组数据汇总成单个结果&#xff0c;而是为每一行数据都返回一个结果。 窗口函数组成部分 1.创建数据分区 窗口函数OVER子句中的PARTITION BY选项用…

拼多多无货源中转仓项目真的靠谱吗?发展前景如何?

阿阳最近一直在关注无货源电商这一块&#xff0c;尤其是拼多多无货源中转仓&#xff0c; 现如今也有了自己的运营团队和交付团队&#xff0c;整体来看这个项目还算不错&#xff01; 说实话&#xff0c;在考察这个项目的时候&#xff0c;看到市面上很多人在做&#xff0c;包括我…

一、VTK 9.0.0 编译安装步骤 VS2019 CMake3.26.0

零基础开始学习VTK &#xff0c;请跟我进行第一步&#xff0c;配置好开放环境&#xff01; 首先&#xff0c;你时间比较紧急&#xff0c;想直接使用VTK &#xff0c;而无需编译、那么请使用 PCL-1.12.0-AllInOne-msvc2019-win64.exe 它已经帮你编译好VTK 9 了&#xff0c;直…

echarts柱状图顶部设置倾斜并且展示数字

将下面代码直接复制粘贴在此运行就能查看效果Apache ECharts&#xff0c;一款基于JavaScript的数据可视化图表库&#xff0c;提供直观&#xff0c;生动&#xff0c;可交互&#xff0c;可个性化定制的数据可视化图表。https://echarts.apache.org/examples/zh/editor.html?care…

AI制作《流浪地球3》高清宣传片

AI制作《流浪地球3》高清宣传片 星辰大海&#xff0c;再次启航&#xff0c;人类的冒险&#xff0c;永无止境。The vast expanse of stars and oceans, setting sail once again. Human adventure knows no bounds. 当家园变得遥不可及&#xff0c;我们唯有勇往直前。With our …

Docker部署Dubbo-Admin浏览器无法访问问题!

Dubbo配置开发环境 很多小伙伴在使用docker部署Dubbo-Admin会出现浏览器无法访问问题&#xff0c;但是虚拟机防火墙都是关着的&#xff0c;那么这可能是镜像源出现问题了&#xff0c;可以按照如下方法操作&#xff1a; 先将现有的镜像和容器全部删除&#xff08;配置完镜像源需…