Flink-Java版单词计数(批处理流处理)

创建工程

pom.xml文件依赖如下:

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.10.1</version></dependency><!--依赖的一些组件需要 Scala 环境--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.10.1</version></dependency></dependencies>

定义输入源

  1. resources 目录下创建 hello.txt 作为输入源, 内容单词自定义.
  2. 在 Linux 环境下使用 nc -lk 端口模拟网络流式输入.

编写代码

批处理方式 WordCount

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;import java.io.InputStream;/*** {@link DataSet} 批处理 api, 处理离线数据* @author regotto*/
public class WordCount {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSource<String> dataSource = env.readTextFile("D:\\CodeRepository\\flink-study\\src\\main\\resources\\hello.txt");// 将单词按照空格分割, 变为 (word, 1) 形式的二元组DataSet<Tuple2<String, Integer>> resultSet = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split("\\W+");for (String s : words) {out.collect(new Tuple2<String, Integer>(s, 1));}}// 按照 tuple2 index = 0 进行分组, 按照 index = 1 进行求和}).groupBy(0).sum(1);resultSet.print();}
}

流式 WordCount

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** {@link DataStream} 流式api, 处理实时数据* @author regotto*/
public class StreamWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从文件中获取输入流
//        DataStream<String> source = env.readTextFile("D:\\CodeRepository\\flink-study\\src\\main\\resources\\hello.txt");// 从 socket 文本流读取数据, 模拟 flink 从 kafka获取数据DataStreamSource<String> source = env.socketTextStream("localhost", 7777);DataStream<Tuple2<String, Integer>> resultStream = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split("\\W+");for (String s : words) {out.collect(new Tuple2<String, Integer>(s, 1));}}// 按照 tuple2 index = 0 进行分组, 按照 index = 1 进行求和}).keyBy(0).sum(1);resultStream.print();env.execute();}
}

使用 nc -lk监听7777端口, 模拟网络流式输入.

使用nc监听7777端口
执行结果如下:
在这里插入图片描述

结论

批处理: 将所有文本处理完, 才统计输出.
流式: 在开发环境中, 每读取一行文本就计数一次, 进行统计输出.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/468531.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

怎么做批注_BIM平台是什么?有何用?怎么用?

原标题&#xff1a;BIM平台是什么&#xff1f;有何用&#xff1f;怎么用&#xff1f;随着BIM技术的深入应用&#xff0c;我们也不再拘泥于单单BIM软件的使用&#xff0c;在BIM技术的广泛应用之下&#xff0c;BIM平台也逐渐成为了BIM技术的最佳体现&#xff0c;也成为了众多工程…

重入的问题

抛出个问题 有一个定时器,定时时间是1秒,然后里面有一个执行函数,里面的函数有时候执行500毫秒,有时候执行2秒,如果是这样的话,有没有问题,如果有问题,要怎么解决? 先说上面中断的问题 我之前写过一篇文章,专门说中断的,我又想起来之前说的那个重入和不重入的问题…

web前端性能优化

一、什么是前端性能优化从用户访问资源到资源完整的展现在用户面前的过程中&#xff0c;通过技术手段和优化策略&#xff0c;缩短每个步骤的处理时间从而提升整个资源的访问和呈现速度。二、为什么要做前端性能优提升网站性能&#xff0c;提升用户体验三、前端性能优化的原则1、…

Flink并行度优先级_集群操作常用指令_运行组件_任务提交流程_数据流图变化过程

Flink并行度优先级(从高到低) sum(1).setParallelism(1) env.setParallelism(1) ApacheFlinkDashboard任务添加并行度配置 flink-conf.yaml并行度配置 注: 处理输入输出时, 并行度默认为 1Flink集群常用指令 提交任务 run: 代表执行; c: 指定入口类; p: 并行度; host, post:…

招银网络笔试java_最新!!招银网络科技Java面经,整理附答案

作者&#xff1a;榨汁机2号 链接&#xff1a;https://www.nowcoder.com/discuss/1640193月12号现场面试的&#xff0c; 感觉好像所有人有3面的样子。到目前也没有消息&#xff0c;有消息的吱一声&#xff0c;让我早点死了这个心…..一面 1 Java的八大基本类型byte、short、int、…

CSS常用的元素居中方法

参考&#xff1a;CSS: 常用的元素居中方法 CSS居中布局总结 1.水平居中 &#xff08;1&#xff09;文本水平居中 text-align: center; &#xff08;2&#xff09;块级元素水平居中 ①设置margin margin: auto; ②display:inline-blocktext-align:center .parent {width: 400px;…

FlinkAPI_Environment_输入源_算子转化流程

Flink Environment getExecutionEnvironment() 根据当前平台, 获取对应的执行环境, 若未设置并行度, 使用 flink-conf.yaml 中的并行度配置, 默认 1. StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();createLocalEnviroment() 创建本地…

第2章 Linux内核模块

