使用Flink批处理实现WordCount

Flink作为一个非常优秀的大数据实时计算框架,在很多从事大数据开发的公司都是必备的技能,接下来我将通过Flink以批处理来实现入门案例WordCount

1:步骤一

idea新建设maven项目,并且自己配置好maven环境 

2:步骤二

在pom文件中加入下面的依赖和配置

 <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--Flink 版本--><flink.version>1.13.0</flink.version><!--JDK 版本--><java.version>1.8</java.version><slf4j.version>1.7.30</slf4j.version><!--Scala 2.11 版本--><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${java.version}</maven.compiler.source><maven.compiler.target>${java.version}</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.codehaus.mojo</groupId><artifactId>exec-maven-plugin</artifactId><version>1.6.0</version><executions><execution><goals><goal>java</goal></goals></execution></executions><configuration><classpathScope>test</classpathScope></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>

3:步骤三

配置 log4j,在resources下面建一个文件log4j.properties,里面内容如下

# Output pattern : date [thread] priority category - message
log4j.rootLogger=WARN,CONSOLE,RollingFile#CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n#RollingFile
log4j.appender.RollingFile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingFile.File=logs/signserver.log
log4j.appender.RollingFile.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingFile.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n#Project default level
log4j.logger.com.ntko.sign=debug

4:步骤四

新建一个input目录,并且新建words.txt文件,文件中输入如下内容

hello world
hello flink
hello java

5:步骤五

 新建一个BatchWordCount类,里面代码如下

