flink StreamGraph 构造flink任务

文章目录

      • 背景
      • 主要步骤
      • 代码

背景

通常使用flink 提供的高级算子来编写flink 任务,对底层不是很了解,尤其是如何生成作业图的细节
下面通过构造一个有向无环图,来实际看一下

主要步骤

1.增加source
2.增加operator
3. 增加一条边,连接source和operator
4. 增加sink
5. 增加一条边,连接operator和sink

代码

 // Step 1: Create basic configurationsConfiguration configuration = new Configuration();ExecutionConfig executionConfig = new ExecutionConfig();CheckpointConfig checkpointConfig = new CheckpointConfig();SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();// Step 2: Create a new StreamGraph instanceStreamGraph streamGraph = new StreamGraph(configuration, executionConfig, checkpointConfig, savepointRestoreSettings);// Step 3: Add a source operatorGeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;DataGeneratorSource<String> source = new DataGeneratorSource<>(generatorFunction, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.STRING);SourceOperatorFactory<String> sourceOperatorFactory = new SourceOperatorFactory<>(source, WatermarkStrategy.noWatermarks());streamGraph.addSource(1, "sourceNode", "sourceDescription", sourceOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "sourceSlot");// Step 4: Add a map operator to transform the dataStreamMap<String, String> mapOperator = new StreamMap<>(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value;}});SimpleOperatorFactory<String> mapOperatorFactory = SimpleOperatorFactory.of(mapOperator);streamGraph.addOperator(2, "mapNode", "mapDescription", mapOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "mapSlot");// Step 5: Connect source and map operatorstreamGraph.addEdge(1, 2, 0);// Step 6: Add a sink operator to consume the dataStreamMap<String, String> sinkOperator = new StreamMap<>(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {System.out.println(value);return value;}});SimpleOperatorFactory<String> sinkOperatorFactory = SimpleOperatorFactory.of(sinkOperator);streamGraph.addSink(3, "sinkNode", "sinkDescription", sinkOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "sinkSlot");// Step 7: Connect map and sink operatorstreamGraph.addEdge(2, 3, 0);streamGraph.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);streamGraph.setMaxParallelism(1,1);streamGraph.setMaxParallelism(2,1);streamGraph.setMaxParallelism(3,1);streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);// Step 8: Convert StreamGraph to JobGraphJobGraph jobGraph = streamGraph.getJobGraph();// Step 9: Set up a MiniCluster for local executionMiniClusterConfiguration miniClusterConfig = new MiniClusterConfiguration.Builder().setNumTaskManagers(10).setNumSlotsPerTaskManager(10).build();MiniCluster miniCluster = new MiniCluster(miniClusterConfig);// Step 10: Start the MiniClusterminiCluster.start();// Step 11: Submit the job to the MiniClusterJobExecutionResult result = miniCluster.executeJobBlocking(jobGraph);System.out.println("Job completed with result: " + result);// Step 12: Stop the MiniClusterminiCluster.close();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/60982.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AR眼镜方案_AR智能眼镜阵列/衍射光波导显示方案

在当今AR智能眼镜的发展中&#xff0c;显示和光学组件成为了技术攻坚的主要领域。由于这些组件的高制造难度和成本&#xff0c;其光学显示模块在整个设备的成本中约占40%。 采用光波导技术的AR眼镜显示方案&#xff0c;核心结构通常由光机、波导和耦合器组成。光机内的微型显示…

星辰资讯 | TiDB v7.5.4 v8.4.0 发版

作者&#xff1a; ShawnYan 原文来源&#xff1a; https://tidb.net/blog/6e299751 TiDB 8.4.0 DMR 发版 11 月 11 日&#xff0c;TiDB 8.4.0 版本发布&#xff0c;以下是该版本的一些关键特性和改进&#xff1a; 性能 分区表全局索引成为正式功能 &#xff1a;提高检索…

大模型基础BERT——Transformers的双向编码器表示

大模型基础BERT——Transformers的双向编码器表示 整体概况 BERT&#xff1a;用于语言理解的深度双向Transform的预训练 论文题目&#xff1a;BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding Bidirectional Encoder Representations from…

DAHL:利用由跨越 29 个类别的 8,573 个问题组成的基准数据集,评估大型语言模型在生物医学领域长篇回答的事实准确性。

2024-11-14&#xff0c;由首尔国立大学创建的DAHL数据集&#xff0c;为评估大型语言模型&#xff08;LLMs&#xff09;在生物医学领域长文本生成中的幻觉问题提供了一个重要的工具&#xff0c;这对于提高模型的准确性和可靠性具有重要意义。 数据集地址&#xff1a;DAHL|生物医…

【微软:多模态基础模型】(1)从专家到通用助手

欢迎关注【youcans的AGI学习笔记】原创作品 【微软&#xff1a;多模态基础模型】&#xff08;1&#xff09;从专家到通用助手 【微软&#xff1a;多模态基础模型】&#xff08;2&#xff09;视觉理解 【微软&#xff1a;多模态基础模型】&#xff08;3&#xff09;视觉生成 【微…

GRE做题笔记(零散的个人经验)

locomotive机车By 1813, the Luddite resistance had all but vanished. all but表示“几乎完全”的程度&#xff0c;或者表示排除piston活塞attributed to 归因于how a sportsperson accounted for their own experience of stress 运动员如何解释自己的压力经历 &#xff0c;…

