【Flink入门修炼】1-3 Flink WordCount 入门实现

本篇文章将带大家运行 Flink 最简单的程序 WordCount。先实践后理论,对其基本输入输出、编程代码有初步了解,后续篇章再对 Flink 的各种概念和架构进行介绍。
下面将从创建项目开始,介绍如何创建出一个 Flink 项目;然后从 DataStream 流处理和 FlinkSQL 执行两种方式来带大家学习 WordCount 程序的开发。
Flink 各版本之间变化较多,之前版本的函数在后续版本可能不再支持。跟随学习时,请尽量选择和笔者同版本的 Flink。本文使用的 Flink 版本是 1.13.2。

一、创建项目

在很多其他教程中,会看到如下来创建 Flink 程序的方式。虽然简单方便,但对初学者来说,不知道初始化项目的时候做了什么,如果报错了也不知道该如何排查。

mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.13.2
通过指定 Maven 工程的三要素,即 GroupId、ArtifactId、Version 来创建一个新的工程。同时 Flink 给我提供了更为方便的创建 Flink 工程的方法:
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.13.2

因此,我们手动来创建一个 Maven 项目,看看到底如何创建出一个 Flink 项目。
1、通过 IDEA 创建一个 Maven 项目
image.png

2、pom.xml 添加:
这里我们选择的是 Flink 1.13.2 版本(Flink 1.14 之后部分类和函数有变化,可自行探索)。

    <properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.13.2</flink.version> <!-- 1.14 之后部分类和函数有变化,可自行探索 --><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency></dependencies>

二、DataStream WordCount

一)编写程序

基础项目环境已经搞好了,接下来我们模仿一个流式环境,监听本地的 Socket 端口,使用 Flink 统计流入的不同单词个数。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class SocketTextStreamWordCount {public static void main(String[] args) throws Exception {//参数检查if (args.length != 2) {// System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");// return;args = new String[]{"127.0.0.1", "9000"};}String hostname = args[0];Integer port = Integer.parseInt(args[1]);// 创建 streaming execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据DataStreamSource<String> stream = env.socketTextStream(hostname, port);// 计数SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);sum.print();env.execute("Java WordCount from SocketTextStream Example");}public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {String[] tokens = s.toLowerCase().split("\\W+");for (String token: tokens) {if (token.length() > 0) {collector.collect(new Tuple2<String, Integer>(token, 1));}}}}
}

二)测试

接下来我们进行程序测试。
我们在本地使用 netcat 命令启动一个端口:

nc -l 9000

然后启动程序,能看到控制台一些输出:
image.png

接下来,在 nc 中输入:

$ nc -l 9000
hello world
flink flink flink

回到我们的程序,能看到统计的输出:

3> (hello,1)
6> (world,1)
8> (flink,1)
8> (flink,2)
8> (flink,3)

image.png

三)如果有报错

如果出现执行报错:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/java/io/TextInputFormatat com.shuofxz.SocketTextStreamWordCount.main(SocketTextStreamWordCount.java:25)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.java.io.TextInputFormatat java.net.URLClassLoader.findClass(URLClassLoader.java:387)at java.lang.ClassLoader.loadClass(ClassLoader.java:419)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)at java.lang.ClassLoader.loadClass(ClassLoader.java:352)... 1 more

在 IDE 中把 「Add dependencies with “Provided” scope to classpath」勾选上:
image.png

三、Flink Table & SQL WordCount

一)介绍 FlinkSQL

Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。
上面单词统计的逻辑可以转化为下面的 SQL。
直接来看这个 SQL:

select word as word, sum(frequency) as frequency from WordCount group by word
  • WordCount 是要进行单词统计的表,我们会先做一些处理,将输入的单词都存放到这个表中
  • 表我们定义为两列(word, frequency),初始转化输入每个单词占一行,frequency 都是 1
  • 然后,就可以按照 SQL 的逻辑来进行统计聚合了。

其中,WordCount 表数据如下:

wordfrequency
hello1
world1
flink1
flink1
flink1

那么接下来我们看,如何写一个 FlinkSQL 的程序。

二)环境和程序

首先,添加 FlinkSQL 需要的依赖:

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>${flink.version}</version></dependency>

程序如下:

