使用flink编写WordCount

1. env-准备环境

2. source-加载数据

3. transformation-数据处理转换

4. sink-数据输出

5. execute-执行

流程图:

DataStream API开发

//nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/

 添加依赖

<properties><flink.version>1.13.6</flink.version>
</properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency></dependencies><build><extensions><extension><groupId>org.apache.maven.wagon</groupId><artifactId>wagon-ssh</artifactId><version>2.8</version></extension></extensions><plugins><plugin><groupId>org.codehaus.mojo</groupId><artifactId>wagon-maven-plugin</artifactId><version>1.0</version><configuration><!--上传的本地jar的位置--><fromFile>target/${project.build.finalName}.jar</fromFile><!--远程拷贝的地址--><url>scp://root:root@bigdata01:/opt/app</url></configuration></plugin></plugins></build>

 

编写代码

package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCount01 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意   不管是在本地开发运行还是在集群上运行,都这么写,非常方便StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 ,根据流的性质,决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流, 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理,默认是这个  可以通过打印批和流的处理结果,体会流和批的含义env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 获取数据  多态的写法 DataStreamSource 它是 DataStream 的子类DataStream<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");DataStream<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {String[] arr = line.split(" ");for (String word : arr) {// 循环遍历每一个切割完的数据,放入到收集器中,就可以形成一个新的DataStreamcollector.collect(word);}}});//flatMapStream.print();// Tuple2 指的是2元组DataStream<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String word) throws Exception {return Tuple2.of(word, 1); // ("hello",1)}});DataStream<Tuple2<String, Integer>> sumResult = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}// 此处的1 指的是元组的第二个元素,进行相加的意思}).sum(1);sumResult.print();// 执行env.execute();}
}

批处理结果:前面的序号代表分区

流处理结果:

也可以通过如下方式修改分区数量:

 env.setParallelism(2);

关于并行度的代码演示:

系统以及算子都可以设置并行度,或者获取并行度

package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCount01 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意   不管是在本地开发运行还是在集群上运行,都这么写,非常方便StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 ,根据流的性质,决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流, 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理,默认是这个  可以通过打印批和流的处理结果,体会流和批的含义env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 将任务的并行度设置为2// env.setParallelism(2);// 通过这个获取系统的并行度int parallelism = env.getParallelism();System.out.println(parallelism);// 获取数据  多态的写法 DataStreamSource 它是 DataStream 的子类DataStream<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");DataStream<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {String[] arr = line.split(" ");for (String word : arr) {// 循环遍历每一个切割完的数据,放入到收集器中,就可以形成一个新的DataStreamcollector.collect(word);}}});// 每一个算子也有自己的并行度,一般跟系统保持一致System.out.println("flatMap的并行度:"+flatMapStream.getParallelism());//flatMapStream.print();// Tuple2 指的是2元组DataStream<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String word) throws Exception {return Tuple2.of(word, 1); // ("hello",1)}});DataStream<Tuple2<String, Integer>> sumResult = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}// 此处的1 指的是元组的第二个元组,进行相加的意思}).sum(1);sumResult.print();// 执行env.execute();}
}
  1. 打包、上传

 文件夹需要提前准备好

提交我们自己开发打包的任务

flink run -c com.bigdata.day01.WordCount01 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

 

去界面中查看运行结果:

因为你这个是集群运行的,所以标准输出流中查看,假如第一台没有,去第二台查看,一直点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/887170.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

# issue 4 进程控制函数

目录 一、进程控制函数一 二、进程控制函数二 启动进程&#xff1a;&#xff08;exec系列&#xff09; 创建新进程&#xff1a; 测试代码&#xff1a; 测试结果&#xff1a; 三、进程控制函数三 结束进程&#xff1a; 测试代码&#xff1a; 测试结果&#xff1a; 四、…

Java项目实战II基于SpringBoot的共享单车管理系统开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、核心代码 五、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 在共享经济蓬勃发展的今天…

Linux 网络编程之UDP套接字

前言 前面我们对网络的发展&#xff0c;网络的协议、网路传输的流程做了介绍&#xff0c;最后&#xff0c;我们还介绍了 IP 和 端口号&#xff0c;ip port 叫做 套接字 socket&#xff0c; 本期我们就来介绍UDP套接字编程&#xff01; 目录 1、预备知识 1.1 传输层协议: T…

---Arrays类

一 java 1.Arrays类 1.1 toString&#xff08;&#xff09; 1.2 arrays.sort( )-----sort排序 1&#xff09;直接调用sort&#xff08;&#xff09; Arrays.sort() 方法的默认排序顺序是 从小到大&#xff08;升序&#xff09;。 2&#xff09;定制排序【具体使用时 调整正负…

Java 对象头、Mark Word、monitor与synchronized关联关系以及synchronized锁优化

1. 对象在内存中的布局分为三块区域&#xff1a; &#xff08;1&#xff09;对象头&#xff08;Mark Word、元数据指针和数组长度&#xff09; 对象头&#xff1a;在32位虚拟机中&#xff0c;1个机器码等于4字节&#xff0c;也就是32bit&#xff0c;在64位虚拟机中&#xff0…

6.7机器学习期末复习题

空间 样本空间 就是属性的所有可能情况&#xff0c;包括了一切可能出现或不可能出现的所有样本情况 版本空间&假设空间 假设空间就是在样本空间的基础上&#xff0c;给所有属性都加了一个通配符&#xff0c;表示任意即可&#xff1b;以及加上了一个空集&#xff0c;表示…

Qt界面设计时使各控件依据窗口缩放进行栅格布局的方法

图1 最终效果 想要达成上述图片的布局效果&#xff0c;具体操作如下&#xff1a; 新建一窗体&#xff1a; 所需控件如下&#xff1a; Table View控件一个&#xff1b; Group Box控件一个&#xff1b; Push Button控件2个&#xff1b; Horiziontal Spacer控件2个&#xf…

