【入门Flink】- 02Flink经典案例-WordCount

WordCount

需求:统计一段文字中,每个单词出现的频次

添加依赖

	<properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency></dependencies>

1.批处理

基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数。

1.1.数据准备

resources目录下新建一个 input 文件夹,并在下面创建文本文件words.txt

words.txt

hello flink
hello world
hello java

1.2.代码编写

public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)String filePath = Objects.requireNonNull(BatchWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();DataSource<String> lineDS = env.readTextFile(filePath);// 3. 转换数据格式FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}});// 4. 按照 word 进行分组UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);// 5. 分组内聚合统计AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);// 6. 打印结果sum.print();}
}

打印结果如下:(结果正确)

image-20231031193224024

上述代码是基于 DataSet API 的,也就是对数据的处理转换,是看作数据集来进行操作的。

事实上 Flink 本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的 API 来实现。从Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

2.流处理

DataStreamAPI可以直接处理批处理和流处理的所有场景

2.1读取文件

还是上述words.txt文件

代码实现:

public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取文件String filePath = Objects.requireNonNull(StreamWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();DataStreamSource<String> lineStream = env.readTextFile(filePath);// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}

与批处理程序BatchWordCount有几点不同:

  • 创建执行环境的不同,流处理程序使用的是 StreamExecutionEnvironment
  • 转换处理之后,得到的数据对象类型不同。
  • 分组操做调用的是 keyBy 方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key。
  • 最后执行execute方法,开始执行任务。

2.2读取Socket文件流

实际生产中,真正的数据多是无界的,需要持续地捕获数据。为了模拟这种场景,可以监听 socket 端口,然后向该端口不断的发送数据。

  1. 简单改动,只需将StreamWordCount 代码中读取文件数据的 readTextFile 方法,替换成读取socket文本流的方法socketTextStream
public class StreamSocketWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取文件DataStreamSource<String> lineStream = env.socketTextStream("124.222.253.33", 7777);// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}
  1. 在 Linux 环境的主机 124.222.253.33 上,执行下列命令,发送数据进行测试
nc -lk 7777

注意:要先启动端口,后启动 StreamSocketWordCount 程序,否则会报超时连接异常。

  1. 从Linux发送数据

1、输入“hello flink”,输出如下内容

image-20231031201232801

2、再输入“hello world”,输出如下内容

image-20231031201316467

Flink 还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的,对于 flatMap 里传入的 Lambda 表达式,系统只能推断出返回的是Tuple2类型,而无法得到 Tuple2<String, Long>。需要显式地告诉系统当前的返回类型,才能正确地解析出完整数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/132690.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于前馈神经网络完成鸢尾花分类

目录 1 小批量梯度下降法 1.0 展开聊一聊~ 1.1 数据分组 1.2 用DataLoader进行封装 1.3 模型构建 1.4 完善Runner类 1.5 模型训练 1.6 模型评价 1.7 模型预测 思考 总结 参考文献 首先基础知识铺垫~ 继续使用第三章中的鸢尾花分类任务&#xff0c;将Softm…

uinapp微信小程序隐私政策授权

&#x1f680; 隐私弹窗效果图&#xff1a; 1、启用隐私相关功能在manifest.json文件中配置 usePrivacyCheck: true "mp-weixin" : {"__usePrivacyCheck__" : true, },2、创建组件 <template><view><!-- 隐私政策弹窗 --><uni-popu…

阿里云服务器怎么购买更省钱?优惠入口分享

阿里云服务器怎么购买更省钱&#xff1f;不要直接在云服务器页面购买&#xff0c;不划算&#xff0c;在阿里云特价活动上购买更优惠&#xff0c;阿腾云atengyun.com分享阿里云服务器省钱购买方法&#xff0c;节省90%&#xff0c;可以先在阿里云CLUB中心领券 aliyun.club 专用满…

Java-Hbase介绍

1.1. 概念 base 是分布式、面向列的开源数据库&#xff08;其实准确的说是面向列族&#xff09;。HDFS 为 Hbase 提供可靠的 底层数据存储服务&#xff0c;MapReduce 为 Hbase 提供高性能的计算能力&#xff0c;Zookeeper 为 Hbase 提供 稳定服务和 Failover 机制&#xff0c…

【优选算法系列】【专题五位运算】第一节.常见的位运算(面试题 01.01. 判定字符是否唯一和268. 丢失的数字)

文章目录 前言常见的位运算一、判定字符是否唯一 1.1 题目描述 1.2 题目解析 1.2.1 算法原理 1.2.2 代码编写二、丢失的数字 2.1 题目描述 2.2 题目解析 2.2.1 算法原理 2.2.2 代码编写总结 前言 常见的…

k8s之service五种负载均衡byte的区别

1&#xff0c;什么是Service&#xff1f; 1.1 Service的概念​ 在k8s中&#xff0c;service 是一个固定接入层&#xff0c;客户端可以通过访问 service 的 ip 和端口访问到 service 关联的后端pod&#xff0c;这个 service 工作依赖于在 kubernetes 集群之上部署的一个附件&a…

Spring的总结

SpringFramework 认识SpringFramework 最首先&#xff0c;我们先要认识Spring和SpringFramework两者之间有什么联系&#xff1f; 广义上的 Spring 泛指以 Spring Framework 为基础的 Spring 技术栈。 狭义的 Spring 特指 Spring Framework&#xff0c;通常我们将它称为 Spr…

找不到d3dcompiler_47.dll,无法继续执行代码,解决方法

首先&#xff0c;让我们来了解一下d3dcompiler_47.dll文件。d3dcompiler_47.dll是一个动态链接库文件&#xff0c;它是DirectX SDK中的一个重要组件&#xff0c;用于编译DirectX着色器。当我们在使用一些需要DirectX支持的软件或游戏时&#xff0c;如果缺少了这个文件&#xff…

Mysql数据库的备份和恢复及日志管理

一、数据备份概述 1.1 备份的分类 完全备份&#xff1a;整个数据库完整地进行备份 增量备份&#xff1a;在完全备份的基础之上&#xff0c;对后续新增的内容进行备份 冷备份&#xff1a;关机备份&#xff0c;停止mysql服务&#xff0c;然后进行备份 热备份&#xff1a;开机备…

java 版本企业招标投标管理系统源码+多个行业+tbms+及时准确+全程电子化

项目说明 随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大&#xff0c;公司对内部招采管理的提升提出了更高的要求。在企业里建立一个公平、公开、公正的采购环境&#xff0c;最大限度控制采购成本至关重要。符合国家电子招投标法律法规及相关规范&#xff0c;以及审…

go 语言介绍

背景 一直有在零散的时间用go写点代码&#xff0c;正好借着最近比较有时间写东西的契机&#xff0c;给这个看着年轻&#xff0c;实际也已经发展10几年&#xff0c;并在当下众多开发领域都有不可忽视作用的语言做个介绍吧 golang 的起点 golang 的诞生可以说是时代造就了它&a…

Python实现Labelme的Json标注文件与YOLO格式的TXT标注文件相互转换

Python实现Labelme的Json标注文件与YOLO格式的TXT标注文件相互转换 前言前提条件相关介绍实验环境Labelme的Json标注文件与YOLO格式的TXT标注文件相互转换convert_labelme_json_to_txtjsons/000000000009.json代码实现输出结果labels/000000000009.txt convert_txt_to_labelme_…

Docker Compose安装milvus向量数据库单机版-milvus基本操作

目录 安装Ubuntu 22.04 LTS在power shell启动milvus容器安装docker desktop下载yaml文件启动milvus容器Milvus管理软件Attu python连接milvus配置下载wget示例导入必要的模块和类与Milvus数据库建立连接创建名为"hello_milvus"的Milvus数据表插入数据创建索引基于向量…

NLP之Bert多分类实现案例(数据获取与处理)

文章目录 1. 代码解读1.1 代码展示1.2 流程介绍1.3 debug的方式逐行介绍 3. 知识点 1. 代码解读 1.1 代码展示 import json import numpy as np from tqdm import tqdmbert_model "bert-base-chinese"from transformers import AutoTokenizertokenizer AutoToken…

数据结构--前缀树(Trie)

1. 简介 前缀树是一种数据结构&#xff0c;常用来字符搜索。 2. 实现 包含的操作主要是: 加入串搜索串 代码实现&#xff0c;直接用leetcode_208的题解咯。 代码 class Trie { public:Trie():isEnd(false){for ( int i 0; i < 26;i)child[i] nullptr;}~Trie() {fo…

Webpack 中 Plugin 的作用是什么?常用 plugin 有哪些?

说说webpack中常见的Plugin&#xff1f;解决了什么问题&#xff1f;- 题目详情 - 前端面试题宝典 1、plugin 的作用 Plugin 是一种计算机应用程序&#xff0c;它和主应用程序互相交互&#xff0c;以提供特定的功能。 是一种遵循一定规范的应用程序接口编写出来的程序&#…

CSDN每日一题学习训练——Java版(两数相加、二叉树的锯齿形层序遍历、解数独)

版本说明 当前版本号[20231106]。 版本修改说明20231106初版 目录 文章目录 版本说明目录两数相加题目解题思路代码思路补充说明参考代码 二叉树的锯齿形层序遍历题目解题思路代码思路参考代码 解数独题目解题思路代码思路补充说明参考代码 两数相加 题目 给你两个 非空 的…

动态IP和静态IP哪个安全,该怎么选择

随着互联网的普及&#xff0c;越来越多的人开始关注网络安全问题。其中&#xff0c;IP地址作为网络通信中的重要组成部分&#xff0c;也成为了人们关注的焦点。 在IP地址中&#xff0c;动态IP和静态IP是两种不同的分配方式&#xff0c;它们各自具有不同的特点&#xff0c;那么…

论文阅读—— CEASC(cvpr2023)

arxiv&#xff1a;https://arxiv.org/abs/2303.14488 github&#xff1a;https://github.com/Cuogeihong/CEASC 为了进一步减轻SC中的信息损失&#xff0c;使训练过程更加稳定&#xff0c;我们在训练过程中除了稀疏卷积之外&#xff0c;还保持了正常的密集卷积&#xff0c;生成…

基于SSM的社区智慧养老监护管理平台

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…