02. Flink 快速上手

02. Flink 快速上手

1、创建项目导入依赖

pom文件:

<properties><flink.version>1.17.0</flink.version>
</properties><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version>
</dependency>

2、需求

批处理基本思路:先逐行读取文本,在根据空格进行单词拆分,最后再去统计每个单词出现的频率。

(1)数据准备

在工程目录下新建文件夹input,新建文本words.txt。

文件输入:

hello world
hello flink
hello java

2.1 批处理

代码编写(使用DataSet API实现)

package com.company.onedayflink.demo;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class FlinkBatchWords {public static void main(String[] args) throws Exception {// 1、创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2、从文件中读取数据DataSource<String> lineDS = env.readTextFile("one-day-flink/input/words.txt");// 3、切分、转换FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {/**** @param value     读取到的输入* @param out       返回的内容,Tuple2是一个二元分组,(字符串,个数)。* @throws Exception*/@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 3.1 切分for (String s : value.split(" ")) {// 3.2 将单组转为二元组Tuple2<String, Integer> tuple = Tuple2.of(s, 1);// 3.3 将二元组发送给下游out.collect(tuple);}}});// 4、按照 word 分组UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGroup = wordAndOne.groupBy(0); // 0 表示下标为0的参数,也就是二元组的String单词// 5、各分组聚合AggregateOperator<Tuple2<String, Integer>> sum = wordAndOneGroup.sum(1);//1 表示下标1的元素,即单词个数// 6、输出sum.print();}
}

运行结果:

image-20240519130034466

2.2 流处理

2.2.1 有界流

代码编写(使用DataStream API实现,读取文件属于有界流)

package com.company.onedayflink.demo;import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;@Slf4j
public class FlinkStreamWords {public static void main(String[] args) throws Exception {// 1、创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、从文件中读取数据DataStreamSource<String> lineDS = env.readTextFile("one-day-flink/input/words.txt");// 3、处理数据(切换、转换、分组、聚合)// 3.1 切换、转换SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {for (String s : value.split(" ")) {// 构建二元组Tuple2<String, Integer> tuple = Tuple2.of(s, 1);// 通过采集器向下游发送数据out.collect(tuple);}}});// 3.2 分组, KeySelector<IN, KEY> 中 IN 表示输入的类型,KEY 表示分组key的类型KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOne.keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0); // value.f0 表示二元组的第一个元素// 3.3 聚合SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOneKS.sum(1);  // 1 表示二元组的第二个元素// 4、输出数据sum.print();// 5、执行env.execute();}
}

执行结果:

2> (java,1)
3> (hello,1)
3> (hello,2)
3> (hello,3)
6> (world,1)
8> (flink,1)

前面的编号是并行度,线程数。

2.2.2 无界流

(1)使用 netcat 监听7777端口,建立stream流

安装 netcat

brew install netcat

监听 7777 端口

nc -lk 7777

(2)代码编写(使用DataStream API实现,读取stream流属于无界流)

package com.company.onedayflink.demo;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlinkSteamSocketWords {public static void main(String[] args) throws Exception {// 1、创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、读取数据(其中hostname 是需要监听的主机名称,mac电脑可以在终端使用hostname命令查看)DataStreamSource<String> socketDS = env.socketTextStream("zgyMacBook-Pro.local", 7777);// 3、数据处理(切割、转换、分组、聚合)SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {// 3.1 切分for (String s : value.split(" ")) {// 3.2 将单组转为二元组Tuple2<String, Integer> tuple = Tuple2.of(s, 1);// 3.3 将二元组发送给下游out.collect(tuple);}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value -> value.f0).sum(1);// 4、输出sum.print();// 5、执行env.execute();}
}

(3)测试

在终端发送消息

hello flink
hello world

观察程序控制台打印

8> (flink,1)
3> (hello,1)
6> (world,1)
3> (hello,2)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/15587.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C语言刷题系列】求一个数组中两个元素a和b的和最接近整数m

&#x1f493; 博客主页&#xff1a;倔强的石头的CSDN主页 &#x1f4dd;Gitee主页&#xff1a;倔强的石头的gitee主页 ⏩ 文章专栏&#xff1a;C语言刷题系列 目录 一、问题描述 二、解题思路 解题思路&#xff1a; 解题步骤: 三、C语言代码实现及测试 一、问题描述 给定一…

指北者智能音乐学习机隆重亮相广州国际乐器展

2024年5月23-26日广州国际乐器展览会在广交会展馆B区隆重开幕&#xff0c;本届展会开设5大展厅、50000平方米的主题展区&#xff0c;吸引了700多家国内外参展商参展&#xff0c;打造集展示、商贸、文化交流、文娱于一体的广阔平台。深圳市指北科技有限公司也携旗下品牌指北者智…

基于JSP/Servlet校园二手交易平台(二)

目录 2 开发技术及开发环境 2.1 Java语言简介 2.2 J2EE技术介绍 2.3 Servlet/JSP技术 2.4 MVC 简介 2.5 Struts 技术 2.6 Hibernate 技术 2.6.1 应用程序的分层体系结构 2.6.2 Hibernate的应用及API简介 2.7 开发环境及环境配置 2.7.1 Java/JSP系统环境 2.7.2 JSP环…

小程序-购物车-基于SKU电商规格组件实现

SKU 概念&#xff1a; 存货单位&#xff08; Stock Keeping Unit &#xff09;&#xff0c; 库存 管理的最小可用单元&#xff0c;通常称为“单品”。 SKU 常见于电商领域&#xff0c;对于前端工程师而言&#xff0c;更多关注 SKU 算法 &#xff0c;基于后端的 SKU 数据…

(二)vForm 动态表单设计器之下拉、选择

系列文章目录 &#xff08;一&#xff09;vForm 动态表单设计器之使用 目录 系列文章目录 前言 一、后端需提供接口 二、组件配置 总结 前言 动态表单下拉、选择等组件&#xff0c;大概率要使用数据库中的数据&#xff0c;那么vForm如何拿到数据库中的数据呢&#xff1f;跟随…

动物合并消除休闲游戏源码 Animal Merge 益智游戏

一款动物合并消除休闲游戏源码&#xff0c;Animal Merge是一款引人入胜的益智游戏&#xff0c;玩家的任务是合并方块&#xff0c;创造出可爱的动物&#xff0c;这些动物的体型会逐渐变大。游戏玩法包括将方块放到网格上&#xff0c;并战略性地将它们合并以形成更大的动物形状。…

【408精华知识】主存相关解题套路大揭秘!

讲完了Cache&#xff0c;再来讲讲主存是怎么考察的&#xff0c;我始终认为&#xff0c;一图胜千言&#xff0c;所以对于很多部件&#xff0c;我都是通过画图进行形象的记忆&#xff0c;那么接下来我们对主存也画个图&#xff0c;然后再来详细解读其考察套路~ 文章目录 零、主存…

python-pytorch 下批量seq2seq+Bahdanau Attention实现问答1.0.000

python-pytorch 下批量seq2seq+Bahdanau Attention实现简单问答1.0.000 前言原理看图数据准备分词、index2word、word2index、vocab_size输入模型的数据构造注意力模型decoder的编写关于损失函数和优化器在预测时完整代码参考前言 前面实现了 luong的dot 、general、concat注意…

【话题】我眼神的IT行业现状与未来趋势

目录 一、挑战 教学资源的重新分配 教师角色的转变 学生学习方式的改变 教育评价体系的挑战 二、机遇 个性化学习 跨学科学习 国际合作与交流 创新教育模式 三、如何培养下一代IT专业人才 更新教育理念 加强基础设施建设 整合课程资源 加强实践教学 培养跨学科…

【Linux】TCP协议【中】{确认应答机制/超时重传机制/连接管理机制}

文章目录 1.确认应答机制2.超时重传机制&#xff1a;超时不一定是真超时了3.连接管理机制 1.确认应答机制 TCP协议中的确认应答机制是确保数据可靠传输的关键部分。以下是该机制的主要步骤和特点的详细解释&#xff1a; 数据分段与发送&#xff1a; 发送方将要发送的数据分成一…

vue深度选择器(:deep​)

处于 scoped 样式中的选择器如果想要做更“深度”的选择&#xff0c;也即&#xff1a;影响到子组件&#xff0c;可以使用 :deep() 这个伪类&#xff1a; <style lang"scss" scoped> .evaluation-situation-details :deep .cl-icon-arrow-right {display: none…

【Python 对接QQ的接口(二)】简单用接口查询【等级/昵称/头像/Q龄/当天在线时长/下一个等级升级需多少天】

文章日期&#xff1a;2024.05.25 使用工具&#xff1a;Python 类型&#xff1a;QQ接口 文章全程已做去敏处理&#xff01;&#xff01;&#xff01; 【需要做的可联系我】 AES解密处理&#xff08;直接解密即可&#xff09;&#xff08;crypto-js.js 标准算法&#xff09;&…

JS根据所选ID数组在源数据中取出对象

let selectIds [1, 3] // 选中id数组let allData [{ id: 1, name: 123 },{ id: 2, name: 234 },{ id: 3, name: 345 },{ id: 4, name: 456 },] // 源数据let newList [] // 最终数据selectIds.map((i) > {allData.filter((item) > {item.id i && newList.pus…

aws sqs基础概念和队列参数解析

分布式队列的组成部分 生产者&#xff0c;向队列发送消息的组件消费者&#xff0c;接受队列消息队列&#xff0c;多个sqs服务器存储冗余存储消息 sqs自动删除超过最大留存时间的消息&#xff08;默认4天&#xff09;&#xff0c;可以通过SetQueueAttributes调整为&#xff08…

【408真题】2009-13

“接”是针对题目进行必要的分析&#xff0c;比较简略&#xff1b; “化”是对题目中所涉及到的知识点进行详细解释&#xff1b; “发”是对此题型的解题套路总结&#xff0c;并结合历年真题或者典型例题进行运用。 涉及到的知识全部来源于王道各科教材&#xff08;2025版&…

VBA即用型代码手册:删除Excel中空白行Delete Blank Rows in Excel

我给VBA下的定义&#xff1a;VBA是个人小型自动化处理的有效工具。可以大大提高自己的劳动效率&#xff0c;而且可以提高数据的准确性。我这里专注VBA,将我多年的经验汇集在VBA系列九套教程中。 作为我的学员要利用我的积木编程思想&#xff0c;积木编程最重要的是积木如何搭建…

IDEA中好用的插件

IDEA中好用的插件 CodeGeeXMybatis Smart Code Help ProAlibaba Java Coding Guidelines​(XenoAmess TPM)​通义灵码常用操作 TranslationStatistic CodeGeeX 官网地址&#xff1a;https://codegeex.cn/ 使用手册&#xff1a;https://zhipu-ai.feishu.cn/wiki/CuvxwUDDqiErQU…

Android 自定义图片进度条

用系统的Progressbar&#xff0c;设置图片drawable作为进度条会出现图片长度不好控制&#xff0c;容易被截断&#xff0c;或者变形的问题。而我有个需求&#xff0c;使用图片背景&#xff0c;和图片进度&#xff0c;而且在进度条头部有个闪光点效果。 如下图&#xff1a; 找了…

Kafka(十三)监控与告警

目录 Kafka监控与告警1 解决方案1.2 基础知识JMX监控指标代理查看KafkaJMX远程端口 1.3 真实案例Kafka Exporter:PromethusPromethus Alert ManagerGrafana 1.3 实际操作部署监控和告警系统1.2.1 部署Kafka Exporter1.2.2 部署Prometheus1.2.3 部署AlertManger1.2.4 添加告警规…

大疆上云API本地部署与飞机上云

文章目录 前言一、安装基础环境1. EMQX 安装(版本4.4.0)2. MySql 安装(版本8.0.26)3. Redis 安装 二、部署后端&#xff08;JDK必须11及以上&#xff09;三、部署前端四、成为大疆开发者五、飞机注册上云六、绑定飞机七、无人机状态查看八、直播流查看 前言 大疆上云API官方文…