Flink(一)【WordCount 快速入门】

前言

        学完了 Hadoop、Spark,本想着先把 Kafka、Flume 这些工具先学完的,但想了想还是把核心的技术先学完最后再去把那些工具学学。

        最近心有点累哈哈哈,偷偷立个 flag,反正也没人看,明年的今天来这里还愿哈,愿望这种事情我是从来是不会说出来的,毕竟言以泄败,事以密成嘛。

那我隐晦低表达一下,摘录自《解忧杂货店》的一条句子:

        这是克朗对自己梦想的描述,其实他不是自不量力,而是假如放弃了这个梦想,他的生活就失去了光,他未来的几十年生活会枯燥无味,会活的没有一点激情。
        就像一个曾经自己深爱过的姑娘一样,明明无法在一起,却还是始终记挂着,因为心里眼里只有她,所以别人在你眼中,都会黯然失色的,没有色彩的东西,又怎么能投入激情去爱呢?

        我的愿望有两个,在上面中有所体现,但我希望结果不要是遗憾,第一个愿望明年这会大概知道结果了,第二个愿望应该会晚一点,也许在2025年的春天,也许会更早一点...

API 环境搭建

添加依赖

pom.xml

<properties><flink.version>1.13.0</flink.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<!-- 引入 Flink 相关依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
<!-- 引入日志管理相关依赖--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version>
</dependency>
</dependencies>

log4j.properties 

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

 入门案例

0、数据准备

在 根目录下创建 words.txt

hello flink
hello java
hello spark
hello hadoop

1、批处理

批处理所用到的算子API 都继承自 DataSet,而新版的 Flink 已经做到了流批一体,这里只做演示,以后这类 API 应该是要被弃用了。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建一个执行批式数据处理环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件中读取数据 String类型  批式数据处理环境得到的 DataSource 继承自 DataSetDataSource<String> lineDS = env.readTextFile("input/words.txt");// 3. 将每行数据转换成一个二元组类型// 输入类型: String 输出类型: Tuple2FlatMapOperator<String, Tuple2<String, Long>> wordAndOne =// String lines: 输入数据行  Collector<Tuple2<String,Long>> out: 输出类型lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));  //使用 Java 泛型的时候, 由于泛型擦除的存在, 需要显示信息返回返回值类型// 4. 根据 word 分组UnsortedGrouping<Tuple2<String, Long>> wordGroup = wordAndOne.groupBy(0);   // 0 是索引位置// 5. 分组内进行聚合AggregateOperator<Tuple2<String, Long>> res = wordGroup.sum(1); // 1 也是索引位置// 6. 打印结果res.print();}
}

运行结果:

(hadoop,1)
(flink,1)
(hello,4)
(java,1)
(spark,1)Process finished with exit code 0

因为现在已经是流批一体的框架了,所以提交 Flink 批处理任务需要用下面的语句:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

2、流处理

2.1、有界数据流处理

这里我们用离线数据(提前创建好的文件)用流处理API DataStream 的算子来做处理。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class BoundedStreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建一个流式的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();// 2. 流式数据处理环境得到的 DataSource 继承自 DataStreamDataStreamSource<String> lineDS = env.readTextFile("input/words.txt");// 3. flatMap 打散数据 返回元组SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));// 4. 根据 word 分组KeyedStream<Tuple2<String, Long>, String> wordGroupByKey = wordAndOne.keyBy(t -> t.f0);// 5. 根据键对索引为 1 处的值进行合并SingleOutputStreamOperator<Tuple2<String, Long>> res = wordGroupByKey.sum(1);// 6. 输出结果res.print();// 7. 执行env.execute();  // 这里我们的数据是有界的,但是真正开发环境是无界的,这里需要用execute方法等待新数据的到来}
}

运行结果:

3> (java,1)
13> (flink,1)
1> (spark,1)
5> (hello,1)
5> (hello,2)
5> (hello,3)
5> (hello,4)
15> (hadoop,1)

        我们可以发现,输出的单词的顺序是乱序的,因为集群模式下数据流不是在本地执行的,而是在多个节点中执行,所以也就无法保证先输入的单词最先输出。

        Idea下Flink API 会使用多线程来模拟集群下的多节点并行处理,而我们每行数据前面的 "编号>" 代表的就是线程的 id(对应 Flink 运行时占据的最小资源,也叫任务槽),默认使用当前电脑的所有 CPU 数。

        我们还可以发现,hello是同一个节点上处理的,这是因为我们在做分组的时候,把分组后的数据分到了同一个节点(子任务)上。

2.2、无界数据流处理

