在Apache Flink中,获取执行环境可以通过调用ExecutionEnvironment类的静态方法来实现。以下是获取不同类型环境的示例代码:
本地环境(用于单机测试):
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
在 Apache Flink 中,ExecutionEnvironment 是程序执行的入口点,用于设置和执行 Flink 作业。ExecutionEnvironment.createLocalEnvironment() 方法用于创建一个本地执行环境,这意味着 Flink 作业将在你的本地 JVM 中运行,而不是在集群环境中。
下面是对 ExecutionEnvironment.createLocalEnvironment() 的详细解释:
- 创建本地环境:
- 当调用 ExecutionEnvironment.createLocalEnvironment() 时,实际上是在告诉 Flink:“我想在本地机器上运行这个 Flink 作业,而不是在集群上。”## 集群环境(用于生产环境):
- 这对于开发和测试非常有用,因为它允许在没有集群设置的情况下快速验证 Flink 作业。
- 使用示例:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector; public class LocalFlinkExample { public static void main(String[] args) throws Exception { // 创建一个本地执行环境 ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); // 从一个集合中创建一个数据源 DataSet<String> text = env.fromElements("Hello", "World", "Flink"); // 转换数据 DataSet<Integer> lengths = text .map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return value.length(); } }); // 打印结果 lengths.print(); // 执行 Flink 作业 env.execute("Local Flink Example"); }
}
注意:
- 尽管本地环境对于开发和测试很有用,但在生产环境中,你应该使用集群执行环境(如 createRemoteEnvironment() 或使用 Flink 的命令行界面)。
- 本地环境可能不会完全模拟集群环境的行为,因此,在将作业部署到生产集群之前,最好在测试集群中对其进行验证。
- 使用本地环境时,要注意资源限制。由于作业在本地 JVM 中运行,因此可能会受到本地机器资源的限制。如果作业消耗的资源过多,可能会导致本地 JVM 崩溃或性能下降。
集群环境(生产环境):
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
在 Apache Flink 中,ExecutionEnvironment.getExecutionEnvironment() 是一个工厂方法,用于根据运行时的上下文自动选择适当的执行环境。具体来说,它会检测 Flink 程序是在集群环境中运行(如 YARN、Kubernetes、Mesos、Standalone 等),还是在本地环境中运行(如 IDE 或命令行工具),并据此返回一个合适的 ExecutionEnvironment 实例。
如果打算将 Flink 程序部署到集群环境中,那么使用 getExecutionEnvironment() 是一个好选择,因为它不需要显式地指定执行环境。这样,代码就可以在本地和集群环境中无缝切换,而无需修改。
以下是一个使用 getExecutionEnvironment() 的示例:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment; public class FlinkExample { public static void main(String[] args) throws Exception { // 获取执行环境,自动选择本地或集群环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 从一个集合中创建一个数据源(这只是一个示例,实际中可能从外部数据源读取) DataSet<String> text = env.fromElements("Hello", "World", "Flink"); // 转换数据 DataSet<Integer> lengths = text .map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return value.length(); } }); // 打印结果到标准输出(如果运行在集群上,通常会将结果写入外部存储系统) lengths.print(); // 执行 Flink 作业 env.execute("Flink Example"); }
}
getExecutionEnvironment() 方法的工作原理如下:
- 自动检测执行环境:
- 如果 Flink 作业是作为集群作业提交的(例如,通过 Flink 的命令行界面或集群管理器),则该方法将返回一个与集群环境兼容的 ExecutionEnvironment。
- 如果 Flink 作业是在本地运行的(例如,在 IDE 中直接运行),则该方法将返回一个本地 ExecutionEnvironment。
注意:
- 使用 getExecutionEnvironment() 时,你不需要担心是在本地还是集群上运行作业,因为 Flink 会为你自动处理。
- 如果你明确知道你的作业将在本地运行,并且希望有更多的配置选项(例如,设置并行度),你可以直接使用 ExecutionEnvironment.createLocalEnvironment()。但是,如果你打算将作业部署到集群,并且希望代码能够在本地和集群之间无缝切换,那么 getExecutionEnvironment() 是一个更好的选择。
- 当你在 IDE 中开发 Flink 作业时,即使你使用 getExecutionEnvironment(),作业通常也会在本地运行,除非你配置了集群提交参数或使用了 Flink 的集群管理工具。
对于Flink Table API,获取表环境可以使用TableEnvironment类:
本地表环境:
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
在 Apache Flink 中,TableEnvironment 是用于批处理或流处理表格 API 的主要接口。自从 Flink 1.11 版本引入了对 Table API 和 SQL 的重大改进后,特别是 Blink 计划的引入,用户可以选择使用不同的计划器(planner)和模式(batch 或 streaming)。
这里做了以下几件事情:
- 创建 EnvironmentSettings:通过 EnvironmentSettings.newInstance() 开始配置 TableEnvironment 的设置。
- 使用 Blink 计划器:useBlinkPlanner() 指示 Flink 使用 Blink 计划器。Blink 计划器是基于 Apache Calcite 的一个定制版本,它为 Flink 的 Table API 和 SQL 提供了更多的优化和功能。例如,它支持更多的 SQL 功能和更复杂的查询。
- 指定批处理模式:inBatchMode() 表示 TableEnvironment 将被配置为批处理模式。在批处理模式下,TableEnvironment 将处理有限的数据集,通常用于批处理作业。
- 构建 TableEnvironment:通过调用 build() 方法完成 EnvironmentSettings 的配置,并将其传递给 TableEnvironment.create() 方法来创建一个新的 TableEnvironment 实例。
在得到 TableEnvironment 实例后,你可以注册外部数据源、执行 SQL 查询、转换表数据等。
这是一个简单的例子,展示了如何使用 TableEnvironment 来执行一个 SQL 查询:
// 假设你已经有了 tableEnv // 注册一个外部数据源(这里只是一个示例,具体实现取决于你的数据源)
tableEnv.executeSql("CREATE TABLE MyTable ( ... ) WITH ( ... )"); // 执行 SQL 查询
Table result = tableEnv.sqlQuery("SELECT * FROM MyTable WHERE someColumn = 'someValue'"); // 将结果转换为 DataSet 或 DataStream(取决于你的执行模式)
// 注意:这里假设你在批处理模式下,所以使用 toRetractStream 或 toDataSet 方法
DataSet<Row> dataSetResult = tableEnv.toDataSet(result, Row.class); // ... 进一步处理 dataSetResult ...
请注意,上面的代码只是一个示例,用于说明如何使用 TableEnvironment。在实际应用中,你需要根据你的数据源、执行模式和具体需求来配置和使用 TableEnvironment。
集群表环境:
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
在 Apache Flink 中,当您使用 TableEnvironment.create() 方法并指定 EnvironmentSettings 来创建一个 TableEnvironment 实例时,您可以选择使用 Blink planner 并设置执行模式为流处理模式(streaming mode)。这是处理实时数据流时常用的配置。
以下是各部分的解释:
- EnvironmentSettings.newInstance():创建一个新的 EnvironmentSettings 实例,用于配置 TableEnvironment。
- useBlinkPlanner():指定使用 Blink planner 作为查询优化器。Blink planner 提供了更丰富的 SQL 功能和更好的性能优化。
.inStreamingMode():将 TableEnvironment 的执行模式设置为流处理模式。这意味着您将在流处理上下文中执行 SQL 查询,即数据将被视为连续的数据流,并且查询将实时地处理这些数据。 - .build():根据提供的设置构建 EnvironmentSettings 实例。
- TableEnvironment.create(…):使用前面构建的 EnvironmentSettings 实例来创建一个新的 TableEnvironment 实例。
一旦您有了 TableEnvironment 实例,您就可以注册外部数据源、执行 SQL 查询、转换表数据等,并且这些操作都会以流处理的方式执行。
以下是一个简单的示例,展示了如何使用流处理模式下的 TableEnvironment 来执行一个 SQL 查询:
// 假设您已经有了 tableEnv // 注册一个外部数据源(这里只是一个示例,具体实现取决于您的数据源)
tableEnv.executeSql("CREATE TABLE MyTable ( ... ) WITH ( ... )"); // 执行 SQL 查询,这里假设 MyTable 是一个实时数据流
Table result = tableEnv.sqlQuery("SELECT * FROM MyTable WHERE someColumn = 'someValue'"); // 将结果转换为 DataStream(在流处理模式下)
DataStream<Row> dataStreamResult = tableEnv.toRetractStream(result, Row.class); // ... 进一步处理 dataStreamResult ...
请注意,在流处理模式下,您通常会将 Table 转换为 DataStream(使用 toRetractStream 或 toAppendStream 方法,取决于您的查询是否会产生更新或删除事件),因为 DataStream API 是为流处理而设计的。另外,对于只产生追加事件的查询,可以使用 toAppendStream 方法,这通常更简单且性能更好。
最后,请确保您的 Flink 集群和依赖库都支持流处理模式下的表 API 和 Blink planner。
请根据你的具体需求选择合适的环境。如果你需要流处理的环境,则应该使用StreamExecutionEnvironment。