Flink introductory code examples
package com.lyj.sx.flink.wordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class LocalWithWebUI {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = env.socketTextStream("pxj62", 8889);
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String string : s.split(" ")) {
                    collector.collect(Tuple2.of(string, 1));
                }
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> s) throws Exception {
                return s.f0;
            }
        }).sum(1);
        summed.print();
        env.execute("pxj");
    }
}
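For comparison, the same word count can be written with Java lambdas. Because lambdas lose their generic type parameters to erasure, Flink then needs an explicit type hint via returns(...). The following is a minimal sketch, not part of the original post: the class name is made up, and the host and port are copied from the example above.

package com.lyj.sx.flink.wordCount;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// Hypothetical lambda-based variant of LocalWithWebUI above; illustrative sketch only.
public class LocalWithWebUILambda {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.socketTextStream("pxj62", 8889)
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                // The lambda erases the Tuple2 type parameters, so declare the produced type explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1)
                .print();
        env.execute("pxj");
    }
}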
package com.lyj.sx.flink.wordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int parallelism = env.getParallelism();
        System.out.println("environment parallelism: " + parallelism);
        DataStreamSource<String> source = env.socketTextStream("pxj62", 8881);
        System.out.println("source parallelism: " + source.getParallelism());
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] strings = s.split(" ");
                for (String string : strings) {
                    collector.collect(Tuple2.of(string, 1));
                }
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> s) throws Exception {
                return s.f0;
            }
        }).sum(1);
        summed.print();
        env.execute("pxj");
    }
}
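StreamingWordCount only reads the default parallelism. To actually control it, parallelism can be set on the environment and overridden per operator. Below is a small illustrative sketch, not from the original post: the class name, the map step, and the parallelism values are made up, and the socket source itself always runs non-parallel.

package com.lyj.sx.flink.wordCount;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Illustrative only: shows where parallelism can be set.
public class ParallelismDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // default parallelism for all operators of this job

        // The socket source is non-parallel and always runs with parallelism 1.
        DataStreamSource<String> source = env.socketTextStream("pxj62", 8881);
        System.out.println("source parallelism: " + source.getParallelism());

        source.map(String::toUpperCase)
                .setParallelism(2) // per-operator override
                .print();

        env.execute("pxj");
    }
}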
package com.lyj.sx.flink.wordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingWordCountV3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("pxj62", 8889);
        SingleOutputStreamOperator<Tuple2<String, Integer>> data = source.flatMap(new MyFlatMap());
        // keyBy(0) keys by tuple field position; newer Flink versions prefer a KeySelector, as in the classes above.
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = data.keyBy(0).sum(1);
        summed.print();
        env.execute("pxj");
    }

    public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            for (String string : s.split(" ")) {
                collector.collect(Tuple2.of(string, 1));
            }
        }
    }
}
package com.lyj.sx.flink.day02;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFileDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.readTextFile("data/a.txt");
        source.map(new MapFunction<String, Tuple2<String, Integer>>() {
            Tuple2<String, Integer> s1;

            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] strings = s.split(" ");
                // map returns exactly one record per input line, so only the last word of each line is emitted.
                for (String string : strings) {
                    s1 = Tuple2.of(string, 1);
                }
                return s1;
            }
        }).print();
        env.execute("pxj");
    }
}
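Because map must return exactly one record per input line, ReadTextFileDemo only emits the last word of each line. If the intent is one (word, 1) pair per word, flatMap fits better. The following is a minimal sketch under that assumption; the class name is made up and the file path is reused from above.

package com.lyj.sx.flink.day02;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// Hypothetical variant of ReadTextFileDemo: flatMap emits one tuple per word instead of one per line.
public class ReadTextFileFlatMapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.readTextFile("data/a.txt")
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String word : line.split(" ")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .print();
        env.execute("pxj");
    }
}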
package com.lyj.sx.flink.day02;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Arrays;
import java.util.List;
import java.util.UUID;

public class CustomNoParSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        System.out.println("environment parallelism: " + env.getParallelism());
        DataStreamSource<String> source = env.addSource(new Mysource2());
        System.out.println("source parallelism: " + source.getParallelism());
        source.print();
        // env.execute("pxj");
        env.execute();
    }

    // Finite source: emits a fixed list once and then finishes (not wired into main above).
    private static class Mysource1 implements SourceFunction<String> {
        // Start the source and produce data; records are emitted through the SourceContext.
        @Override
        public void run(SourceContext<String> cx) throws Exception {
            List<String> lists = Arrays.asList("a", "b", "c", "pxj", "sx", "lyj");
            for (String list : lists) {
                cx.collect(list);
            }
        }

        // Stop the source.
        @Override
        public void cancel() {
        }
    }

    // Infinite source: keeps emitting random UUIDs until cancel() flips the flag.
    private static class Mysource2 implements SourceFunction<String> {
        // volatile so the cancel() call, which runs on another thread, is visible to the run() loop
        private volatile boolean flag = true;

        @Override
        public void run(SourceContext<String> cx) throws Exception {
            System.out.println("run....");
            while (flag) {
                cx.collect(UUID.randomUUID().toString());
            }
        }

        @Override
        public void cancel() {
            System.out.println("cancel");
            flag = false;
        }
    }
}
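A plain SourceFunction such as Mysource1 or Mysource2 always runs with parallelism 1. If a parallel source is wanted, one option is to extend RichParallelSourceFunction so that every parallel subtask runs its own run() loop. The sketch below is illustrative only: the class names, payload, sleep interval, and parallelism are assumptions, not from the original post. Note that in newer Flink releases this SourceFunction-style API is deprecated in favor of the unified Source interface.

package com.lyj.sx.flink.day02;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.UUID;

// Hypothetical parallel counterpart to CustomNoParSource; every subtask emits its own records.
public class CustomParSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = env.addSource(new MyParSource()).setParallelism(2);
        System.out.println("source parallelism: " + source.getParallelism());
        source.print();
        env.execute();
    }

    private static class MyParSource extends RichParallelSourceFunction<String> {
        private volatile boolean flag = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            int subtask = getRuntimeContext().getIndexOfThisSubtask();
            while (flag) {
                ctx.collect("subtask-" + subtask + ":" + UUID.randomUUID());
                Thread.sleep(500); // throttle so the console is not flooded
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }
}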
Author: pxj_sx (Pan Chen)
Date: 2024-04-11 00:26:20