flink ExecutionEnvironment

在Apache Flink中,获取执行环境可以通过调用ExecutionEnvironment类的静态方法来实现。以下是获取不同类型环境的示例代码:

本地环境(用于单机测试):

ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

在 Apache Flink 中,ExecutionEnvironment 是程序执行的入口点,用于设置和执行 Flink 作业。ExecutionEnvironment.createLocalEnvironment() 方法用于创建一个本地执行环境,这意味着 Flink 作业将在你的本地 JVM 中运行,而不是在集群环境中。

下面是对 ExecutionEnvironment.createLocalEnvironment() 的详细解释:

  1. 创建本地环境:
  • 当调用 ExecutionEnvironment.createLocalEnvironment() 时,实际上是在告诉 Flink:“我想在本地机器上运行这个 Flink 作业,而不是在集群上。”## 集群环境(用于生产环境):
  • 这对于开发和测试非常有用,因为它允许在没有集群设置的情况下快速验证 Flink 作业。
  1. 使用示例:
import org.apache.flink.api.common.functions.MapFunction;  
import org.apache.flink.api.java.DataSet;  
import org.apache.flink.api.java.ExecutionEnvironment;  
import org.apache.flink.util.Collector;  public class LocalFlinkExample {  public static void main(String[] args) throws Exception {  // 创建一个本地执行环境  ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();  // 从一个集合中创建一个数据源  DataSet<String> text = env.fromElements("Hello", "World", "Flink");  // 转换数据  DataSet<Integer> lengths = text  .map(new MapFunction<String, Integer>() {  @Override  public Integer map(String value) {  return value.length();  }  });  // 打印结果  lengths.print();  // 执行 Flink 作业  env.execute("Local Flink Example");  }  
}

注意:

  • 尽管本地环境对于开发和测试很有用,但在生产环境中,你应该使用集群执行环境(如 createRemoteEnvironment() 或使用 Flink 的命令行界面)。
  • 本地环境可能不会完全模拟集群环境的行为,因此,在将作业部署到生产集群之前,最好在测试集群中对其进行验证。
  • 使用本地环境时,要注意资源限制。由于作业在本地 JVM 中运行,因此可能会受到本地机器资源的限制。如果作业消耗的资源过多,可能会导致本地 JVM 崩溃或性能下降。

集群环境(生产环境):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

在 Apache Flink 中,ExecutionEnvironment.getExecutionEnvironment() 是一个工厂方法,用于根据运行时的上下文自动选择适当的执行环境。具体来说,它会检测 Flink 程序是在集群环境中运行(如 YARN、Kubernetes、Mesos、Standalone 等),还是在本地环境中运行(如 IDE 或命令行工具),并据此返回一个合适的 ExecutionEnvironment 实例。

如果打算将 Flink 程序部署到集群环境中,那么使用 getExecutionEnvironment() 是一个好选择,因为它不需要显式地指定执行环境。这样,代码就可以在本地和集群环境中无缝切换,而无需修改。

以下是一个使用 getExecutionEnvironment() 的示例:

import org.apache.flink.api.common.functions.MapFunction;  
import org.apache.flink.api.java.DataSet;  
import org.apache.flink.api.java.ExecutionEnvironment;  public class FlinkExample {  public static void main(String[] args) throws Exception {  // 获取执行环境,自动选择本地或集群环境  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();  // 从一个集合中创建一个数据源(这只是一个示例,实际中可能从外部数据源读取)  DataSet<String> text = env.fromElements("Hello", "World", "Flink");  // 转换数据  DataSet<Integer> lengths = text  .map(new MapFunction<String, Integer>() {  @Override  public Integer map(String value) {  return value.length();  }  });  // 打印结果到标准输出(如果运行在集群上,通常会将结果写入外部存储系统)  lengths.print();  // 执行 Flink 作业  env.execute("Flink Example");  }  
}

getExecutionEnvironment() 方法的工作原理如下:

  1. 自动检测执行环境:
  • 如果 Flink 作业是作为集群作业提交的(例如,通过 Flink 的命令行界面或集群管理器),则该方法将返回一个与集群环境兼容的 ExecutionEnvironment。
  • 如果 Flink 作业是在本地运行的(例如,在 IDE 中直接运行),则该方法将返回一个本地 ExecutionEnvironment。

注意:

  • 使用 getExecutionEnvironment() 时,你不需要担心是在本地还是集群上运行作业,因为 Flink 会为你自动处理。
  • 如果你明确知道你的作业将在本地运行,并且希望有更多的配置选项(例如,设置并行度),你可以直接使用 ExecutionEnvironment.createLocalEnvironment()。但是,如果你打算将作业部署到集群,并且希望代码能够在本地和集群之间无缝切换,那么 getExecutionEnvironment() 是一个更好的选择。
  • 当你在 IDE 中开发 Flink 作业时,即使你使用 getExecutionEnvironment(),作业通常也会在本地运行,除非你配置了集群提交参数或使用了 Flink 的集群管理工具。

对于Flink Table API,获取表环境可以使用TableEnvironment类:

本地表环境:

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

在 Apache Flink 中,TableEnvironment 是用于批处理或流处理表格 API 的主要接口。自从 Flink 1.11 版本引入了对 Table API 和 SQL 的重大改进后,特别是 Blink 计划的引入,用户可以选择使用不同的计划器(planner)和模式(batch 或 streaming)。
这里做了以下几件事情:

  1. 创建 EnvironmentSettings:通过 EnvironmentSettings.newInstance() 开始配置 TableEnvironment 的设置。
  2. 使用 Blink 计划器:useBlinkPlanner() 指示 Flink 使用 Blink 计划器。Blink 计划器是基于 Apache Calcite 的一个定制版本,它为 Flink 的 Table API 和 SQL 提供了更多的优化和功能。例如,它支持更多的 SQL 功能和更复杂的查询。
  3. 指定批处理模式:inBatchMode() 表示 TableEnvironment 将被配置为批处理模式。在批处理模式下,TableEnvironment 将处理有限的数据集,通常用于批处理作业。
  4. 构建 TableEnvironment:通过调用 build() 方法完成 EnvironmentSettings 的配置,并将其传递给 TableEnvironment.create() 方法来创建一个新的 TableEnvironment 实例。
    在得到 TableEnvironment 实例后,你可以注册外部数据源、执行 SQL 查询、转换表数据等。
    这是一个简单的例子,展示了如何使用 TableEnvironment 来执行一个 SQL 查询:
// 假设你已经有了 tableEnv  // 注册一个外部数据源(这里只是一个示例,具体实现取决于你的数据源)  
tableEnv.executeSql("CREATE TABLE MyTable ( ... ) WITH ( ... )");  // 执行 SQL 查询  
Table result = tableEnv.sqlQuery("SELECT * FROM MyTable WHERE someColumn = 'someValue'");  // 将结果转换为 DataSet 或 DataStream(取决于你的执行模式)  
// 注意:这里假设你在批处理模式下,所以使用 toRetractStream 或 toDataSet 方法  
DataSet<Row> dataSetResult = tableEnv.toDataSet(result, Row.class);  // ... 进一步处理 dataSetResult ...

请注意,上面的代码只是一个示例,用于说明如何使用 TableEnvironment。在实际应用中,你需要根据你的数据源、执行模式和具体需求来配置和使用 TableEnvironment。

集群表环境:

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

在 Apache Flink 中,当您使用 TableEnvironment.create() 方法并指定 EnvironmentSettings 来创建一个 TableEnvironment 实例时,您可以选择使用 Blink planner 并设置执行模式为流处理模式(streaming mode)。这是处理实时数据流时常用的配置。
以下是各部分的解释:

  • EnvironmentSettings.newInstance():创建一个新的 EnvironmentSettings 实例,用于配置 TableEnvironment。
  • useBlinkPlanner():指定使用 Blink planner 作为查询优化器。Blink planner 提供了更丰富的 SQL 功能和更好的性能优化。
    .inStreamingMode():将 TableEnvironment 的执行模式设置为流处理模式。这意味着您将在流处理上下文中执行 SQL 查询,即数据将被视为连续的数据流,并且查询将实时地处理这些数据。
  • .build():根据提供的设置构建 EnvironmentSettings 实例。
  • TableEnvironment.create(…):使用前面构建的 EnvironmentSettings 实例来创建一个新的 TableEnvironment 实例。

一旦您有了 TableEnvironment 实例,您就可以注册外部数据源、执行 SQL 查询、转换表数据等,并且这些操作都会以流处理的方式执行。
以下是一个简单的示例,展示了如何使用流处理模式下的 TableEnvironment 来执行一个 SQL 查询:

// 假设您已经有了 tableEnv  // 注册一个外部数据源(这里只是一个示例,具体实现取决于您的数据源)  
tableEnv.executeSql("CREATE TABLE MyTable ( ... ) WITH ( ... )");  // 执行 SQL 查询,这里假设 MyTable 是一个实时数据流  
Table result = tableEnv.sqlQuery("SELECT * FROM MyTable WHERE someColumn = 'someValue'");  // 将结果转换为 DataStream(在流处理模式下)  
DataStream<Row> dataStreamResult = tableEnv.toRetractStream(result, Row.class);  // ... 进一步处理 dataStreamResult ...

请注意,在流处理模式下,您通常会将 Table 转换为 DataStream(使用 toRetractStream 或 toAppendStream 方法,取决于您的查询是否会产生更新或删除事件),因为 DataStream API 是为流处理而设计的。另外,对于只产生追加事件的查询,可以使用 toAppendStream 方法,这通常更简单且性能更好。
最后,请确保您的 Flink 集群和依赖库都支持流处理模式下的表 API 和 Blink planner。

请根据你的具体需求选择合适的环境。如果你需要流处理的环境,则应该使用StreamExecutionEnvironment。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/16922.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

yolov8+ROS+ubuntu18.04——学习记录

参考文献 1.Ubuntu配置Yolov8环境并训练自己的数据集 ROS实时运行 2.https://juejin.cn/post/7313979467965874214 前提&#xff1a; 1.CUDA和Anaconda&#xff0c;PyTorch 2.python>3.8 一、创建激活环境&#xff0c;安装依赖 1.创建虚拟环境 conda create -n yol…

Java项目:基于SSM框架实现的企业人事管理系统单位人事管理系统【ssm+B/S架构+源码+数据库+毕业论文】

一、项目简介 本项目是一套基于SSM框架实现的企业人事管理系统单位人事管理系统 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调试&#xff0c;eclipse或者idea 确保可以运行&#xff01; 该系统功能完善、界面美观…

【MySQL精通之路】全文搜索(3)-带查询扩展的全文搜索

博主PS&#xff1a;你可以把他理解为&#xff0c;查询猜测&#xff0c;膨胀查询&#xff0c;查询的第六感。 全文搜索支持查询扩展&#xff08;尤其是其变体“盲查询扩展”&#xff09;。 当搜索短语太短时&#xff0c;这通常很有用&#xff0c;这通常意味着用户依赖于全文搜索…

Divisibility Part1(整除理论1)

Divisibility Part1 学习本节的基础&#xff1a;任意个整数之间进行加、减、乘的混合运算之后的结果仍然是整数。之后将不申明地承认这句话的正确性并加以运用。 用一个不为 0 0 0的数去除另一个数所得的商却不一定是整数&#xff08; a a a除 b b b&#xff0c;写作 b a \frac…

基于云开发快速搭建智能名片小程序

熏风徐来&#xff0c;麦穗摇曳&#xff1b;麦类等夏熟作物生长旺盛&#xff0c;籽粒灌浆渐趋饱满&#xff0c;但尚未完全成熟&#xff0c;故称“小满”。 今日小满&#xff0c;基于云开发快速搭建智能名片小程序&#xff0c;发文以记录输入和输出过程。 一、功能总览&#xff…

数据结构(五)队列

文章目录 一、概念二、逻辑结构&#xff1a;线性结构三、存储结构&#xff08;一&#xff09;顺序队列&#xff08;二&#xff09;循环队列1. 结构体定义2. 创建队列&#xff08;1&#xff09;函数定义&#xff08;2&#xff09;注意点&#xff08;3&#xff09;代码实现 3. 入…

代码随想录算法训练营第二十天| 617. 合并二叉树、654. 最大二叉树、700. 二叉搜索树中的搜索、98. 验证二叉搜索树

[LeetCode] 617. 合并二叉树 [LeetCode] 617. 合并二叉树 文章解释 [LeetCode] 617. 合并二叉树 视频解释 题目: 给你两棵二叉树&#xff1a; root1 和 root2 。 想象一下&#xff0c;当你将其中一棵覆盖到另一棵之上时&#xff0c;两棵树上的一些节点将会重叠&#xff08;而另…

学习100个Unity Shader (18) --- 几何着色器(Geometry Shader)

文章目录 概述编写格式举例应用举例&#xff08;用预制体球的每个顶点画一个立方体&#xff09;参考 概述 vertex shader --> [geometry shader] --> fragment shader。[]: 可选阶段。输入图元 —> geometry shader —> 其他图元 编写格式 [maxcertexcount(N)] …

什么是访问越界(C语言数组、指针、结构体成员访问越界)

在C语言中&#xff0c;访问越界&#xff08;Access Violation 或 Out-of-Bounds Access&#xff09;是指程序试图访问的内存位置超出了其合法或已分配的范围。这通常发生在数组、指针或其他内存结构的使用中。 案例&#xff1a; #include <stdio.h>//数组 //Visiting b…

基于Django的美团药品数据分析与可视化系统,有多用户功能,可增删改查数据

背景 随着电子商务和健康产业的迅速发展&#xff0c;药品行业数据的分析和可视化变得愈发重要。基于Django的美团药品数据分析与可视化系统的研究背景凸显了对药品数据的深入挖掘和分析的需求。该系统不仅具备多用户功能&#xff0c;允许不同角色的用户进行数据管理和分析&…

python列表生成式的妙用:区间内奇数求和

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、引言 二、案例背景 三、实现步骤 四、案例验证 五、总结 一、引言 在Python编程中&a…

DuckDB 是个值得学习的系统

关于 DuckDB 的一些科普和评价&#xff1a; https://www.zhihu.com/question/438725169/answer/3255729583 DuckDB 的插件体系&#xff1a; https://duckdb.org/docs/extensions/overview.html 我认为&#xff0c;作为一个软件&#xff0c;引入”插件“是商业成功的必要条件…

Java三种方法实现多线程,继承Thread类,实现Runnable接口,实现Callable接口

目录 线程&#xff1a; 继承Thread类&#xff1a; 实现Runnable类&#xff1a; 实现Callable接口&#xff1a; 验证多线程&#xff1a; 线程&#xff1a; 定义&#xff1a;进程可以同时执行多个任务&#xff0c;每个任务就是线程。举个例子&#xff1a;一个Java程序&#…

力扣刷题--LCR 075. 数组的相对排序【简单】

题目描述 给定两个数组&#xff0c;arr1 和 arr2&#xff0c; arr2 中的元素各不相同 arr2 中的每个元素都出现在 arr1 中 对 arr1 中的元素进行排序&#xff0c;使 arr1 中项的相对顺序和 arr2 中的相对顺序相同。未在 arr2 中出现过的元素需要按照升序放在 arr1 的末尾。 …

实现UI显示在最上面的功能

同学们肯定遇到过UI被遮挡的情况&#xff0c;那如何让UI显示在最前面呢&#xff0c;先看效果 原理:UI的排序方式是和unityHierarchy窗口的层级顺序有关的&#xff0c;排序在下就越后显示&#xff0c;所以按照这个理论&#xff0c;当我们鼠标指到UI的时候把层级设置到最下层就好…

不一样的2024

当我们继续往前走时&#xff0c;发现身边的事物不再那么的陌生&#xff0c;也不再那边多的阻碍&#xff0c;不管怎么&#xff0c;2024将会不一样。 当我们走进审批流时&#xff0c;全面石荒芜的&#xff0c;所以自己构建了一个体系。 当我们转向开源时&#xff0c;发现开源与…

nacos(一) 安装

一 nacos 1.4.7安装 安装 nacos-server nacos官方下载 说明&#xff1a; 下载1.4.7和2.3.2版本,本专栏后续以1.4.7为例进行讲解补充&#xff1a; nacos-server服务端和nacos-client客户端附加&#xff1a; spring 版本、nacos-server、nacos-client版本要适配思考&#xf…

【Redis】Widows 和 Linux 下使用 Redis

Redis 简述 1.缓存 缓存就是将数据存放在距离计算最近的位置以加快处理速度。缓存是改善软件性能的第一手段,现代 CPU 越来越快的一个重要因素就是使用了更多的缓存,在复杂的软件设计中,缓存几乎无处不在。大型网站架构设计在很多方面都使用了缓存设计。 2.Redis Redis …

同元软控专业模型库系列—电气篇

一、引言 电气作为研究电能产生、传输、分配、使用和控制的专业领域&#xff0c;在航空航天、能源电力、船舶推进、轨道交通等众多行业中占据着举足轻重的地位&#xff0c;应用范围涉及电力工程、电子通信、自动化控制等&#xff0c;如电池充电管理芯片设计、航天器伺服系统、…

GitHub Copilot如何订阅使用

1.Copilot是什么 Copilot是由Github和OpenAI联合开发的一个基于人工智能大模型的代码写作工具。 我们都知道Github是世界上拥有开源项目及代码最多的一个平台&#xff0c;有了这么一个得天独厚的资源&#xff0c;Github联合OpenAI喂出了Copilot。经过不断地更新迭代&#xff…