【入门Flink】- 02Flink经典案例-WordCount

WordCount

需求:统计一段文字中,每个单词出现的频次

添加依赖

	<properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency></dependencies>

1.批处理

基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数。

1.1.数据准备

resources目录下新建一个 input 文件夹,并在下面创建文本文件words.txt

words.txt

hello flink
hello world
hello java

1.2.代码编写

public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)String filePath = Objects.requireNonNull(BatchWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();DataSource<String> lineDS = env.readTextFile(filePath);// 3. 转换数据格式FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}});// 4. 按照 word 进行分组UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);// 5. 分组内聚合统计AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);// 6. 打印结果sum.print();}
}

打印结果如下:(结果正确)

image-20231031193224024

上述代码是基于 DataSet API 的,也就是对数据的处理转换,是看作数据集来进行操作的。

事实上 Flink 本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的 API 来实现。从Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

2.流处理

DataStreamAPI可以直接处理批处理和流处理的所有场景

2.1读取文件

还是上述words.txt文件

代码实现:

public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取文件String filePath = Objects.requireNonNull(StreamWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();DataStreamSource<String> lineStream = env.readTextFile(filePath);// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}

与批处理程序BatchWordCount有几点不同:

  • 创建执行环境的不同,流处理程序使用的是 StreamExecutionEnvironment
  • 转换处理之后,得到的数据对象类型不同。
  • 分组操做调用的是 keyBy 方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key。
  • 最后执行execute方法,开始执行任务。

2.2读取Socket文件流

实际生产中,真正的数据多是无界的,需要持续地捕获数据。为了模拟这种场景,可以监听 socket 端口,然后向该端口不断的发送数据。

  1. 简单改动,只需将StreamWordCount 代码中读取文件数据的 readTextFile 方法,替换成读取socket文本流的方法socketTextStream
public class StreamSocketWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取文件DataStreamSource<String> lineStream = env.socketTextStream("124.222.253.33", 7777);// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}
  1. 在 Linux 环境的主机 124.222.253.33 上,执行下列命令,发送数据进行测试
nc -lk 7777

注意:要先启动端口,后启动 StreamSocketWordCount 程序,否则会报超时连接异常。

  1. 从Linux发送数据

1、输入“hello flink”,输出如下内容

image-20231031201232801

2、再输入“hello world”,输出如下内容

image-20231031201316467

Flink 还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的,对于 flatMap 里传入的 Lambda 表达式,系统只能推断出返回的是Tuple2类型,而无法得到 Tuple2<String, Long>。需要显式地告诉系统当前的返回类型,才能正确地解析出完整数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/132690.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于前馈神经网络完成鸢尾花分类

目录 1 小批量梯度下降法 1.0 展开聊一聊~ 1.1 数据分组 1.2 用DataLoader进行封装 1.3 模型构建 1.4 完善Runner类 1.5 模型训练 1.6 模型评价 1.7 模型预测 思考 总结 参考文献 首先基础知识铺垫~ 继续使用第三章中的鸢尾花分类任务&#xff0c;将Softm…

uinapp微信小程序隐私政策授权

&#x1f680; 隐私弹窗效果图&#xff1a; 1、启用隐私相关功能在manifest.json文件中配置 usePrivacyCheck: true "mp-weixin" : {"__usePrivacyCheck__" : true, },2、创建组件 <template><view><!-- 隐私政策弹窗 --><uni-popu…

在RabbitMQ中 WorkQueue 工作队列 和发布(publish)/订阅(Subscribe) 有什么区别?

在RabbitMQ中&#xff0c;"Work Queue"&#xff08;工作队列&#xff09;和"Publish/Subscribe"&#xff08;发布/订阅&#xff09;是两种不同的消息传递模型&#xff0c;它们有不同的用途和工作方式。 Work Queue (工作队列)&#xff1a; 用途&#xff1a…

SpringBoot基础(五)-- 引导类

引言: SpringBoot确实帮助我们减少了很多配置工作,下面说一下程序是如何运行的。目前程序运行的入口就是SpringBoot工程创建时自带的那个类了,带有main方法的那个类,运行这个类就可以启动SpringBoot工程的运行。 @SpringBootApplication public class SpringBootQu…

阿里云服务器怎么购买更省钱?优惠入口分享

阿里云服务器怎么购买更省钱&#xff1f;不要直接在云服务器页面购买&#xff0c;不划算&#xff0c;在阿里云特价活动上购买更优惠&#xff0c;阿腾云atengyun.com分享阿里云服务器省钱购买方法&#xff0c;节省90%&#xff0c;可以先在阿里云CLUB中心领券 aliyun.club 专用满…

Java-Hbase介绍

1.1. 概念 base 是分布式、面向列的开源数据库&#xff08;其实准确的说是面向列族&#xff09;。HDFS 为 Hbase 提供可靠的 底层数据存储服务&#xff0c;MapReduce 为 Hbase 提供高性能的计算能力&#xff0c;Zookeeper 为 Hbase 提供 稳定服务和 Failover 机制&#xff0c…

【优选算法系列】【专题五位运算】第一节.常见的位运算(面试题 01.01. 判定字符是否唯一和268. 丢失的数字)

文章目录 前言常见的位运算一、判定字符是否唯一 1.1 题目描述 1.2 题目解析 1.2.1 算法原理 1.2.2 代码编写二、丢失的数字 2.1 题目描述 2.2 题目解析 2.2.1 算法原理 2.2.2 代码编写总结 前言 常见的…

k8s之service五种负载均衡byte的区别

1&#xff0c;什么是Service&#xff1f; 1.1 Service的概念​ 在k8s中&#xff0c;service 是一个固定接入层&#xff0c;客户端可以通过访问 service 的 ip 和端口访问到 service 关联的后端pod&#xff0c;这个 service 工作依赖于在 kubernetes 集群之上部署的一个附件&a…

Spring的总结

SpringFramework 认识SpringFramework 最首先&#xff0c;我们先要认识Spring和SpringFramework两者之间有什么联系&#xff1f; 广义上的 Spring 泛指以 Spring Framework 为基础的 Spring 技术栈。 狭义的 Spring 特指 Spring Framework&#xff0c;通常我们将它称为 Spr…

找不到d3dcompiler_47.dll,无法继续执行代码,解决方法

首先&#xff0c;让我们来了解一下d3dcompiler_47.dll文件。d3dcompiler_47.dll是一个动态链接库文件&#xff0c;它是DirectX SDK中的一个重要组件&#xff0c;用于编译DirectX着色器。当我们在使用一些需要DirectX支持的软件或游戏时&#xff0c;如果缺少了这个文件&#xff…

Mysql数据库的备份和恢复及日志管理

一、数据备份概述 1.1 备份的分类 完全备份&#xff1a;整个数据库完整地进行备份 增量备份&#xff1a;在完全备份的基础之上&#xff0c;对后续新增的内容进行备份 冷备份&#xff1a;关机备份&#xff0c;停止mysql服务&#xff0c;然后进行备份 热备份&#xff1a;开机备…

java 版本企业招标投标管理系统源码+多个行业+tbms+及时准确+全程电子化

项目说明 随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大&#xff0c;公司对内部招采管理的提升提出了更高的要求。在企业里建立一个公平、公开、公正的采购环境&#xff0c;最大限度控制采购成本至关重要。符合国家电子招投标法律法规及相关规范&#xff0c;以及审…

go 语言介绍

背景 一直有在零散的时间用go写点代码&#xff0c;正好借着最近比较有时间写东西的契机&#xff0c;给这个看着年轻&#xff0c;实际也已经发展10几年&#xff0c;并在当下众多开发领域都有不可忽视作用的语言做个介绍吧 golang 的起点 golang 的诞生可以说是时代造就了它&a…

Python实现Labelme的Json标注文件与YOLO格式的TXT标注文件相互转换

Python实现Labelme的Json标注文件与YOLO格式的TXT标注文件相互转换 前言前提条件相关介绍实验环境Labelme的Json标注文件与YOLO格式的TXT标注文件相互转换convert_labelme_json_to_txtjsons/000000000009.json代码实现输出结果labels/000000000009.txt convert_txt_to_labelme_…

华为政企协作平板产品集

产品类型产品型号产品说明 IdeaHub系列IFP-UG86EIdeaHub系列 | 4K柔光屏 | 智慧妙笔 | 企业级信息安全IdeaHub系列IHB2- 65SUIdeaHub系列 | 4K 柔光屏 | 25ms流畅板书 | 安全智慧教学IdeaHub系列IHB2- 75SUIdeaHub系列 | 4K 柔光屏 | 25ms流畅板书 | 安全智慧教学IdeaHu…

Docker Compose安装milvus向量数据库单机版-milvus基本操作

目录 安装Ubuntu 22.04 LTS在power shell启动milvus容器安装docker desktop下载yaml文件启动milvus容器Milvus管理软件Attu python连接milvus配置下载wget示例导入必要的模块和类与Milvus数据库建立连接创建名为"hello_milvus"的Milvus数据表插入数据创建索引基于向量…

NOIP2023模拟12联测33 构造

题目大意 给定一个非负整数 n n n&#xff0c;请构造出一个不超过 40 40 40\times 40 4040的矩阵&#xff0c;每个位置填 r r r、 y y y、 x x x三者之一&#xff0c;使得连续的三个格子按顺序构成字符串 r y x ryx ryx恰好 n n n个。 这里连续的是指同一行、同一列或者同一…

NLP之Bert多分类实现案例(数据获取与处理)

文章目录 1. 代码解读1.1 代码展示1.2 流程介绍1.3 debug的方式逐行介绍 3. 知识点 1. 代码解读 1.1 代码展示 import json import numpy as np from tqdm import tqdmbert_model "bert-base-chinese"from transformers import AutoTokenizertokenizer AutoToken…

猜数字--

猜数字 文章目录 前言一、题目描述 二、解题 程序运行代码 三、举一反三题目描述 解题 程序运行代码 猜数字游戏 前言 本系列为循环结构编程题&#xff0c;点滴成长&#xff0c;一起逆袭。 一、题目描述 当输入数字为123时&#xff0c;输出yes;否则输出error 二、解题 程序…

3.8 Android eBPF HelloWorld调试(二)

写在前面 我们开发eBPF程序的初衷就是再不改动内核的情况下,将内核监控数据传递给到用户态;像应用进程开发一样开发内核监控程序。 Android开机的时候eBPF程序被加载器加载到内核中,但此时它并没有被附加到内核函数上去,也就是ebpf程序并不会执行,我们可以理解为,它仅仅被…