宏内核和微内核继续前面第一章的知识&#xff0c;虽然有点啰嗦&#xff0c;既然啰嗦了就继续啰嗦下去吧&#xff0c;也是给第一章的内容增加解释。我们知道内核如果按种类来划分的话&#xff0c;可以分为宏内核和微内核&#xff0c;微内核是一个比较先进的内核&#xff0c;我不…

BZOJ 1137 半平面交

半平面交的板子 //By SiriusRen #include <bits/stdc.h> #define double long double using namespace std; const int N100050; const double eps1e-10; int n,m,xx,yy,tot; double Ans; vector<int>vec[N]; struct Point{double x,y;}point[N]; struct Line{Poin…

的注册表怎么才能删干净_油烟净化器怎么清洗才能清理干净呢?

油烟机的净化器的主要功能是过滤厨房里的油烟&#xff0c;因为它总是处理油烟&#xff0c;所以清洗净化器很麻烦&#xff0c;那么如何清洗呢&#xff1f;经常清洗油烟净化器是很有必要的&#xff0c;但清洗起来很麻烦&#xff0c;清洗起来也不容易。今天&#xff0c;我想告诉大…

Flink-Sink_将结果输出到Kafka_Redis_ES_Mysql中

Sink 将计算好结果输出到外部系统, 调用 addSink()传入指定的SinkFunction() 将结果输出到 Kafka 中将结果输出到 Redis 中将结果输出到 ES 中将结果输出到 Mysql 中: 事先创建好表结构 pom.xml 事先导入对应的 connector: <dependencies><dependency><group…

ado 字符串变量

这次变量主要针对 Mfc 的 Cstring 类型的变量&#xff08;前面VC 链接Access 数据库 插入变量到表&#xff09; 思路; 1 把cstring 类型 转为 string 2 string 转 char 数组 3 sprintf 写入数组 string 转 char 数组函数[cpp]view plaincopyprint?char* zhuanhuan(std::strin…

周立功先生和他的AWorks团队招聘

我之前写的一篇文章&#xff0c;介绍了周立功先生&#xff0c;我记得那篇文章的阅读量非常多&#xff0c;也让我迎来一段小高潮&#xff0c;随着时间的推移&#xff0c;慢慢的增加了我对周立功先生的了解&#xff0c;我们很多人&#xff0c;像我吧&#xff0c;工作的时候&#…

mongodb python 大于_Python中使用MongoDB详解

作者&#xff1a;Zarten知乎专栏&#xff1a;Python爬虫深入详解知乎ID&#xff1a; Zarten简介&#xff1a; 互联网一线工作者&#xff0c;尊重原创并欢迎评论留言指出不足之处&#xff0c;也希望多些关注和点赞是给作者最好的鼓励 &#xff01;介绍MongoDB是一种面向文档型的…

这不是商业互吹,是学习的宝藏

学习如逆水行舟&#xff0c;不进则退&#xff1b;只有坚持不断的学习,才能保持进步。今天给大家精心挑选的这几个优质的公众号&#xff0c;在行业深耕已久&#xff0c;相信大家一定会有所收获&#xff0c;感兴趣的可以关注一下。互联网架构师 号主985计算机硕士毕业&#xff…

【Ubuntu】ubuntu系统下python3和python2环境自由切换

shell里执行&#xff1a;sudo update-alternatives --install /usr/bin/python python /usr/local/lib/python2.7 100sudo update-alternatives --install /usr/bin/python python /usr/local/lib/python3.2 150此时你会发现如果要切换到Python2&#xff0c;执行&#xff1a;su…

打印机更换感光鼓单元k_干货,激光打印机常见故障维修方法总结

激光打印机是日常生活和办公中运用较多的打印机类型下面&#xff0c;我们来总结一下激光打印机常见故障维修方法。硒鼓组件常见故障&#xff0c;维修方法。激光打印机硒鼓的常见故障包括硒鼓漏粉&#xff0c;打印出黑横线&#xff0c;打印文件颜色不正常打印的图像及文字变形&a…

关于这些那些

关于篮球先说下&#xff0c;我刚才已经写完文章了&#xff0c;但是因为没有保存&#xff0c;浏览器想着周末早点回去休息就闪退了&#xff0c;把写好的文章给闪退没有了&#xff0c;这个真是拿起自己的坑砸死了自己&#xff0c;那种赶脚只有自己能够明白&#xff0c;真的是太难…

mysqldump 定时备份数据(全量)

MYSQL 数据库备份有很多种(cp、tar、lvm2、mysqldump、xtarbackup)等等&#xff0c;具体使用哪一个还要看你的数据规模。下面给出一个表 #摘自《学会用各种姿态备份Mysql数据库》 备份方法备份速度恢复速度便捷性功能一般用于cp快快一般、灵活性低很弱少量数据备份mysqldump慢慢…

第3章 Linux内核调试手段之内核打印

开始前面说的话在我写代码的生涯里&#xff0c;我看到过很多大神炫耀自己的调试手段&#xff0c;也看到很多大神写过非常厉害的代码&#xff0c;我认为&#xff0c;相比于写代码&#xff0c;调试更加重要&#xff0c;而那些能在写代码的时候就加入了自己的调试信息的&#xff0…