public class SQLWordCount {public static void main(String[] args) throws Exception {// 创建上下文环境ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);// 读取一行模拟数据作为输入String words = "hello world flink flink flink";String[] split = words.split("\\W+");ArrayList<WC> list = new ArrayList<>();for (String word : split) {WC wc = new WC(word, 1);list.add(wc);}DataSource<WC> input = fbEnv.fromCollection(list);// DataSet 转 SQL,指定字段名Table table = fbTableEnv.fromDataSet(input, "word,frequency");table.printSchema();// 注册为一个表fbTableEnv.createTemporaryView("WordCount", table);Table table1 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount group by word");DataSet<WC> ds1 = fbTableEnv.toDataSet(table1, WC.class);ds1.printToErr();}public static class WC {public String word;public long frequency;public WC() {}public WC(String word, long frequency) {this.word = word;this.frequency = frequency;}@Overridepublic String toString() {return  word + ", " + frequency;}}
}

执行,结果输出:

(`word` STRING,`frequency` BIGINT
)
flink, 3
world, 1
hello, 1

image.png

四、小结

本篇手把手的带大家搭建起 Flink Maven 项目,然后使用 DataStream 和 FlinkSQL 两种方式来学习 WordCount 单词计数这一最简单最经典的 Flink 程序开发。跟着步骤一步步执行下来,大家应该对 Flink 程序基本执行流程有个初步的了解,为后续的学习打下了基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/674518.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IDEA创建Java类时自动添加注释(作者、年份、月份)

目录 IDEA创建Java类时自动添加注释&#xff08;作者、年份、月份&#xff09;如图&#xff1a; IDEA创建Java类时自动添加注释&#xff08;作者、年份、月份&#xff09; 简单记录下&#xff0c;IDEA创建Java类时自动添加注释&#xff08;作者、年份、月份&#xff09;&#…

Java+微信小程序实现智慧家政系统 JAVA+Vue+SpringBoot+MySQL

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统展示四、核心代码4.1 查询家政服务4.2 新增单条服务订单4.3 新增留言反馈4.4 小程序登录4.5 小程序数据展示 五、免责说明 一、摘要 1.1 项目介绍 基于微信小程序JAVAVueSpringBootMySQL的智慧家政系统&#xff0…

作业2.8

1、选择题 1.1、以下选项中,不能作为合法常量的是 ____B______ A&#xff09;1.234e04 B&#xff09;1.234e0.4 C&#xff09;1.234e4 D&#xff09;1.234e0 1.2、以下定义变量并初始化错误的是_____D________。 A) char c1 ‘H’ &#xff1b; B) char c…

RabbitMQ高可用架构涉及常用功能整理

RabbitMQ高可用架构涉及常用功能整理 1. rabbitmq的集群模式2. 镜像模式高可用系统架构和相关组件3. rabbitmq的核心参数3.1 镜像策略3.2 新镜像同步策略3.3 从节点晋升策略3.4 主队列选择策略 4. rabbitmq常用命令4.1 常用基础命令4.1.1 服务管理4.1.2 用户管理4.1.3 角色管理…

数字图像处理(实践篇)四十七 OpenCV-Python 高动态范围HDR

目录 一 HDR 二 实践 高质量的图像具备的要素如下: ①分辨率 图像中的像素数量。在特定屏幕尺寸下,分辨率越高,像素越多,显示的细节更精细。 ②位深度

JavaEE作业-实验三

目录 1 实验内容 2 实验要求 3 思路 4 核心代码 5 实验结果 1 实验内容 简单的线上图书交易系统的web层 2 实验要求 ①采用SpringMVC框架&#xff0c;采用REST风格 ②要求具有如下功能&#xff1a;商品分类、订单、购物车、库存 ③独立完成&#xff0c;编写实验报告 …

自然语言处理(NLP)——使用Rasa创建聊天机器人

1 基本概念 1.1 自然语言处理的分类 IR-BOT&#xff1a;检索型问答系统 Task-bot&#xff1a;任务型对话系统 Chitchat-bot:闲聊系统 1.2 任务型对话Task-Bot:task-oriented bot 这张图展示了一个语音对话系统&#xff08;或聊天机器人&#xff09;的基本组成部分和它们之间的…

外贸邮件群发如何做?外贸邮件群发靠谱吗?

外贸邮件群发有哪些平台&#xff1f;外贸群发邮件用什么邮箱&#xff1f; 外贸邮件群发是许多企业在开展国际贸易时常用的营销手段&#xff0c;它不仅能够快速地将产品信息和促销活动传达给目标客户&#xff0c;还能够有效地建立和维护客户关系。下面&#xff0c;就让蜂邮探讨…

跟着pink老师前端入门教程-day21+22

5.4 常见flex布局思路 5.5 背景线性渐变 语法&#xff1a; background: linear-gradient( 起始方向 , 颜色 1, 颜色 2, ...); background: -webkit-linear-gradient(left, red , blue); background: -webkit-linear-gradient(left top, red , blue); 背景渐变必须添加浏览…

e5 服务器具备哪些性能特点?

随着云计算和大数据技术的不断发展&#xff0c;服务器作为数据中心的核心设备&#xff0c;其性能特点也日益受到关注。其中&#xff0c;E5服务器作为当前主流的服务器类型之一&#xff0c;具备许多优秀的性能特点。本文将详细介绍E5服务器的性能特点&#xff0c;帮助读者更好地…

CTFshow web(php命令执行 37-40)

?ceval($_GET[shy]);&shypassthru(cat flag.php); #逃逸过滤 ?cinclude%09$_GET[shy]?>&shyphp://filter/readconvert.base64-encode/resourceflag.php #文件包含 ?cinclude%0a$_GET[cmd]?>&cmdphp://filter/readconvert.base64-encode/…

【我与Java的成长记】之String类详解

系列文章目录 能看懂文字就能明白系列 C语言笔记传送门 Java笔记传送门 &#x1f31f; 个人主页&#xff1a;古德猫宁- &#x1f308; 信念如阳光&#xff0c;照亮前行的每一步 文章目录 系列文章目录&#x1f308; *信念如阳光&#xff0c;照亮前行的每一步* 前言一、字符串构…

【http】2、http request header Origin 属性、跨域 CORS、同源、nginx 反向代理、预检请求

文章目录 一、Origin 含义二、跨源资源共享&#xff1a;**Cross-Origin Resource Sharing** CORS2.1 跨域的定义2.2 功能概述2.3 场景示例2.3.1 简单请求2.3.2 Preflighted requests&#xff1a;预检请求 2.4 header2.4.1 http request header2.4.1.1 Origin2.4.1.2 Access-Con…

立面效果图为何要用云渲染100?渲染100邀请码1a12

建筑设计是一门艺术&#xff0c;而立面效果图是艺术的展现&#xff0c;它在设计中非常重要。 1、立面效果图的重要性 立面效果图能用来展示建筑物的风格、材质、色彩以及环境等因素&#xff0c;通过它&#xff0c;设计师可以检验项目质量&#xff0c;评估效果是否达到预期&…

(2024,低比特模型量化,模型微调,QuEST,TAQuant)QuEST:通过高效选择性微调进行低比特扩散模型量化

QuEST: Low-bit Diffusion Model Quantization via Efficient Selective Finetuning 公和众和号&#xff1a;EDPJ&#xff08;进 Q 交流群&#xff1a;922230617 或加 VX&#xff1a;CV_EDPJ 进 V 交流群&#xff09; 目录 0. 摘要 2. 相关工作 3. 方法 3.1. 预备知识 3…

ELAdmin 的 CRUD

数据表结构 弄个测试的数据表&#xff0c;不同类型的几个字段&#xff0c;表名位 mp_reply。 生成代码 ELAdmin 可以自动生成代码。 左侧目录系统工具–代码生成&#xff0c;点开以后可以看到上面创建的数据表mp_reply&#xff0c;点击配置。 进入的页面内容有两部分&#…

DePIN+GameFi+顶级电竞团队,GAIMIN如何颠覆Web3游戏赛道

DePIN带动互联网进入去中心化时代 自从智能合约和去中心化应用DApp普及之后&#xff0c;越来越多的从业者开始将目光放在区块链对互联网的升级和改造之上&#xff0c;这里主要进行了三类&#xff0c;第一类是通过节点的去中心化来实现的&#xff0c;这样确保了基础设施的分散&a…

Zoho Mail企业邮箱商业扩展第1部分:入门

今天让我们来认识一下王雪琳&#xff0c;她是一位独立经营的营销咨询机构的个体企业家。在开始自己的事业之前&#xff0c;她进行了广泛的市场调研&#xff0c;明确了自己的业务定位&#xff0c;并全力以赴地投入到了自己的企业中。 一、创业背景 王雪琳的营销业务主要集中在…

论 Scratch 版“愤怒的小鸟”的资源(10000 余块代码)

资源链接 “愤怒的小鸟”资源&#xff1a;https://download.csdn.net/download/leyang0910/88820527 游戏 SJA 分析及&#xff1a;角色数量&#xff1a;12&#xff0c;素材数量&#xff1a;214&#xff0c;积木数量&#xff1a;1442&#xff0c;音频数量&#xff1a;11 “愤怒…

【分布式】雪花算法学习笔记

雪花算法学习笔记 来源 https://pdai.tech/md/algorithm/alg-domain-id-snowflake.html概述 雪花算法是推特开源的分布式ID生成算法&#xff0c;以划分命名空间的方式将64位分割成多个部分&#xff0c;每一个部分代表不同的含义&#xff0c;这种就是将64位划分成不同的段&…