这里我们使用 netcat 来模拟产生数据流

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class UnBoundedStreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建一个流式的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();// 2. 流式数据处理环境得到的 DataSource 继承自 DataStreamParameterTool parameterTool = ParameterTool.fromArgs(args);String host = parameterTool.get("host");Integer port = parameterTool.getInt("port");DataStreamSource<String> lineDS = env.socketTextStream(host,port);// 3. flatMap 打散数据 返回元组SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));// 4. 根据 word 分组KeyedStream<Tuple2<String, Long>, String> wordGroupByKey = wordAndOne.keyBy(t -> t.f0);// 5. 根据键对索引为 1 处的值进行合并SingleOutputStreamOperator<Tuple2<String, Long>> res = wordGroupByKey.sum(1);// 6. 输出结果res.print();// 7. 执行env.execute();  // 这里我们的数据是有界的,但是真正开发环境是无界的,这里需要用execute方法等待新数据的到来}
}

运行结果: 

        可以看到,处理是相当快的,毕竟数据量很小,但是会想到 SparkStreaming 的处理过程,我们之前用 SparkStreaming 的时候还需要设置 Reciver 的接收间隔,而我们的 Flink 则是真正的实时处理。

总结

        Flink 的学习终于开始了,还是一样的要求,不照搬视频课件内容,每行代码要有自己的思考,每行博客也要是自己思考的总结。

        还有,最近感觉愈发词穷,该多看书了,以后养成每次博客加一条书摘的习惯。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/134581.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Java Web的在线教学质量评价系统的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

机器学习——回归

目录 一、线性回归 1、回归的概念&#xff08;Regression、Prediction&#xff09; 2、符号约定 3、算法流程 4、最小二乘法&#xff08;LSM&#xff09; 二、梯度下降 梯度下降的三种形式 1、批量梯度下降&#xff08;Batch Gradient Descent,BGD&#xff09;&#xff…

MES系统防呆措施之具体场景学习

在工业设计上&#xff0c;为了避免使用者的操作失误造成机器或人身伤害&#xff08;包括无意识的动作或下意识的误动作或不小心的肢体动作&#xff09;&#xff0c;会针对这些可能发生的情况来做预防措施&#xff0c;称为防呆。对于注塑生产企业来讲&#xff0c;模具亦是企业的…

MinGW32丢失dll文件

问题现象 执行Makefile的时候&#xff0c;突然出现这个提示&#xff0c;还有好几个类似的&#xff0c;提示我找不到dll文件&#xff0c;建议重装。 问题分析 重装软件 最直接的办法肯定是按照建议来重装&#xff0c;但是发现重装了好几次&#xff0c;不是缺这个就是缺那个&a…

Linux多线程【线程池】

✨个人主页&#xff1a; 北 海 &#x1f389;所属专栏&#xff1a; Linux学习之旅 &#x1f383;操作环境&#xff1a; CentOS 7.6 腾讯云远程服务器 文章目录 &#x1f307;前言&#x1f3d9;️正文1.线程池的概念1.1.池化技术1.2.线程池的优点1.3.线程池的应用场景 2.线程池的…

Vue路由使用参数传递数据

一、使用query参数传递数据 &#xff08;一&#xff09;参数的传递 1. 携带参数进行传递 <router-link to"/路径?参数名1参数值1&参数名2参数值2">内容</router-link> 我们在下面的代码中传递每条消息的id和标题&#xff1a; 2. 配置对象进行传递…

运动想象 EEG 信号分析

基于运动想象的公开数据集&#xff1a;Data set IVa (BCI Competition III)1 数据描述参考前文&#xff1a;https://blog.csdn.net/qq_43811536/article/details/134224005?spm1001.2014.3001.5501 本文使用公开数据集 Data set IVa 中的部分被试数据&#xff0c;数据已公开可…

C#在.NET Windows窗体应用中使用LINQtoSQL

目录 一、新建Windows窗体应用并添加LINQtoSQL类 二、错误信息CS0234 三、添加扩展包让Windows窗体应用支持LINQtoSQL类 默认安装的背景下&#xff0c;新建的Windows窗体应用是不支持LINQtoSQL类的。现象是资源管理器里的依赖项中默认的安装不能自动生成支持system.data.lin…

如何在Python爬虫中使用IP代理以避免反爬虫机制

目录 前言 一、IP代理的使用 1. 什么是IP代理&#xff1f; 2. 如何获取IP代理&#xff1f; 3. 如何使用IP代理&#xff1f; 4. 如何避免IP代理失效&#xff1f; 5. 代理IP的匿名性 二、代码示例 总结 前言 在进行爬虫时&#xff0c;我们很容易会遇到反爬虫机制。网站…

