flink使用StatementSet降低资源浪费

背景

项目中有很多ods层(mysql 通过cannal)kafka,需要对这些ods kakfa做一些etl操作后写入下一层的kafka(dwd层)。

一开始采用的是executeSql方式来执行每个ods→dwd层操作,即类似:

 def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)val configuration: Configuration = tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction("etl_handle", classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dml,在insert语句中调用etl_handle进行预处理和写入tableEnv.executeSql(INSERT_DWD_TABLE1)tableEnv.executeSql(INSERT_DWD_TABLE2)... 
}

当有多个ods->dwd操作放在同一个flink作业中时,发现这种方式会导致每次insert操作都是单独的DAG,非常消耗资源,特别是这些处理都是比较轻量级的,最好是能融合在同一个DAG中共享资源。

解决方案

查看flink文档:INSERT 语句 | Apache Flink

因此,可以采用statementset的方式,将不同insert sql进行分组执行,每组的insert sql会先被缓存到 StatementSet 中,并在StatementSet.execute() 方法被调用时,同一组的 insert sql(sink) 会被优化成一张DAG共用taskmanager,减少资源浪费,即类似:

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)val configuration: Configuration = tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction("etl_handle", classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dmltableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE1).addInsertSql(INSERT_DWD_TABLE2).addInsertSql(INSERT_DWD_TABLE3).execute()tableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE4).addInsertSql(INSERT_DWD_TABLE5).addInsertSql(INSERT_DWD_TABLE6).execute()
}

其他

如果是纯flink sql而不用data stream api,也是可以达到同样的效果的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/38303.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL影院订票系统