【蓝桥杯算法】Java的基础API

1. BigInteger 的使用 1.1. 判素数 package 模板;import java.math.BigInteger; import java.util.Scanner;public class 判素数 {static Scanner in new Scanner(System.in);public static void main(String[] args) {int q in.nextInt();while (q-- > 0) {BigInteger …

【项目实战】基于 LLaMA-Factory 通过 LoRA 微调 Qwen2

【项目实战】基于 LLaMAFactory 通过 LoRA 微调 Qwen2 一、项目介绍二、环境准备1、环境准备2、安装LLaMa-Factory3、准备模型数据集3.1 模型准备3.2 数据集准备 三、微调1、启动webui2、选择参数3、训练 四、测试五、总结 一、项目介绍 LLaMA-Factory是一个由北京航空航天大学…

第23课-C++-红黑树的插入与旋转

&#x1f307;前言 红黑树是一种自平衡的二叉搜索树&#xff0c;因其出色的性能&#xff0c;广泛应用于实际中。Linux 内核中的 CFS 调度器便是一个使用红黑树的例子&#xff0c;这足以说明它的重要性。红黑树的实现通过红黑两种颜色的控制来维持平衡&#xff0c;并在必要时使…

基于 CentOS7.6 的 Docker 下载常用的容器(MySQLRedisMongoDB),解决拉取容器镜像失败问题

安装MySQL&Redis&MongoDB mysql选择是8版本&#xff0c;redis是选择4版本、mongoDB选择最新版&#xff0c;也可以根据自己的需要进行下载对应的版本&#xff0c;无非就是容器名:版本号 这样去拉去相关的容器镜像。如果你还不会在服务器中安装 docker&#xff0c;可以查…

C#/WinForm拖拽文件上传

一、首先创建一个上传文件的类&#xff0c;继承Control类&#xff0c;如下&#xff1a; public class UploadControl : Control{private Image _image;public UploadControl(){this.SetStyle(ControlStyles.UserPaint | //控件自行绘制&#xff0c;而不使用操作系统的绘制Cont…

ubuntu将firewall-config导出为.deb文件

firewall-config ubuntu是canonial 公司维护的&#xff0c;用wireshark测过&#xff0c;开机会给他们公司发遥测&#xff08;开了ufw阻塞所有连接也一样&#xff0c;canonial在里面把代码改了&#xff09;firewall-config是fedora(爱好者维护&#xff0c;公益版本)自带的防火墙…

蓝桥杯备考——算法

一、排序 冒泡排序、选择排序、插入排序、 快速排序、归并排序、桶排序 二、枚举 三、二分查找与二分答案 四、搜索&#xff08;DFS&#xff09; DFS&#xff08;DFS基础、回溯、剪枝、记忆化&#xff09; 1.DFS算法&#xff08;深度优先搜索算法&#xff09; 深度优先搜…

Javascript垃圾回收机制-运行机制(大厂内部培训版本)

前言 计算机基本组成&#xff1a; 我们编写的软件首先读取到内存&#xff0c;用于提供给 CPU 进行运算处理。 内存的读取和释放&#xff0c;决定了程序性能。 冯诺依曼结构 解释和编译 这两个概念怎么理解呢。 编译相当于事先已经完成了可以直接用。好比去饭店吃饭点完上…

ffmpeg+D3D实现的MFC音视频播放器,支持录像、截图、音视频播放、码流信息显示等功能

一、简介 本播放器是在vs2019 x86下开发&#xff0c;通过ffmpeg实现拉流解码功能&#xff0c;通过D3D实现视频的渲染功能。截图功能采用libjpeg实现&#xff0c;可以截取jpg图片&#xff0c;图片的默认保存路径是在C:\MYRecPath中。录像功能采用封装好的类Mp4Record实现&#x…

NodeJS 百度智能云文本转语音(实测)

现在文本转语音的技术已经非常完善了&#xff0c;尽管网络上有许多免费的工具&#xff0c;还是测试了专业的服务&#xff0c;选择了百度的TTS服务。 于是&#xff0c;在百度智能云注册和开通了文本转语音的服务&#xff0c;尝试使用NodeJS 实现文本转语音服务。但是百度的文档实…

信也科技和云杉网络的AI可观测性实践分享

1. 信也科技 2、云杉网络 2.1 中国移动

解析煤矿一张图

解析煤矿一张图 ​ 煤矿一张图是指通过数字化、智能化技术将煤矿的各项信息、数据和资源进行集中展示和管理&#xff0c;形成一个综合的可视化平台。这一平台将矿井的地理信息、设备状态、人员位置、安全生产、环境监测等信息整合成一个统一的“图形”&#xff0c;以便于管理者…

SpringBootTest常见错误解决

1.启动类所在包错误 问题 由于启动类所在包与需要自动注入的类的包不在一个包下&#xff1a; 启动类所在包&#xff1a; com.exmaple.test_02 但是对于需要注入的类却不在com.exmaple.test_02下或者其子包下&#xff0c;就会导致启动类无法扫描到该类&#xff0c;从而无法对…

Java 全栈知识体系

包含: Java 基础, Java 部分源码, JVM, Spring, Spring Boot, Spring Cloud, 数据库原理, MySQL, ElasticSearch, MongoDB, Docker, k8s, CI&CD, Linux, DevOps, 分布式, 中间件, 开发工具, Git, IDE, 源码阅读&#xff0c;读书笔记, 开源项目...