使用Flink批处理实现WordCount

Flink作为一个非常优秀的大数据实时计算框架,在很多从事大数据开发的公司都是必备的技能,接下来我将通过Flink以批处理来实现入门案例WordCount

1:步骤一

idea新建设maven项目,并且自己配置好maven环境 

2:步骤二

在pom文件中加入下面的依赖和配置

 <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--Flink 版本--><flink.version>1.13.0</flink.version><!--JDK 版本--><java.version>1.8</java.version><slf4j.version>1.7.30</slf4j.version><!--Scala 2.11 版本--><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${java.version}</maven.compiler.source><maven.compiler.target>${java.version}</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.codehaus.mojo</groupId><artifactId>exec-maven-plugin</artifactId><version>1.6.0</version><executions><execution><goals><goal>java</goal></goals></execution></executions><configuration><classpathScope>test</classpathScope></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>

3:步骤三

配置 log4j,在resources下面建一个文件log4j.properties,里面内容如下

# Output pattern : date [thread] priority category - message
log4j.rootLogger=WARN,CONSOLE,RollingFile#CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n#RollingFile
log4j.appender.RollingFile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingFile.File=logs/signserver.log
log4j.appender.RollingFile.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingFile.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n#Project default level
log4j.logger.com.ntko.sign=debug

4:步骤四

新建一个input目录,并且新建words.txt文件,文件中输入如下内容

hello world
hello flink
hello java

5:步骤五

 新建一个BatchWordCount类,里面代码如下