public class BatchWordCount {public static void main(String[] args) throws Exception {//1:创建执行环境ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();//2:从文件读取数据DataSource<String> dataSource = executionEnvironment.readTextFile("input/words.txt");//将每行数据进行分词,转换成二元组类型FlatMapOperator<String, Tuple2<String, Long>> returns = dataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {//将一行文本进行分词String[] words = line.split(" ");//将每个单词转换成二元组输出for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));//按照word进行分组UnsortedGrouping<Tuple2<String, Long>> tuple2UnsortedGrouping = returns.groupBy(0);//分组内进行聚合统计AggregateOperator<Tuple2<String, Long>> sum = tuple2UnsortedGrouping.sum(1);sum.print();}
}

6:步骤六

输出结果 可以看到flink单词出现了1次,world出现了1次,hello出现了三次,java出现了一次

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/268033.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Application.DoEvents

记得第一次使用Application.DoEvents()是为了在加载大量数据时能够有一个数据加载的提示&#xff0c;不至于系统出现假死的现象&#xff0c;当时也没有深入的去研究他的原理是怎样的&#xff0c;结果在很多地方都用上了Application.DoEvents()&#xff0c;今天看到了关于这方面…

Servlet交互【重定向 与 请求分派】详解

Servlet交互 在serlvet中&#xff0c;需要调用另外一个资源来对浏览器的请求进行响应&#xff0c;两种方式实现&#xff1a; 调用HttpServletResponse.sendRedirect 方法实现 重定向 调用RequestDispatcher.forward 方法来实现请求分派 &#xff08;转发&#xff09; 1.reponse…

解决Error: No such file or directory @ rb_sysopen

mac使用brew安装flink时出现报错&#xff0c;是下载openjdk11报错的 原因是openjdk11依赖包下载不成功&#xff0c;使用brew单独下载该依赖包即可 brew install openjdk11

《Sibelius 脚本程序设计》连载(十四) - 2.1 注释、语句、语句块

《Sibelius 脚本程序设计》连载(Flash 格式) 转载于:https://www.cnblogs.com/Sibelius/archive/2010/12/11/1903389.html

Mac Brew install 报错Command failed with exit 128:git

问题&#xff1a; 记录一个问题&#xff0c;Mac使用Brew安装Flink报错 具体如图所示&#xff0c;执行brew install apache-flink Error: Command failed with exit 128: git 解决方式&#xff1a; 输入brew -v后会提示你执行两个配置命令&#xff0c;直接复制执行就ok了&am…

[转载]一个游戏程序员的学习资料

想起写这篇文章是在看侯杰先生的《深入浅出MFC》时,突然觉得自己在大学这几年关于游戏编程方面还算是有些心得&#xff0c;因此写出这篇小文,介绍我眼中的游戏程序员的书单与源代码参考。一则是作为自己今后两年学习目标的备忘录,二来没准对别人也有点参考价值。我的原则是只写…

Mac上安装flink笔记

1&#xff1a;步骤一 首先要有破jdk1.8&#xff0c;查看命令&#xff1a;java -version 2:步骤二 使用brew安装flink&#xff0c;命令如下&#xff1a; brew install apache-flink 3:步骤三 我这边安装的时候报错了&#xff0c;解决方式如下 报错1 解决方式 https://blog.…

工作笔记一——杂项

近期做的项目中遇到一些棘手的问题&#xff0c;解决的过程用到很多知识&#xff0c;在此记下主要的问题与解决方法。 页面功能介绍&#xff1a;获取五张表格的大量数据&#xff08;大概有几千条记录&#xff09;&#xff0c;然后到前台显示在table里面&#xff0c;实现行列汇总…

Coolite 中GridView行按钮取行ID并调用服务器端代码

效果图&#xff1a; 关系代码&#xff1a; <Command Handler"if(commandbutSelectReocrd){strrecord.data.SessionId; #{AjaxMethods}.SelectRecord(str);}" /> 全部html代码&#xff1a; 代码 <ext:GridPanel ID"GridPanel1"Height"325&quo…

Flink的三种执行模式STREAMING和BATCH和AUTOMATIC

执行模式 执行模式三种 BATCH模式的两种配置方法 什么时候选择BATCH模式

activemq生产者和消费者的双向通信

http://websystique.com/spring/spring-4-jms-activemq-example-with-jmslistener-enablejms/转载于:https://www.cnblogs.com/zhangshitong/p/7906468.html

大学生必犯的N大错误(1)

1&#xff09;不会英语&#xff1a; 计算机科学源于美国&#xff0c;重量级的文档都是英文的。不会英语的你只能忍受拙劣的翻译和很大延迟的文档和图书&#xff08;翻译出来的优秀的文档和图书几乎都是很久以前的出版物&#xff09;。 语言的重要性&#xff0c;实际上体现的是沟…

RabbitMQ基础概念详细介绍

转至&#xff1a;http://www.ostest.cn/archives/497 引言 你是否遇到过两个&#xff08;多个&#xff09;系统间需要通过定时任务来同步某些数据&#xff1f;你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎&#xff1f;如果是&#xff0c;那么恭喜你&#x…

ubuntu 目录及文件权限 000 444 666 777(转)

转载自&#xff1a;http://hi.baidu.com/im886/blog/item/434764d9f6c210f838012f0b.html 1 [001] 执行权限 x2 [010] 只写权限 w4 [100] 只读权限 r sudo chmod 600 &#xff08;只有所有者有读和写的权限&#xff09; sudo chmod 644 &#xff08;所有者有读和写的权限&am…

信号与系统(中)

第四章 线性时不变系统的时域分析 4.1连续时间系统的时域分析 微分方程的求解 齐次解特解完全解起始状态到初始状态的转换 冲激平衡法连续时间系统的零输入响应与零状态响应 双零法4.2离散时间系统的时域分析 迭代法时域经典法双零法差分方程的求解 齐次解特解完全解离散时间系…

对自学还是培训的看法

在论坛上看到的帖子&#xff0c;转过来&#xff0c;标题是 “对自学还是培训的看法” http://bbs.51cto.com/thread-605267-1.html 转载于:https://blog.51cto.com/gooltsing/467392

RocketMq中的perm属性2 4 6用于设置对当前创建Topic的操作权

RocketMq中的perm属性用于设置对当前创建Topic的操作权限 2表示&#xff1a;只可以写 4表示&#xff1a;只可以读 6表示&#xff1a;可以进行读写

时刻修正自已的思想

时刻修正自已的思想 人与人的区别&#xff0c;归根结底是人的思想的区别&#xff0c;以前也经常看一些名言警句&#xff0c;但那都是别人的&#xff0c;只能参考&#xff0c;自已很难转化为自已的&#xff0c;所以要从实践中学习&#xff0c;以下是我自已的所思所想&#xff0c…