创建第一个 Flink 项目

一、运行环境介绍

Flink执行环境主要分为本地环境和集群环境,本地环境主要为了方便用户编写和调试代码使用,而集群环境则被用于正式环境中,可以借助Hadoop Yarnk8sMesos等不同的资源管理器部署自己的应用。

环境依赖:
【1】JDK环境:Flink核心模块均使用 Java开发,所以运行环境需要依赖JDKJDK版本需要保证在1.8以上。
【2】Maven编译环境:Flink的源代码目前仅支持通过 Maven进行编译,所以如果需要对源代码进行编译,或通过IDE开发Flink Application,则建议使用Maven作为项目工程编译方式。需要注意的是,Flink程序需要Maven的版本在3.0.4及以上,否则项目编译可能会出问题,建议用户根据要求进行环境的搭建。
【3】IDEA:需要安装scala插件以及scala环境等;

二、Flink项目 Scala版 DataSet 有界流

需求:同进文件文件中的单词出现的次数;

【1】创建Maven项目,pom.xml文件中配置如下依赖

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.10.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.10.0</version></dependency>
</dependencies><build><plugins><!-- 该插件用于将Scala代码编译成class文件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.4.6</version><executions><execution><goals><!--声明绑定到 maven 的compile阶段--><goal>compile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

【2】resource目录中添加需要进行统计的文件文件及内容
[点击并拖拽以移动] ​