DDL -- 影院表 CREATE TABLE Theaters (TheaterID INT PRIMARY KEY,Name VARCHAR(100),Address VARCHAR(200) );-- 电影表 CREATE TABLE Movies (MovieID INT PRIMARY KEY,Title VARCHAR(100),ReleaseDate DATE );-- 放映场次表 CREATE TABLE Showings (ShowingID INT PRIMARY…

观察者模式在金融业务中的应用及其框架实现

引言 观察者模式(Observer Pattern)是一种行为设计模式,它定义了一种一对多的依赖关系,使得多个观察者对象同时监听某一个主题对象。当这个主题对象发生变化时,会通知所有观察者对象,使它们能够自动更新。…

MISRA C

介绍 MISRA C 是由汽车产业软件可靠性协会(Motor Industry Software Reliability Association)提出的 C 语言编程标准,可提高嵌入式系统软件的安全性和可靠性。这些指南由汽车制造商、零部件供应商和工程咨询公司合作的汽车工业软件可靠性协…

基于STM32的智能农业环境监控系统

目录 引言环境准备智能农业环境监控系统基础代码实现:实现智能农业环境监控系统 4.1 数据采集模块4.2 数据处理与分析4.3 控制系统实现4.4 用户界面与数据可视化应用场景:农业环境管理与优化问题解决方案与优化收尾与总结 1. 引言 智能农业环境监控系…

网络运维管理行业的痛点和难点

网络运维管理是确保企业网络稳定、高效运行的关键环节,然而在实际操作中,该行业面临着诸多痛点和难点。这些挑战不仅影响了运维工作的效率,还可能对企业的正常运营造成潜在威胁。本文将深入探讨网络运维管理行业的痛点和难点,以期…

博客一周年:回首与展望

博客一周年:回首与展望 前言 时光荏苒,转眼间我的博客已经陪伴我走过了整整一年的时光。在这一年里,我见证了博客从无到有的过程,也见证了它逐渐成长的点点滴滴。今天,我想借此机会回顾一下这一年的经历,…

ComfyUI高清放大的四种方式(工作流附件在最后)

方式一:Latent放大工作流 1.工作流截图 方式二:ESRGAN(传统模型)放大工作流 方式三:算法放大(后期处理)工作流 方式四:Ultimate SD Upscale工作流 这个方式的优势是对于显存底的用…

istitle()方法——判断首字母是否大写其他字母小写

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 语法参考 istitle()方法用于判断字符串中所有的单词首字母是否为大写而其他字母为小写。istitle()方法的语法格式如下: str.istitle() …

2024深圳入户新规出炉!快速了解你的入户“绿色通道”

​朋友们,听说你们都在琢磨深圳户口这事儿?没买房的也想来凑凑热闹,其实,深圳户口不仅仅是为了买房,更多的是为了孩子教育、住房申请、医疗福利等等。各位想在深圳这片热土上大展拳脚的朋友们,现在好消息来…

Spring 动态增强逻辑执行分析

1、假如UserService中存在被增强的public 普通方法,那么spring ioc时就会创建对应的代理对象放置到容器中; 2、那么Controller中注入的userService就是代理对象; Service public class UserService {Transactionalpublic void f2(String us…

python open函数中文乱码怎么解决

首先在D盘下新建一个html文档,接着在里面输入含有中文的Html字符,使用中文格式对读取的字符进行解码,再用utf-8的模式对字符进行编码,然后就能正确输出中文字符。 代码如下: # -*- coding: UTF-8 -*- file1 open(&quo…

求职刷题力扣 DAY37 动态规划 part03 0-1背包问题

416. 分割等和子集 给你一个 只包含正整数 的 非空 数组 nums 。请你判断是否可以将这个数组分割成两个子集,使得两个子集的元素和相等。 示例 1: 输入:nums [1,5,11,5] 输出:true 解释:数组可以分割成 [1, 5, 5] …

5G NR PUSCH物理层过程

物理层过程 加扰 假设要在单个码字q上传输的bit块为 b ( q ) ( 0 ) , . . . , b ( q ) ( M b i t ( q ) − 1 ) b^{(q)}(0),...,b^{(q)}(M_{bit}^{(q)} - 1) b(q)(0),...,b(q)(Mbit(q)​−1) ,其中 M b i t ( q ) M_{bit}^{(q)} Mbit(q)​是总比特数,加…

架构师篇-9、从事件风暴到微服务设计的落地过程

用户付款功能第二个版本的设计实现 单一职责原则(SRP) 软件系统中的每个元素只完成自己职责内的事,将其他的事交给别人去做“职责”通常人理解为一个事情,与该事情相关的事都是它的责任 一个职责是软件变化的一个原因 第二次需求…

设计模式-状态模式和策略模式

1.状态模式 1.1定义 当一个对象的内在状态改变时允许根据当前状态作出不同的行为; 1.2 适用场景 (1)一个对象的行为取决于它的状态,并且它必须在运行时根据状态来决定其行为. (2)代码中包含了大量的与状态有关的条件语句,例如:一个操作含有庞大的多分值语句(if…

pytest测试框架pytest-random-order插件随机执行用例顺序

Pytest提供了丰富的插件来扩展其功能,本章介绍下pytest-random-order插件,随机设置pytest测试用例的运行顺序,并对随机性进行一些控制。 官方文档: https://pytest-cov.readthedocs.io/en/latest/index.html 适配版本说明&#x…

xlsx实现excel下载功能——js

import dayjs from "dayjs"; import * as XLSX from "xlsx"; import XLSXS from "xlsx-style";export function export2Excel(info: {// 文件名&#xff08;工作簿名&#xff09;title: string;// 数据data: any[] | Record<string, any>[…

Crontab命令详解:轻松驾驭Linux定时任务,提升系统效率

​&#x1f308; 个人主页&#xff1a;danci_ &#x1f525; 系列专栏&#xff1a;《设计模式》《MYSQL》 &#x1f4aa;&#x1f3fb; 制定明确可量化的目标&#xff0c;坚持默默的做事。 引言&#xff1a; crond是Linux系统中用来定期执行命令或指定程序任务的一种服务或软件…

AI agent是什么,什么技术栈

AI agent&#xff0c;也称为会话代理或聊天机器人&#xff0c; 是一种通过文本或语音模拟人类对话的计算机程序。 它们旨在以自然且引人入胜的方式理解和响应用户输入。 AI agent 被广泛用于各种应用中&#xff0c;包括客户服务、营销、 销售和教育。 有两种主要类型的 AI agen…

软件包管理简介

windows的软件包都不能直接在windows当中安装和使用&#xff0c;好处是windows中的大部分病毒和木马对于Linux都是不可识别的。坏处是所有的软件都必须对linux单独进行软件开发。 1.软件包分类 源码包 脚本安装包 绝大多数源代码包都是使用C语言来写的。 特征是开源。 二进…