JS操作字符串常见方法

目录 一&#xff1a;前言 二&#xff1a;常见的内置方法 1、charAt与charCodeAt 2、indexOf与lastIndexOf 3、substring与substr 4、toLowerCase 和 toUpperCase 5、slice 6、replace 7、split 8、concat 9、trim 10、trimStart / trimLeft 11、trimEnd / trimRigh…

flutter开发报错The instance member ‘widget‘ can‘t be accessed in an initializer

文章目录 问题描述问题原因解决方法 问题描述 The instance member ‘widget’ can’t be accessed in an initializer. 问题原因 “The instance member ‘widget’ can’t be accessed in an initializer” 错误是因为在初始化器列表中&#xff08;constructor initializer…

[ACTF2020 新生赛]Upload 1

题目环境&#xff1a; 仍旧是文件上传漏洞 这道题和上一道大差不差、大同小异、这里不再赘述。 [极客大挑战 2019]Upload 1&#xff1a;https://blog.csdn.net/m0_73734159/article/details/134267317?spm1001.2014.3001.5501 区别在于本题需要在抓包数据里面改文件后缀&#…

2023年11月5日网规考试备忘

早上题目回忆&#xff1a; pki体系 ipsec&#xff0c;交换安全&#xff08;流量抑制&#xff09; aohdlc bob metclaf —ethernet pon tcp三次握手 OSPF lsa&#xff1f;交换机组ospf配置问题&#xff0c;ping网关可通&#xff0c;AB不通 raid6 300G*8 网络利用率 停等协议10…

《009.Springboot+vue之进销存管理系统》

《009.Springbootvue之进销存管理系统》 项目简介 [1]本系统涉及到的技术主要如下&#xff1a; 推荐环境配置&#xff1a;DEA jdk1.8 Maven MySQL 前后端分离; 后台&#xff1a;SpringBootMybatisredis; 前台&#xff1a;vueElementUI; [2]功能模块展示&#xff1a; 1.用户管…

亚马逊合规,亚马逊涉及12个站点合规政策更新,需警惕合规要求!

最近&#xff0c;许多亚马逊站点的卖家陆续收到了合规政策更新的通知邮件&#xff0c;涵盖了美国站、加拿大站、英国站、法国站、意大利站、德国站以及西班牙站。 这些更新影响了不同品类的卖家&#xff0c;包括以下品类&#xff1a; 美国站&#xff08;US&#xff09;对于“发…

【Linux】:文件系统

文件系统 一.认识硬件-磁盘1.磁盘的物理构成2.磁盘的存储构成3.逻辑结构 二.文件系统 文件内容属性&#xff0c;前面我们所说的文件操作都是针对以打开的文件&#xff0c;那么未打开的文件呢&#xff1f;当然是在磁盘上储存着&#xff0c;接下来谈谈它是如何储存的。 一.认识硬…

JavaEE平台技术——MyBatis

JavaEE平台技术——MyBatis 1. 对象关系映射框架——Hibernate、MyBatis2. 对象关系模型映射3. MyBatis的实现机制4. MyBatis的XML定义5. Spring事务 在观看这个之前&#xff0c;大家请查阅前序内容。 &#x1f600;JavaEE的渊源 &#x1f600;&#x1f600;JavaEE平台技术——…

个性化联邦学习-综述

介绍阅读的三篇个性化联邦学习的经典综述文章 Three Approaches for Personalization with Applications to Federated Learning 论文地址 文章的主要内容 介绍了用户聚类&#xff0c;数据插值&#xff0c;模型插值三种个性化联邦学习的方法。 用户聚类&#xff1a; 目的&a…

智能井盖传感器助力建设数字化城市

在推进智慧城市建设过程中&#xff0c;多个城市在打造数字化管理平台&#xff0c;因为通过科学技术手段统一管理城市各种基础设施建设&#xff0c;可以缓解传统人工不足所导致的执法困难的问题&#xff0c;而且可以方便城市管理及时响应。在智慧城市建设过程中&#xff0c;城市…

【慢SQL性能优化】 一条SQL的生命周期 | 京东物流技术团队

一、 一条简单SQL在MySQL执行过程 一张简单的图说明下&#xff0c;MySQL架构有哪些组件和组建间关系&#xff0c;接下来给大家用SQL语句分析 例如如下SQL语句 SELECT department_id FROM employee WHERE name Lucy AND age > 18 GROUP BY department_id其中name为索引&a…