public class BatchWordCount {public static void main(String[] args) throws Exception {//1:创建执行环境ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();//2:从文件读取数据DataSource<String> dataSource = executionEnvironment.readTextFile("input/words.txt");//将每行数据进行分词,转换成二元组类型FlatMapOperator<String, Tuple2<String, Long>> returns = dataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {//将一行文本进行分词String[] words = line.split(" ");//将每个单词转换成二元组输出for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));//按照word进行分组UnsortedGrouping<Tuple2<String, Long>> tuple2UnsortedGrouping = returns.groupBy(0);//分组内进行聚合统计AggregateOperator<Tuple2<String, Long>> sum = tuple2UnsortedGrouping.sum(1);sum.print();}
}

6:步骤六

输出结果 可以看到flink单词出现了1次,world出现了1次,hello出现了三次,java出现了一次

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/268033.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Application.DoEvents

记得第一次使用Application.DoEvents()是为了在加载大量数据时能够有一个数据加载的提示&#xff0c;不至于系统出现假死的现象&#xff0c;当时也没有深入的去研究他的原理是怎样的&#xff0c;结果在很多地方都用上了Application.DoEvents()&#xff0c;今天看到了关于这方面…

Servlet交互【重定向 与 请求分派】详解

Servlet交互 在serlvet中&#xff0c;需要调用另外一个资源来对浏览器的请求进行响应&#xff0c;两种方式实现&#xff1a; 调用HttpServletResponse.sendRedirect 方法实现 重定向 调用RequestDispatcher.forward 方法来实现请求分派 &#xff08;转发&#xff09; 1.reponse…

解决Error: No such file or directory @ rb_sysopen

mac使用brew安装flink时出现报错&#xff0c;是下载openjdk11报错的 原因是openjdk11依赖包下载不成功&#xff0c;使用brew单独下载该依赖包即可 brew install openjdk11

Mac Brew install 报错Command failed with exit 128:git

问题&#xff1a; 记录一个问题&#xff0c;Mac使用Brew安装Flink报错 具体如图所示&#xff0c;执行brew install apache-flink Error: Command failed with exit 128: git 解决方式&#xff1a; 输入brew -v后会提示你执行两个配置命令&#xff0c;直接复制执行就ok了&am…

Mac上安装flink笔记

1&#xff1a;步骤一 首先要有破jdk1.8&#xff0c;查看命令&#xff1a;java -version 2:步骤二 使用brew安装flink&#xff0c;命令如下&#xff1a; brew install apache-flink 3:步骤三 我这边安装的时候报错了&#xff0c;解决方式如下 报错1 解决方式 https://blog.…

工作笔记一——杂项

近期做的项目中遇到一些棘手的问题&#xff0c;解决的过程用到很多知识&#xff0c;在此记下主要的问题与解决方法。 页面功能介绍&#xff1a;获取五张表格的大量数据&#xff08;大概有几千条记录&#xff09;&#xff0c;然后到前台显示在table里面&#xff0c;实现行列汇总…

Coolite 中GridView行按钮取行ID并调用服务器端代码

效果图&#xff1a; 关系代码&#xff1a; <Command Handler"if(commandbutSelectReocrd){strrecord.data.SessionId; #{AjaxMethods}.SelectRecord(str);}" /> 全部html代码&#xff1a; 代码 <ext:GridPanel ID"GridPanel1"Height"325&quo…

Flink的三种执行模式STREAMING和BATCH和AUTOMATIC

执行模式 执行模式三种 BATCH模式的两种配置方法 什么时候选择BATCH模式

RabbitMQ基础概念详细介绍

转至&#xff1a;http://www.ostest.cn/archives/497 引言 你是否遇到过两个&#xff08;多个&#xff09;系统间需要通过定时任务来同步某些数据&#xff1f;你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎&#xff1f;如果是&#xff0c;那么恭喜你&#x…

信号与系统(中)

第四章 线性时不变系统的时域分析 4.1连续时间系统的时域分析 微分方程的求解 齐次解特解完全解起始状态到初始状态的转换 冲激平衡法连续时间系统的零输入响应与零状态响应 双零法4.2离散时间系统的时域分析 迭代法时域经典法双零法差分方程的求解 齐次解特解完全解离散时间系…

RocketMq中的perm属性2 4 6用于设置对当前创建Topic的操作权

RocketMq中的perm属性用于设置对当前创建Topic的操作权限 2表示&#xff1a;只可以写 4表示&#xff1a;只可以读 6表示&#xff1a;可以进行读写

css新奇技术及其未来发展

1.图像替换技术&#xff1a; 图像替换技术是指使用图像替换页面中文本的功能&#xff0c;类似与在页面中插入图像&#xff0c;只是这种方法更为方便&#xff0c;易于代码管理。通常来说&#xff0c;设计者习惯使用有意义的图像去替换一些标题&#xff0c;logo和某些特定的页面背…

八皇后问题求解动态图形演示

这是我以前用Delphi写的一个教学软件。内容是关于“八皇后”问题的求解动态图形演示。这个软件采用多线程设计&#xff0c;包含了递归回溯与非递归回溯两种算法&#xff0c;还可随时调整演示速度&#xff0c;界面共有五种前景和五种背景图形。包含所有源程序和资源文件。 以…

HashMap中最多只允许一条记录的键为Null,允许多条记录的值为Null

Map主要用于存储健值对&#xff0c;根据键得到值&#xff0c;因此不允许键重复(重复会覆盖)&#xff0c;但允许值重复。 Hashmap是一个最常用的Map&#xff0c;它根据键的HashCode值存储数据&#xff0c;根据键可以直接获取它的值&#xff0c;具有很快的访问速度。遍历时&#…

ORA-12154: TNS:could not resolve the connect identifier specified. Solved.

进入系统属性页面 rundll32.exe shell32.dll,Control_RunDLL sysdm.cpl,,3 并添加系统环境变量 ORACLE_HOME ${path} TNS_ADMIND ${path}\network\admin NLS_LANG AMERICAN_AMERICA.ZHS16GBK 其中${path} 从http://www.oracle.com/technetwork/topics/winx64soft-089540.…

LinkedHashMap的使用

LinkedHashMap LinkedHashMap是HashMap的一个子类&#xff1b; LinkedHashMap保存了记录的插入顺序&#xff0c;在用Iterator遍历LinkedHashMap时&#xff0c;先得到的记录肯定是先插入的&#xff1b; 在遍历的时候会比HashMap慢&#xff0c;不过有种情况例外&#xff0c;当H…

摩托罗拉:未来一切以手机为中心

人类有计算机以来&#xff0c;计算工具曾经是强大的计算机&#xff0c;个人电脑的出现&#xff0c;PC不但是重要计算的工具&#xff0c;同时它是互联网终端&#xff0c;改写了人类工作和生活的格局。未来的计算会是往什么方向发展&#xff0c;移动互联网成为人们关注的焦点&…