【3】WordCount.java文件内容如下,需要注意隐私转换问题,需要引入scala._

 import org.apache.flink.api.scala._/**
* @Description 批处理 word count
* @Author zhengzhaoxiang
* @Date 2020/7/12 18:55
* @Param
* @Return
*/
object WordCount {def main(args: Array[String]): Unit = {//创建一个批处理的执行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//从文件中读取数据var inputDateSet: DataSet[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")//基于Dataset 做转换,首先按空格打散,然后按照 word作为key做group byval resultDataSet: DataSet[(String,Int)] = inputDateSet.flatMap(_.split(" "))//分词得到所有 word构成的数据集.map((_,1))//_表示当前 word 转换成一个二元组(word,count).groupBy(0)//以二元组中第一个元素作为key.sum(1) //1表示聚合二元组的第二个元素的值//打印输出resultDataSet.print()}
}

【4】统计结果展示:
[点击并拖拽以移动] ​

三、Flink项目 Scala版 DataStream 无界流

【1】StreamWordCount.java文件内容如下

package com.zzx.flinkimport org.apache.flink.streaming.api.scala._object StreamWordCount {def main(args: Array[String]): Unit = {// 创建一个流处理执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// 接受 socket 文本流val inputDataStream: DataStream[String] = env.socketTextStream("hadoop1",6666);//定义转换操作 word countval resultDataStream: DataStream[(String,Int)] = inputDataStream.flatMap(_.split(" "))//以空格分词,得到所有的 word.filter(_.nonEmpty).map((_,1))//转换成 word count 二元组.keyBy(0)//按照第一个元素分组.sum(1)//按照第二个元素求和resultDataStream.print()//上面的只是定义了处理流程,同时定义一个名称。不会让任务结束env.execute("stream word count word")}
}

【2】我这里在Hadoop1中通过nc -lk xxx打开一个socket通信
点击并拖拽以移动​

【3】查看IDEA输出统计内容如下:输出word的顺序不是按照输入的顺序,是因为它有并行度(多线程)是并行执行的。最前面的数字是并行子任务的编号类似线程号。最大的数字其实跟你cpu核数是息息相关的。这个并行度也可以通过env.setParallelism进行设置。我们也可以给每一个任务(算子)设置不同的并行度;
[点击并拖拽以移动] ​

【4】当我们需要将Java文件打包上传到Flink的时候,这里的hostport可以从参数中进行获取,代码修改如下:

package com.zzx.flinkimport org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._object StreamWordCount {def main(args: Array[String]): Unit = {// 创建一个流处理执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// 接受 socket 文本流  hostname:prot 从程序运行参数中读取val params: ParameterTool = ParameterTool.fromArgs(args);val hostname: String = params.get("host");val port: Int = params.getInt("port");val inputDataStream: DataStream[String] = env.socketTextStream(hostname,port);//定义转换操作 word countval resultDataStream: DataStream[(String,Int)] = inputDataStream.flatMap(_.split(" "))//以空格分词,得到所有的 word.filter(_.nonEmpty).map((_,1))//转换成 word count 二元组.keyBy(0)//按照第一个元素分组.sum(1)//按照第二个元素求和resultDataStream.print()//上面的只是定义了处理流程,同时定义一个名称。不会让任务结束env.execute("stream word count word")}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/205237.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【论文合集】在非欧空间中的图嵌入方法(Graph Embedding in Non-Euclidean Space)

文章目录 1. Hyperbolic Models1.1 Hyperbolic Graph Attention Network1.2 Poincar Embeddings for Learning Hierarchical Representations.1.3 Learning Continuous Hierarchies in the Lorentz Model of Hyperbolic Geometry1.4 Hyperbolic Graph Convolutional Neural Net…

【开源】基于Vue.js的假日旅社管理系统

文末获取源码&#xff0c;项目编号&#xff1a; S 078 。 \color{red}{文末获取源码&#xff0c;项目编号&#xff1a;S078。} 文末获取源码&#xff0c;项目编号&#xff1a;S078。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 系统介绍2.2 QA 问答 三、系统展示四…

【实用经验】如何根据CVE编号找到安全补丁

找到对应补丁页面 例如查找编号为 CVE-2019-0708 的漏洞&#xff0c;访问下面链接即可&#xff0c;替换末尾编号可获取其他漏洞更新补丁。 https://msrc.microsoft.com/update-guide/vulnerability/CVE-2019-0708根据实际情况点击右侧补丁链接即可跳转下载 最后根据实际情况下…

水平自动扩容和缩容HPA;API资源对象NetworkPolicy;Kubernetes用户安全控制;Kubernetes创建普通用户示例

水平自动扩容和缩容HPA&#xff1b;API资源对象NetworkPolicy&#xff1b;Kubernetes用户安全控制&#xff1b;Kubernetes创建普通用户示例 水平自动扩容和缩容HPA&#xff08;本部分操作适合K8s版本>1.23.x) HPA全称是Horizontal Pod Autoscaler&#xff0c;翻译成中文是…

揭秘C语言结构体:通往内存对齐的视觉之旅

揭秘C语言结构体&#xff1a;通往内存对齐的视觉之旅 引言 在C语言的编程旅程中&#xff0c;结构体&#xff08;structs&#xff09;是一个关键而强大的概念。结构体不仅允许我们组织和存储不同类型的数据&#xff0c;而且通过深入了解内存对齐&#xff0c;我们可以更好地优化…

Python+requests+unittest+excel实现接口自动化测试框架

在刚刚进入测试行业的时候&#xff0c;最开始也是做功能测试&#xff0c;我想很多伙伴和我一样&#xff0c;觉得自动化测试都很高端&#xff0c;很神秘。迫不及待的想去学习作自动化测试。 以前比较常用数据库python做自动化&#xff0c;后面发现excel个人觉得更加适合&#x…

初始类与对象

初始类与对象 实验介绍 本课程是进一步对类与对象的深入认识&#xff0c;如何定义并实例化一个类&#xff0c;介绍如何使用 C 标准库 string 类等。 知识点 认识类与对象内联函数string 类类的定义与实例化 认识类与对象 官方定义 类&#xff1a;在面向对象编程中是一种…

自定义starter案例——统计独立IP访问次数

自定义starter案例——统计独立IP访问次数 文章目录 自定义starter案例——统计独立IP访问次数ip计数业务功能开发定时任务报表开发使用属性配置功能设置功能参数配置调整 自定义拦截器开启yml提示功能 ip计数业务功能开发 public class IpCountService {private Map<String…

matplot函数调整子图大小测试

调整subplot()函数的子图间距 import numpy as np import matplotlib.pyplot as plt for i in range(1,7):figsize 10,6plt.subplot(2,3,i)plt.text(0.5,0.5,str((2,3,i)),fontsize18,hacenter) **plt.subplots_adjust(hspace3.3, wspace0.3)** plt.show()import numpy as np…

水果党flstudio用什么midi键盘?哪个版本的FL Studio更适合我

好消息&#xff01;好消息&#xff01;特大好消息&#xff01; 水果党们&#xff01;终于有属于自己的专用MIDI键盘啦&#xff01; 万众期待的Novation FLKEY系列 正式出炉&#xff01; 话有点多话&#xff0c;先分享一份干货&#xff0c;尽快下载 FL Studio 21 Win-安装包&…

2023人工智能和市场营销的融合报告:创造性合作的新时代需要新的原则

今天分享的人工智能系列深度研究报告&#xff1a;《2023人工智能和市场营销的融合报告&#xff1a;创造性合作的新时代需要新的原则》。 &#xff08;报告出品方&#xff1a;M&CSAATCHITHINKS&#xff09; 报告共计&#xff1a;11页 生成型人工智能的兴起和重要性 生成式…

P8 Linux 目录操作

目录 前言 01 mkdir 系统调用 mkdir的代码示例 02 rmdir删除目录 03 打开、读取以及关闭目录 3.1 opendir()函数原型&#xff1a; 04 读取目录 readdir() 05 struct dirent 结构体&#xff1a; 06 rewinddir ()函数重置目录流 07 关闭目录 closedir ()函数 测试:打印…

基于深度学习的遥感图像变化差异可视化系统

1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 研究背景与意义 遥感图像变化差异可视化是遥感图像处理和分析的重要研究领域之一。随着遥感技术的快速发展和遥感数据的广泛应用&#xff0c;遥感图像的获取和处理变得越来越容易…

python-比较Excel两列数据,并分别显示差异

利用 openpyxl 模块&#xff0c;操作Excel&#xff0c;比较Excel两列数据&#xff0c;并分别显示差异 表格数据样例如下图 A&#xff0c;B两列是需要进行比较的数据&#xff08;数据源为某网站公开数据&#xff09;&#xff1b;C&#xff0c;D两列是比较结果的输出列 A&#…

小白学习java理解栈手写栈——第四关(青铜挑战)

内容1.理解栈的基本特征2.理解如何使用数组来构造栈3.理解如何使用链表来构造栈 1.栈的基础知识 1.1栈的特征 栈和队列是比较特殊的线性表&#xff0c;又称为访问受限的线性表。栈是很多表达式、符号等运算的基础&#xff0c;也是递归的底层实现&#xff0c;理论上递归能做的…

Linux 防病毒软件:CentOS有哪些付费的防病毒软件

CentOS是一个基于开源的Linux发行版,通常不像Windows那样普遍需要使用付费的防病毒软件。大多数Linux系统侧重于使用开源和免费的安全工具来保护系统。一些常见的免费和开源的防病毒软件和安全工具包括ClamAV、Sophos Antivirus for Linux、rkhunter、chkrootkit等。 如果你非…

[数据集][目标检测]拉横幅识别横幅检测数据集VOC+yolo格式1962张1类别

数据集格式&#xff1a;Pascal VOC格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;1962 标注数量(xml文件个数)&#xff1a;1962 标注数量(txt文件个数)&#xff1a;1962 标注类别数&a…

探索Scrapy-spider:构建高效网络爬虫

Spider简介 Scrapy中的Spider是用于定义和执行数据抓取逻辑的核心组件。Spider负责从指定的网站抓取数据&#xff0c;并定义了如何跟踪链接、解析内容以及提取数据的规则。它允许您定制化地指定要抓取的网站、页面和所需的信息。Spider的作用是按照预定的规则爬取网页&#xf…

边缘计算与人工智能的融合

随着物联网技术的迅猛发展&#xff0c;大量设备和传感器开始连接至互联网&#xff0c;产生了海量的数据。传统的云计算模式往往无法满足对数据实时性和隐私保护的需求&#xff0c;而边缘计算技术的兴起为解决这一难题提供了新的思路。边缘计算将数据处理和分析的功能下沉至数据…