mac安装Pytest、Allure、brew

安装环境 安装pytest 命令 pip3 install pytest 安装allure 命令&#xff1a;brew install allure 好吧 那我们在安装allure之前 我们先安装brew 安装brew 去了官网复制了命令 还是无法下载 如果你们也和我一样可以用这个方法哦 使用国内的代码仓库来执行brew的安装脚本…

数据结构C语言描述5(图文结合)--队列,数组、链式、优先队列的实现

前言 这个专栏将会用纯C实现常用的数据结构和简单的算法&#xff1b;有C基础即可跟着学习&#xff0c;代码均可运行&#xff1b;准备考研的也可跟着写&#xff0c;个人感觉&#xff0c;如果时间充裕&#xff0c;手写一遍比看书、刷题管用很多&#xff0c;这也是本人采用纯C语言…

一篇文章了解机器学习

一篇文章了解机器学习&#xff08;上&#xff09; 一、软件版本安装二、数据集的加载三、数据集的切分四、数据特征提取及标准化1、字典数据的特征提取2、文本特征向量的提取3、数据标准化处理 四、特征降维注&#xff1a;训练器的区别&#xff1a;&#xff1a;五、模型的训练与…

day03(单片机高级)RTOS

目录 RTOS(实时操作系统) 裸机开发模式 轮询方式 前后台&#xff08;中断方式&#xff09; 改进&#xff08;前后台&#xff08;中断&#xff09;&#xff09;定时器 裸机进一步优化 裸机的其他问题 RTOS的概念 什么是RTOS 为什么要使用 RTOS RTOS的应用场景 RTOS的…

Hello-Go

Hello-Go 环境变量 GOPATH 和 GOROOT &#xff1a;不同于其他语言&#xff0c;go中没有项目的说法&#xff0c;只有包&#xff0c;其中有两个重要的路径&#xff0c;GOROOT 和 GOPATH Go开发相关的环境变量如下&#xff1a; GOROOT&#xff1a;GOROOT就是Go的安装目录&…

pytorch官方FasterRCNN代码详解

本博文转自捋一捋pytorch官方FasterRCNN代码 - 知乎 (zhihu.com)&#xff0c;增加了其中代码的更详细的解读&#xff0c;以帮助自己理解该代码。 代码理解的参考Faster-RCNN全面解读(手把手带你分析代码实现)---前向传播部分_手把手faster rcnn-CSDN博客 1. 代码结构 作为 to…

全志T113双核异构处理器的使用基于Tina Linux5.0——RTOS系统定制开发

8、RTOS系统定制开发 此处以在rtos/components/aw目录下创建一个简单的软件包为例&#xff0c;帮助客户了解RTOS环境&#xff0c;为RTOS系统定制开发提供基础。 RTOS环境下的软件包主要由三部分组成&#xff0c;源文件&#xff0c;Makefile&#xff0c;Kconfig&#xff0c;如下…

springboot实战(13)(@PatchMapping、@RequestParam、@URL、ThreadLocal线程局部变量)

目录 一、PATCH请求方式。 二、实现用户更新头像功能。 三、注解RequestParam。 四、注解URL。&#xff08;对传来的参数是否是合法地址进行校验&#xff09; 一、PATCH请求方式。 patch中文翻译&#xff1a;局部、小块。PATCH 请求主要用于对已存在的资源进行局部修改&#xf…

nvm安装node遇到的若干问题(vscode找不到npm文件、环境变量配置混乱、npm安装包到D盘)

问题一&#xff1a;安装完nvm后需要做哪些环境变量的配置&#xff1f; 1.打开nvm文件夹下的setting文件&#xff0c;设置nvm路径和安装node路径&#xff0c;并添加镜像。 root: D:\software\nvm-node\nvm path: D:\software\nvm-node\nodejs node_mirror: https://npmmirror.c…

面向FWA市场!移远通信高性能5G-A模组RG650V-NA通过北美两大重要运营商认证

近日&#xff0c;全球领先的物联网整体解决方案供应商移远通信宣布&#xff0c;其旗下符合3GPP R17标准的新一代5G-A模组RG650V-NA成功通过了北美两家重要运营商认证。凭借高速度、大容量、低延迟、高可靠等优势&#xff0c;该模组可满足CPE、家庭/企业网关、移动热点、高清视频…

2024年11月21日Github流行趋势

项目名称&#xff1a;twenty 项目维护者&#xff1a;charlesBochet, lucasbordeau, Weiko, FelixMalfait, bosiraphael项目介绍&#xff1a;正在构建一个由社区支持的现代化Salesforce替代品。项目star数&#xff1a;21,798项目fork数&#xff1a;2,347 项目名称&#xff1a;p…

AWTK 最新动态:支持鸿蒙系统(HarmonyOS Next)

HarmonyOS是全球第三大移动操作系统&#xff0c;有巨大的市场潜力&#xff0c;在国产替代的背景下&#xff0c;机会多多&#xff0c;AWTK支持HarmonyOS&#xff0c;让AWTK开发者也能享受HarmonyOS生态的红利。 AWTK全称为Toolkit AnyWhere&#xff0c;是ZLG倾心打造的一套基于C…

docker 配置同宿主机共同网段的IP 同时通过通网段的另一个电脑实现远程连接docker

docker配置网络 #宿主机执行命令 ifconfig 查询对应的主机ip 子网掩码 网关地址 #[网卡名称]&#xff1a;inet[主机IP] netmask[子网掩码] broadcast[网关地址]这里需要重点关注&#xff1a;eno1[网卡名称]以及【192.168.31.225】网关地址 在宿主机执行docker命令创建一个虚拟…