02. Flink 快速上手

02. Flink 快速上手

1、创建项目导入依赖

pom文件:

<properties><flink.version>1.17.0</flink.version>
</properties><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version>
</dependency>

2、需求

批处理基本思路:先逐行读取文本,在根据空格进行单词拆分,最后再去统计每个单词出现的频率。

(1)数据准备

在工程目录下新建文件夹input,新建文本words.txt。

文件输入:

hello world
hello flink
hello java

2.1 批处理

代码编写(使用DataSet API实现)

package com.company.onedayflink.demo;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class FlinkBatchWords {public static void main(String[] args) throws Exception {// 1、创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2、从文件中读取数据DataSource<String> lineDS = env.readTextFile("one-day-flink/input/words.txt");// 3、切分、转换FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {/**** @param value     读取到的输入* @param out       返回的内容,Tuple2是一个二元分组,(字符串,个数)。* @throws Exception*/@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 3.1 切分for (String s : value.split(" ")) {// 3.2 将单组转为二元组Tuple2<String, Integer> tuple = Tuple2.of(s, 1);// 3.3 将二元组发送给下游out.collect(tuple);}}});// 4、按照 word 分组UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGroup = wordAndOne.groupBy(0); // 0 表示下标为0的参数,也就是二元组的String单词// 5、各分组聚合AggregateOperator<Tuple2<String, Integer>> sum = wordAndOneGroup.sum(1);//1 表示下标1的元素,即单词个数// 6、输出sum.print();}
}

运行结果:

image-20240519130034466

2.2 流处理

2.2.1 有界流

代码编写(使用DataStream API实现,读取文件属于有界流)

package com.company.onedayflink.demo;import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;@Slf4j
public class FlinkStreamWords {public static void main(String[] args) throws Exception {// 1、创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、从文件中读取数据DataStreamSource<String> lineDS = env.readTextFile("one-day-flink/input/words.txt");// 3、处理数据(切换、转换、分组、聚合)// 3.1 切换、转换SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {for (String s : value.split(" ")) {// 构建二元组Tuple2<String, Integer> tuple = Tuple2.of(s, 1);// 通过采集器向下游发送数据out.collect(tuple);}}});// 3.2 分组, KeySelector<IN, KEY> 中 IN 表示输入的类型,KEY 表示分组key的类型KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOne.keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0); // value.f0 表示二元组的第一个元素// 3.3 聚合SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOneKS.sum(1);  // 1 表示二元组的第二个元素// 4、输出数据sum.print();// 5、执行env.execute();}
}

执行结果:

2> (java,1)
3> (hello,1)
3> (hello,2)
3> (hello,3)
6> (world,1)
8> (flink,1)

前面的编号是并行度,线程数。

2.2.2 无界流

(1)使用 netcat 监听7777端口,建立stream流

安装 netcat

brew install netcat

监听 7777 端口

nc -lk 7777

(2)代码编写(使用DataStream API实现,读取stream流属于无界流)

package com.company.onedayflink.demo;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlinkSteamSocketWords {public static void main(String[] args) throws Exception {// 1、创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、读取数据(其中hostname 是需要监听的主机名称,mac电脑可以在终端使用hostname命令查看)DataStreamSource<String> socketDS = env.socketTextStream("zgyMacBook-Pro.local", 7777);// 3、数据处理(切割、转换、分组、聚合)SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {// 3.1 切分for (String s : value.split(" ")) {// 3.2 将单组转为二元组Tuple2<String, Integer> tuple = Tuple2.of(s, 1);// 3.3 将二元组发送给下游out.collect(tuple);}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value -> value.f0).sum(1);// 4、输出sum.print();// 5、执行env.execute();}
}

(3)测试

在终端发送消息

hello flink
hello world

观察程序控制台打印

8> (flink,1)
3> (hello,1)
6> (world,1)
3> (hello,2)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/15587.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

k8s配置pods滚动发布

背景 采用微服务架构部署的应用&#xff0c;部署方式都要用到容器化部署k8s容器编排&#xff0c;最近我在公司负载的系统也是用的上述架构部署&#xff0c;但是随着系统的运行&#xff0c;用户提的需求就会越多&#xff0c;每次更新的话都要停机发布&#xff0c;最用户侧来说就…

【C语言刷题系列】求一个数组中两个元素a和b的和最接近整数m

&#x1f493; 博客主页&#xff1a;倔强的石头的CSDN主页 &#x1f4dd;Gitee主页&#xff1a;倔强的石头的gitee主页 ⏩ 文章专栏&#xff1a;C语言刷题系列 目录 一、问题描述 二、解题思路 解题思路&#xff1a; 解题步骤: 三、C语言代码实现及测试 一、问题描述 给定一…

指北者智能音乐学习机隆重亮相广州国际乐器展

2024年5月23-26日广州国际乐器展览会在广交会展馆B区隆重开幕&#xff0c;本届展会开设5大展厅、50000平方米的主题展区&#xff0c;吸引了700多家国内外参展商参展&#xff0c;打造集展示、商贸、文化交流、文娱于一体的广阔平台。深圳市指北科技有限公司也携旗下品牌指北者智…

AWS云服务器每月费用高昂,如何优化达到节省目的?

AWS云服务器每月费用可能因不同的使用情况和配置而有所不同。为了优化并节省AWS云服务器的费用&#xff0c;aws的合作伙伴九河云提供了一些建议&#xff1a; &#xff08;1&#xff09;调整实例大小&#xff1a;确保你使用的实例大小与你的工作负载相匹配。实例的容量每增加一倍…

Gopeed的高级用法

Gopeed是一个开源全平台下载器&#xff0c;具体简介请参考&#xff1a; “狗屁下载器”&#xff1f;Gopeed - 开源全平台下载器 (免费轻量 / 比 Aria2 好用 / 远程下载) - 异次元软件世界 (iplaysoft.com) 这里主要介绍下自己摸索出来的 Gopeed 的高级做法。 有的网站添加的…

时政|医疗结果互认

背景&#xff08;存在的问题&#xff09; 看同一种病&#xff0c;换一家医院甚至换一个院区、换一个科室&#xff0c;检查检验还得再来一遍&#xff0c;费钱又费时。开展检查检验结果互认&#xff0c;可以明显减轻患者就医负担。患者不用做重复检查&#xff0c;也可节约就医时…

基于JSP/Servlet校园二手交易平台(二)

目录 2 开发技术及开发环境 2.1 Java语言简介 2.2 J2EE技术介绍 2.3 Servlet/JSP技术 2.4 MVC 简介 2.5 Struts 技术 2.6 Hibernate 技术 2.6.1 应用程序的分层体系结构 2.6.2 Hibernate的应用及API简介 2.7 开发环境及环境配置 2.7.1 Java/JSP系统环境 2.7.2 JSP环…

D365 SysDictTable\SysDictField

文章目录 前言一、示例 前言 SysDictField 和 SysDictTable 用于访问表和字段的元数据信息。 一、示例 循环表&#xff0c;使对应数据源的字段禁止编辑 public void fieldNoAllowEdit(Common _common,formDataSource fds,boolean aE false){TableId tab…

小程序-购物车-基于SKU电商规格组件实现

SKU 概念&#xff1a; 存货单位&#xff08; Stock Keeping Unit &#xff09;&#xff0c; 库存 管理的最小可用单元&#xff0c;通常称为“单品”。 SKU 常见于电商领域&#xff0c;对于前端工程师而言&#xff0c;更多关注 SKU 算法 &#xff0c;基于后端的 SKU 数据…

(二)vForm 动态表单设计器之下拉、选择

系列文章目录 &#xff08;一&#xff09;vForm 动态表单设计器之使用 目录 系列文章目录 前言 一、后端需提供接口 二、组件配置 总结 前言 动态表单下拉、选择等组件&#xff0c;大概率要使用数据库中的数据&#xff0c;那么vForm如何拿到数据库中的数据呢&#xff1f;跟随…

僵尸进程、孤儿进程、守护进程

【一】僵尸进程和孤儿进程 【1】引入 我们知道在unix/linux中&#xff0c;正常情况下&#xff0c;子进程是通过父进程创建的&#xff0c;子进程在创建新的进程。 子进程的结束和父进程的运行是一个异步过程,即父进程永远无法预测子进程 到底什么时候结束。 当一个 进程完成它…

动物合并消除休闲游戏源码 Animal Merge 益智游戏

一款动物合并消除休闲游戏源码&#xff0c;Animal Merge是一款引人入胜的益智游戏&#xff0c;玩家的任务是合并方块&#xff0c;创造出可爱的动物&#xff0c;这些动物的体型会逐渐变大。游戏玩法包括将方块放到网格上&#xff0c;并战略性地将它们合并以形成更大的动物形状。…

作文笔记9 描写方法

动态描写&#xff1a; 威尼斯小艇&#xff0c;窗外的风景飞快的后退。 静态描写&#xff1a; 牧场之国&#xff0c;牛不再哞哞&#xff0c;马忘记了踢马房的挡板。 动静结合&#xff1a; 火车进站&#xff0c;人声鼎沸&#xff0c;叫卖声&#xff0c;广播声&#xff0c;人…

【408精华知识】主存相关解题套路大揭秘!

讲完了Cache&#xff0c;再来讲讲主存是怎么考察的&#xff0c;我始终认为&#xff0c;一图胜千言&#xff0c;所以对于很多部件&#xff0c;我都是通过画图进行形象的记忆&#xff0c;那么接下来我们对主存也画个图&#xff0c;然后再来详细解读其考察套路~ 文章目录 零、主存…

机器人正逆运动学、动力学概念

1.基本概念 建立机器人的正逆运动学和正逆动力学模型是为了解决不同类型的控制和规划问题。这些模型帮助工程师和研究人员理解和预测机器人的行为&#xff0c;从而设计出更有效的控制策略和运动规划。以下是建立这些模型的主要原因和一些应用实例&#xff1a; 正运动学模型 正…

python-pytorch 下批量seq2seq+Bahdanau Attention实现问答1.0.000

python-pytorch 下批量seq2seq+Bahdanau Attention实现简单问答1.0.000 前言原理看图数据准备分词、index2word、word2index、vocab_size输入模型的数据构造注意力模型decoder的编写关于损失函数和优化器在预测时完整代码参考前言 前面实现了 luong的dot 、general、concat注意…

【话题】我眼神的IT行业现状与未来趋势

目录 一、挑战 教学资源的重新分配 教师角色的转变 学生学习方式的改变 教育评价体系的挑战 二、机遇 个性化学习 跨学科学习 国际合作与交流 创新教育模式 三、如何培养下一代IT专业人才 更新教育理念 加强基础设施建设 整合课程资源 加强实践教学 培养跨学科…

easy-es EsAutoConfiguration RestHighLevelClient 没有自动注入配置

我用的easy-es.version 是 2.0.0-beta1&#xff0c;是基于springboot2开发的&#xff0c;自动注入配置的目录扫描的是META-INF/spring.factories文件&#xff1b;而我使用的框架是springboot3&#xff0c;springboot3扫描的是META-INF/spring/org.springframework.boot.autocon…

【算法刷题day57】Leetcode:739. 每日温度、496.下一个更大元素 I

文章目录 Leetcode 739. 每日温度解题思路代码总结 Leetcode 496.下一个更大元素 I解题思路代码总结 草稿图网站 java的Deque Leetcode 739. 每日温度 题目&#xff1a;739. 每日温度 解析&#xff1a;代码随想录解析 解题思路 维护一个单调栈&#xff0c;当新元素大于栈顶&a…

【Linux】TCP协议【中】{确认应答机制/超时重传机制/连接管理机制}

文章目录 1.确认应答机制2.超时重传机制&#xff1a;超时不一定是真超时了3.连接管理机制 1.确认应答机制 TCP协议中的确认应答机制是确保数据可靠传输的关键部分。以下是该机制的主要步骤和特点的详细解释&#xff1a; 数据分段与发送&#xff1a; 发送方将要发送的数据分成一…