flink StreamGraph 构造flink任务

文章目录

      • 背景
      • 主要步骤
      • 代码

背景

通常使用flink 提供的高级算子来编写flink 任务,对底层不是很了解,尤其是如何生成作业图的细节
下面通过构造一个有向无环图,来实际看一下

主要步骤

1.增加source
2.增加operator
3. 增加一条边,连接source和operator
4. 增加sink
5. 增加一条边,连接operator和sink

代码

 // Step 1: Create basic configurationsConfiguration configuration = new Configuration();ExecutionConfig executionConfig = new ExecutionConfig();CheckpointConfig checkpointConfig = new CheckpointConfig();SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();// Step 2: Create a new StreamGraph instanceStreamGraph streamGraph = new StreamGraph(configuration, executionConfig, checkpointConfig, savepointRestoreSettings);// Step 3: Add a source operatorGeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;DataGeneratorSource<String> source = new DataGeneratorSource<>(generatorFunction, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.STRING);SourceOperatorFactory<String> sourceOperatorFactory = new SourceOperatorFactory<>(source, WatermarkStrategy.noWatermarks());streamGraph.addSource(1, "sourceNode", "sourceDescription", sourceOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "sourceSlot");// Step 4: Add a map operator to transform the dataStreamMap<String, String> mapOperator = new StreamMap<>(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value;}});SimpleOperatorFactory<String> mapOperatorFactory = SimpleOperatorFactory.of(mapOperator);streamGraph.addOperator(2, "mapNode", "mapDescription", mapOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "mapSlot");// Step 5: Connect source and map operatorstreamGraph.addEdge(1, 2, 0);// Step 6: Add a sink operator to consume the dataStreamMap<String, String> sinkOperator = new StreamMap<>(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {System.out.println(value);return value;}});SimpleOperatorFactory<String> sinkOperatorFactory = SimpleOperatorFactory.of(sinkOperator);streamGraph.addSink(3, "sinkNode", "sinkDescription", sinkOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "sinkSlot");// Step 7: Connect map and sink operatorstreamGraph.addEdge(2, 3, 0);streamGraph.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);streamGraph.setMaxParallelism(1,1);streamGraph.setMaxParallelism(2,1);streamGraph.setMaxParallelism(3,1);streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);// Step 8: Convert StreamGraph to JobGraphJobGraph jobGraph = streamGraph.getJobGraph();// Step 9: Set up a MiniCluster for local executionMiniClusterConfiguration miniClusterConfig = new MiniClusterConfiguration.Builder().setNumTaskManagers(10).setNumSlotsPerTaskManager(10).build();MiniCluster miniCluster = new MiniCluster(miniClusterConfig);// Step 10: Start the MiniClusterminiCluster.start();// Step 11: Submit the job to the MiniClusterJobExecutionResult result = miniCluster.executeJobBlocking(jobGraph);System.out.println("Job completed with result: " + result);// Step 12: Stop the MiniClusterminiCluster.close();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/60982.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AR眼镜方案_AR智能眼镜阵列/衍射光波导显示方案

在当今AR智能眼镜的发展中&#xff0c;显示和光学组件成为了技术攻坚的主要领域。由于这些组件的高制造难度和成本&#xff0c;其光学显示模块在整个设备的成本中约占40%。 采用光波导技术的AR眼镜显示方案&#xff0c;核心结构通常由光机、波导和耦合器组成。光机内的微型显示…

六:从五种架构风格推导出HTTP的REST架构

在分布式系统中,架构风格(Architectural Style)决定了系统组件如何交互、通信、存储和管理数据。每种架构风格都有其独特的特性和适用场景。本文将从五种典型的架构风格出发,逐步探讨它们如何影响了REST(Representational State Transfer,表述性状态转移)架构风格的设计…

星辰资讯 | TiDB v7.5.4 v8.4.0 发版

作者&#xff1a; ShawnYan 原文来源&#xff1a; https://tidb.net/blog/6e299751 TiDB 8.4.0 DMR 发版 11 月 11 日&#xff0c;TiDB 8.4.0 版本发布&#xff0c;以下是该版本的一些关键特性和改进&#xff1a; 性能 分区表全局索引成为正式功能 &#xff1a;提高检索…

Python 打包教程:从零开始构建可分发的Python包

Python 打包教程&#xff1a;从零开始构建可分发的Python包 引言 在Python开发中&#xff0c;打包是一个重要的环节。无论是共享代码、发布库还是部署应用&#xff0c;创建一个可分发的Python包都是必不可少的步骤。本文将详细介绍如何打包Python项目&#xff0c;涵盖从基础知…

大模型基础BERT——Transformers的双向编码器表示

大模型基础BERT——Transformers的双向编码器表示 整体概况 BERT&#xff1a;用于语言理解的深度双向Transform的预训练 论文题目&#xff1a;BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding Bidirectional Encoder Representations from…

Python教程笔记(2)

Python教程 5.1 列表详解5.1.3 列表推导式5.1.4 嵌套的列表推导式 5.7 深入条件控制6.1 模块详解6.3 dir()7.1 格式化字符串字面值7.1.3 手动格式化字符串 7.2 读写文件 5.1 列表详解 Python 中所有可变数据结构返回值为None。实现队列最好用 collections.deque。 5.1.3 列表…

DAHL:利用由跨越 29 个类别的 8,573 个问题组成的基准数据集,评估大型语言模型在生物医学领域长篇回答的事实准确性。

