flink入门代码

flink入门代码

package com.lyj.sx.flink.wordCount;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class LocalWithWebUI {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> source = env.socketTextStream("pxj62", 8889);SingleOutputStreamOperator<Tuple2<String, Integer>> summed = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {for (String string : s.split(" ")) {collector.collect(Tuple2.of(string, 1));}}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> s) throws Exception {return s.f0;}}).sum(1);summed.print();env.execute("pxj");}
}
package com.lyj.sx.flink.wordCount;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class StreamingWordCount {public static void main(String[] args) throws  Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();int parallelism = env.getParallelism();System.out.println("parallelism:" + parallelism);DataStreamSource<String> source = env.socketTextStream("pxj62", 8881);System.out.println("source"+source.getParallelism());SingleOutputStreamOperator<Tuple2<String, Integer>> summed = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] strings = s.split(" ");for (String string : strings) {collector.collect(Tuple2.of(string, 1));}}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> s) throws Exception {return s.f0;}}).sum(1);summed.print();env.execute("pxj");}
}
package com.lyj.sx.flink.wordCount;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class StreamingWordCountV3 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("pxj62", 8889);SingleOutputStreamOperator<Tuple2<String, Integer>> data = source.flatMap(new MyFlatMap());SingleOutputStreamOperator<Tuple2<String, Integer>> summed = data.keyBy(0).sum(1);summed.print();env.execute("pxj");}public static  class MyFlatMap implements FlatMapFunction<String, Tuple2<String,Integer>> {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {for (String string : s.split(" ")) {collector.collect(Tuple2.of(string,1));}}}
}
package com.lyj.sx.flink.day02;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class ReadTextFileDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.readTextFile("data/a.txt");source.map(new MapFunction<String, Tuple2<String,Integer>>() {Tuple2<String,Integer> s1;@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {String[] strings = s.split(" ");for (String string : strings) {s1=Tuple2.of(string,1);}return s1;}}).print();env.execute("pxj");}
}
package com.lyj.sx.flink.day02;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Arrays;
import java.util.List;
import java.util.UUID;public class CustomNoParSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());System.out.println("环境执行的并行度:"+env.getParallelism());DataStreamSource<String> source = env.addSource(new Mysource2());System.out.println("source的并行度为:"+source.getParallelism());source.print();
//         env.execute("pxj");env.execute();}private static class Mysource1 implements SourceFunction<String> {//启动,并产生数据,产生的数据用SourceContext输出@Overridepublic void run(SourceContext<String> cx) throws Exception {List<String> lists = Arrays.asList("a", "b", "c", "pxj", "sx", "lyj");for (String list : lists) {cx.collect(list);}}//将Source停掉@Overridepublic void cancel() {}}private static class Mysource2 implements  SourceFunction<String>{private Boolean flag=true;@Overridepublic void run(SourceContext<String> cx) throws Exception {System.out.println("run....");while (flag){cx.collect(UUID.randomUUID().toString());}}@Overridepublic void cancel() {System.out.println("cancel");flag=false;}}
}

作者:pxj_sx(潘陈)
日期:2024-04-11 0:26:20

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/806622.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

每天学习一个Linux命令之rpm2cpio

每天学习一个Linux命令之rpm2cpio 在Linux系统中&#xff0c;有许多强大的命令可以帮助我们更好地管理和操作文件。今天我们将学习一个非常有用的命令——rpm2cpio&#xff0c;它可以将RPM包转换为cpio格式&#xff0c;方便我们提取其中的文件。 1. rpm2cpio命令简介 rpm2cp…

双云及多云融合(混合云)

背景&#xff1a;客户对于业务的高可用需求&#xff0c;当发生故障时&#xff0c;业务还能正常使用&#xff0c;如某云机房整体宕机&#xff0c;或云管理服务整体宕掉&#xff0c;导致客户业务不可用&#xff0c;此时&#xff0c;需有业务能顺利切换到灾备云上。 需求&#xf…

[蓝桥杯 2018 国 C] 迷宫与陷阱

题目&#xff1a; 思路&#xff1a; 代码&#xff1a; #include <bits/stdc.h> using namespace std; const int N1e310; char g[N][N];//输入&#xff1a;图的数组 int vis[N][N]; /* 剪枝&#xff1a;记录magic的个数&#xff08;一个点经过两次&#xff0c;magic越大…

【讲解下如何从零基础学习Java】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

[管理者与领导者-157] :如何保持正向思考和积极性思维模式?

目录 一、正向思考 VS 负向思考 二、积极性思维模式 VS 消极思维模式 三、乐观思维 VS 悲观思维 一、正向思考 VS 负向思考 正向思考和负向思考是两种截然不同的思维方式&#xff0c;它们在处理问题、面对挑战和塑造人生态度方面有着显著的不同。 正向思考是一种乐观、积极…

设计模式学习笔记 - 设计模式与范式 -行为型:9.迭代器模式(上):相比直接遍历集合数据,使用迭代器模式有哪些优势?

概述 上篇文章&#xff0c;我们学习了状态模式。状态模式是状态机的一种实现方式。它通过将事件触发的状态转移和动作执行&#xff0c;拆分到不同的状态类中&#xff0c;以此来避免状态机类中的分支判断逻辑&#xff0c;应对状态机类代码的复杂性。 本章&#xff0c;学习另外…

Dude, where’s that IP? Circumventing measurement-based IP geolocation(2010年)

