1、获取配置参数-1
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.io.IOException;
import java.util.Map;public class _01_ParameterToolReadArgs {public static void main(String[] args) throws IOException {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置值来自 .properties 文件
// String propertiesFilePath = "*";
// ParameterTool parameterTool = ParameterTool.fromPropertiesFile(propertiesFilePath);// File file = new File("*");
// ParameterTool parameterTool = ParameterTool.fromPropertiesFile(file);// FileInputStream fileInputStream = new FileInputStream(new File("*"));
// ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fileInputStream);// 配置值来自命令行// 输入// --input hdfs:///mydata// --elements 42// 输出// input=hdfs:///mydata// elements=42
// ParameterTool parameterTool = ParameterTool.fromArgs(args);// 配置值来自系统属性,VmOptions// 输入// -Dinput=hdfs:///mydata// 输出// input=hdfs:///mydataParameterTool parameterTool = ParameterTool.fromSystemProperties();for (Map.Entry<String, String> entry : parameterTool.toMap().entrySet()) {if ("input".equals(entry.getKey())) {System.out.println(entry.getKey() + "=" + entry.getValue());}}}
}
2、获取配置参数-2
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Map;/*** 输入 --input myGlobalParamsInput* 输出 myGlobalParamsInput*/
public class _02_ParameterToolGlobalParams {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();ParameterTool parameters = ParameterTool.fromArgs(args);env.getConfig().setGlobalJobParameters(parameters);DataStreamSource<Integer> source = env.fromData(1, 2, 3);source.map(new MyRichMapFunc()).print();env.execute();}
}class MyRichMapFunc extends RichMapFunction<Integer, String> {@Overridepublic String map(Integer value) throws Exception {Map<String, String> globalJobParameters = getRuntimeContext().getGlobalJobParameters();return globalJobParameters.get("input");}
}