2024-11-14&#xff0c;由首尔国立大学创建的DAHL数据集&#xff0c;为评估大型语言模型&#xff08;LLMs&#xff09;在生物医学领域长文本生成中的幻觉问题提供了一个重要的工具&#xff0c;这对于提高模型的准确性和可靠性具有重要意义。 数据集地址&#xff1a;DAHL|生物医…

GStreamer 简明教程(九):Seek 与跳帧

系列文章目录 GStreamer 简明教程&#xff08;一&#xff09;&#xff1a;环境搭建&#xff0c;运行 Basic Tutorial 1 Hello world! GStreamer 简明教程&#xff08;二&#xff09;&#xff1a;基本概念介绍&#xff0c;Element 和 Pipeline GStreamer 简明教程&#xff08;三…

【微软:多模态基础模型】(1)从专家到通用助手

欢迎关注【youcans的AGI学习笔记】原创作品 【微软&#xff1a;多模态基础模型】&#xff08;1&#xff09;从专家到通用助手 【微软&#xff1a;多模态基础模型】&#xff08;2&#xff09;视觉理解 【微软&#xff1a;多模态基础模型】&#xff08;3&#xff09;视觉生成 【微…

GRE做题笔记(零散的个人经验)

locomotive机车By 1813, the Luddite resistance had all but vanished. all but表示“几乎完全”的程度&#xff0c;或者表示排除piston活塞attributed to 归因于how a sportsperson accounted for their own experience of stress 运动员如何解释自己的压力经历 &#xff0c;…

【蓝桥杯算法】Java的基础API

1. BigInteger 的使用 1.1. 判素数 package 模板;import java.math.BigInteger; import java.util.Scanner;public class 判素数 {static Scanner in new Scanner(System.in);public static void main(String[] args) {int q in.nextInt();while (q-- > 0) {BigInteger …

【项目实战】基于 LLaMA-Factory 通过 LoRA 微调 Qwen2

【项目实战】基于 LLaMAFactory 通过 LoRA 微调 Qwen2 一、项目介绍二、环境准备1、环境准备2、安装LLaMa-Factory3、准备模型数据集3.1 模型准备3.2 数据集准备 三、微调1、启动webui2、选择参数3、训练 四、测试五、总结 一、项目介绍 LLaMA-Factory是一个由北京航空航天大学…

数据仓库在大数据处理中的作用

数据仓库&#xff08;Data Warehouse&#xff0c;简称DW或DWH&#xff09;是一个面向主题的、集成的、相对稳定的、反映历史变化的数据集合&#xff0c;用于支持管理决策。以下是对数据仓库及其在大数据处理中作用的详细解释&#xff1a; 一、数据仓库的定义 面向主题&#x…

第23课-C++-红黑树的插入与旋转

&#x1f307;前言 红黑树是一种自平衡的二叉搜索树&#xff0c;因其出色的性能&#xff0c;广泛应用于实际中。Linux 内核中的 CFS 调度器便是一个使用红黑树的例子&#xff0c;这足以说明它的重要性。红黑树的实现通过红黑两种颜色的控制来维持平衡&#xff0c;并在必要时使…

基于 CentOS7.6 的 Docker 下载常用的容器(MySQLRedisMongoDB),解决拉取容器镜像失败问题

安装MySQL&Redis&MongoDB mysql选择是8版本&#xff0c;redis是选择4版本、mongoDB选择最新版&#xff0c;也可以根据自己的需要进行下载对应的版本&#xff0c;无非就是容器名:版本号 这样去拉去相关的容器镜像。如果你还不会在服务器中安装 docker&#xff0c;可以查…

C#/WinForm拖拽文件上传

一、首先创建一个上传文件的类&#xff0c;继承Control类&#xff0c;如下&#xff1a; public class UploadControl : Control{private Image _image;public UploadControl(){this.SetStyle(ControlStyles.UserPaint | //控件自行绘制&#xff0c;而不使用操作系统的绘制Cont…

ubuntu将firewall-config导出为.deb文件

firewall-config ubuntu是canonial 公司维护的&#xff0c;用wireshark测过&#xff0c;开机会给他们公司发遥测&#xff08;开了ufw阻塞所有连接也一样&#xff0c;canonial在里面把代码改了&#xff09;firewall-config是fedora(爱好者维护&#xff0c;公益版本)自带的防火墙…

蓝桥杯备考——算法

一、排序 冒泡排序、选择排序、插入排序、 快速排序、归并排序、桶排序 二、枚举 三、二分查找与二分答案 四、搜索&#xff08;DFS&#xff09; DFS&#xff08;DFS基础、回溯、剪枝、记忆化&#xff09; 1.DFS算法&#xff08;深度优先搜索算法&#xff09; 深度优先搜…

Javascript垃圾回收机制-运行机制(大厂内部培训版本)

前言 计算机基本组成&#xff1a; 我们编写的软件首先读取到内存&#xff0c;用于提供给 CPU 进行运算处理。 内存的读取和释放&#xff0c;决定了程序性能。 冯诺依曼结构 解释和编译 这两个概念怎么理解呢。 编译相当于事先已经完成了可以直接用。好比去饭店吃饭点完上…

python面向对象基础入门

面向对象 基本的实现方法大概如此 class Student(object):def __init__(self, name, score):self.name nameself.score scoredef print_score(self):print(%s: %s % (self.name, self.score))在面向对象的思想中&#xff0c;面对一个问题&#xff0c;首先应该考虑这个问题所…