下载地址:https://www.usenix.org/legacy/event/sec10/tech/full_papers/Gill.pdf 被引次数:102 Gill P, Ganjali Y, Wong B. Dude, Wheres That {IP}? Circumventing Measurement-based {IP} Geolocation[C]//19th USENIX Security Symposium (USENIX Security 10). 2010.…

day55 最长递增子序列 最长连续递增子序列 最长重复子数组

题目1 300 最长递增子序列 题目链接 300 最长递增子序列 题意 找到整数数组nums的最长严格递增子序列的长度&#xff08;子序列并不改变原始的顺序&#xff0c;但是可以删除元素&#xff09; 动态规划 动规五部曲 1&#xff09;dp数组及下标i的含义 dp[i] 表示以nums[i…

dnspy逆向和de4dot脱壳

拿到一个软件&#xff0c;使用dnspy查看&#xff0c;发现反汇编后关键部分的函数名和代码有很多乱码&#xff1a; 这样的函数非常多&#xff0c;要想进一步调试和逆向&#xff0c;就只能在dnspy中看反汇编代码了&#xff0c;而无法看到c#代码&#xff0c;当时的整个逆向过程只剩…

遥感图像处理:从畸变消除到专题信息提取

​ ​ ​在遥感技术的应用中&#xff0c;图像处理是不可或缺的关键步骤。从消除各种辐射畸变和几何畸变&#xff0c;到利用增强技术突出景物的光谱和空间特征&#xff0c;再到进一步理解、分析和判别处理后的图像&#xff0c;这一过程为我们呈现了一幅幅更为真实、清晰的…

免费ssl证书能一直续签吗?如何获取SSL免费证书?

免费SSL证书是否可以一直续签。我们需要了解SSL证书的基本工作原理。当你访问一个使用HTTPS协议的网站时&#xff0c;该网站实际上在使用一个SSL证书。这个证书相当于一个数字身份证明&#xff0c;它验证了网站的真实性和安全性。而这个证明是由受信任的第三方机构——通常是证…

被控平台的远程控制软件:功能、应用与安全性考量

随着信息技术的迅猛发展&#xff0c;远程控制软件在多个领域中的应用日益广泛。这类软件允许用户通过网络远程访问和控制另一台计算机或设备&#xff0c;为工作、学习和生活带来了极大的便利。然而&#xff0c;这种技术同样伴随着安全风险&#xff0c;特别是在被控平台方面。 K…

Harmony鸿蒙南向驱动开发-MIPI DSI

功能简介 DSI&#xff08;Display Serial Interface&#xff09;是由移动行业处理器接口联盟&#xff08;Mobile Industry Processor Interface (MIPI) Alliance&#xff09;制定的规范&#xff0c;旨在降低移动设备中显示控制器的成本。它以串行的方式发送像素数据或指令给外…

C语言程序设计每日一练(1)

探索数字组合的奇妙世界&#xff1a;如何生成所有独特的三位数 当我们想要探索由1、2、3、4这四个数字能组成多少个不同的三位数时&#xff0c;我们实际上是在解决一个排列组合的问题。这不仅是一个数学问题&#xff0c;也是编程领域经常遇到的挑战&#xff0c;特别是在数据处…

AI论文速读 | TF-LLM:基于大语言模型可解释性的交通预测

论文标题&#xff1a; Explainable Traffic Flow Prediction with Large Language Models 作者&#xff1a;Xusen Guo, Qiming Zhang, Mingxing Peng, Meixin Zhu(朱美新)*, Hao (Frank)Yang(杨昊) 机构&#xff1a;香港科技大学&#xff08;广州&#xff09;&#xff0c;约翰…

Hadoop、HDFS、Hive、Hbase区别及联系

Hadoop、HDFS、Hive和HBase是大数据生态系统中的关键组件,它们都是由Apache软件基金会管理的开源项目。下面将深入解析它们之间的区别和联系。 Hadoop Hadoop是一个开源的分布式计算框架,它允许用户在普通硬件上构建可靠、可伸缩的分布式系统。Hadoop通常指的是整个生态系统…

Fast-Planner(五)详解TopologyPRM

本文上接Fast-Planner第一篇文章的内容&#xff0c;本文主要详解这一系列的第二篇Robust Real-time UAV Replanning Using Guided Gradient-based Optimization and Topological Paths中的TopologyPRM即其代码。如有问题&#xff0c;欢迎各位大佬评论指出&#xff0c;带着我一起…

未设置超时时间导致线程池资源耗尽,排查过程

错误分析&#xff1a; Scheduled进行定时任务的时候&#xff0c;spring会创建一个线程&#xff0c;然后用这个线程来执行任务&#xff0c;如果这个任务阻塞了&#xff0c;那么这个任务就会停滞&#xff0c;出现不执行的情况。而使用原生的方法进行http请求时&#xff0c;如果不…

应该如何进行POC测试?—【DBA从入门到实践】第三期

在数据库选型过程中&#xff0c;为确保能够灵活应对数据规模的不断扩大和处理需求的日益复杂化&#xff0c;企业和技术人员会借助POC测试来评估不同数据库系统的性能。在测试过程中&#xff0c;性能、并发处理能力、存储成本以及高可用性等核心要素通常会成为大家关注的焦点&am…

分析染色体级别的基因组装配揭示了六倍体栽培菊花的起源和进化-文献精读-7

Analyses of a chromosome-scale genome assembly reveal the origin and evolution of cultivated chrysanthemum 分析染色体级别的基因组装配揭示了栽培菊花的起源和进化 六倍体植物基因组的文献&#xff0c;各位同仁还有什么有特色的基因组评论区留言~ 摘要 菊花&#xf…