Flink WordCount实践

目录

前提条件

基本准备

批处理API实现WordCount

流处理API实现WordCount

数据源是文件

数据源是socket文本流

打包

提交到集群运行

命令行提交作业

Web UI提交作业

上传代码到gitee


前提条件

Windows安装好jdk8、Maven3、IDEA

Linux安装好Flink集群,可参考:CentOS7安装flink1.17完全分布式
 

基本准备

创建项目

使用IDEA创建一个新的Maven项目,项目名称,例如:flinkdemo

添加依赖

在项目的pom.xml文件中添加Flink的依赖。

	<properties><flink.version>1.17.1</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency></dependencies>

刷新依赖

刷新依赖后,能看到相关依赖如下

刷新依赖过程需要等待一些时间来下载相关依赖。

如果依赖下载慢,可以设置阿里云仓库镜像:

 1.设置maven的settings.xml

</mirrors>上面一行添加阿里云仓库镜像

	<mirror><id>alimaven</id><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url><mirrorOf>central</mirrorOf>        </mirror>

2.IDEA设置maven

数据准备

在工程的根目录下,新建一个data文件夹

并在data文件夹下创建文本文件words.txt

内容如下

hello world
hello java
hello flink

新建包

右键src/main下的java,新建Package

填写包名org.example,包名与groupId的内容一致。

批处理API实现WordCount

org.exmaple下新建wc包及BatchWordCount

填写wc.BatchWordCount

效果如下

BatchWordCount.java代码如下:

package org.example.wc;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据 按行读取DataSource<String> lineDS = env.readTextFile("data/words.txt");// 3. 转换数据格式FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1L));}}});// 4. 按照 word 进行分组UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);// 5. 分组内聚合统计AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);// 6. 打印结果sum.print();}
}

运行程序,查看结果

注意,以上代码的实现方式是基于DataSet API的,是批处理API。而Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。从Flink 1.12开始,官方推荐直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

$ flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

流处理API实现WordCount

数据源是文件

org.example.wc包下新建Java类StreamWordCount,代码如下:

package org.example.wc;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文件DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}

运行结果

与批处理程序BatchWordCount的区别:

  • 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。

  • 转换处理之后,得到的数据对象类型不同。

  • 分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。

  • 代码末尾需要调用env的execute方法,开始执行任务。

数据源是socket文本流

流处理的输入数据通常是流数据,将StreamWordCount代码中读取文件数据的readTextFile方法,替换成读取socket文本流的方法socketTextStream。

org.example.wc包下新建Java类SocketStreamWordCount,代码如下:

package org.example.wc;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class SocketStreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文本流:node2表示发送端主机名(根据实际情况修改)、7777表示端口号DataStreamSource<String> lineStream = env.socketTextStream("node2", 7777);// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG)).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}

进入node2终端,如果没有nc命令,需要先安装nc命令,安装nc命令如下:

[hadoop@node2 ~]$ sudo yum install nc -y

开启nc监听

[hadoop@node2 ~]$ nc -lk 7777

IDEA中,运行SocketStreamWordCount程序。

往7777端口发送数据,例如发送hello world

控制台输出

继续往7777端口发送数据,例如发送hello flink

控制台输出

停止SocketStreamWordCount程序。

按Ctrl+c停止nc命令。

打包

这里的打包是将写好的程序打成jar包。

点击IDEA右侧的Maven,按住Ctrl键同时选中clean和package(第一次打包可以只选中package),点击执行打包。

打包成功后,看到如下输出信息,生成的jar包在项目的target目录下

提交到集群运行

把jar包提交到flink集群运行有两种方式:

1.通过命令行提交作业   

2.通过Web UI提交作业

命令行提交作业

将jar包上传Linux

启动flink集群
[hadoop@node2 ~]$ start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node2.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.
Starting taskexecutor daemon on host node4.
​
开启nc监听
[hadoop@node2 ~]$ nc -lk 7777
​
命令提交作业

开启另一个node2终端,使用flink run命令提交作业到flink集群

[hadoop@node2 ~]$ flink run -m node2:8081 -c org.example.wc.SocketStreamWordCount flinkdemo-1.0-SNAPSHOT.jar

-m指定提交到的JobManager,-c指定程序入口类。

发送测试数据

在nc监听终端,往7777端口发送数据

查看结果
Web UI查看结果

浏览器访问

node2:8081

看到正在运行的作业如下

查看结果

继续发送测试数据

在nc终端继续发送数据

Web UI刷新结果

命令行查看结果

打开新的node2终端,查看结果

[hadoop@node2 ~]$ cd $FLINK_HOME/log
[hadoop@node2 log]$ ls
flink-hadoop-client-node2.log                 flink-hadoop-standalonesession-0-node2.out
flink-hadoop-standalonesession-0-node2.log    flink-hadoop-taskexecutor-0-node2.log
flink-hadoop-standalonesession-0-node2.log.1  flink-hadoop-taskexecutor-0-node2.log.1
flink-hadoop-standalonesession-0-node2.log.2  flink-hadoop-taskexecutor-0-node2.log.2
flink-hadoop-standalonesession-0-node2.log.3  flink-hadoop-taskexecutor-0-node2.log.3
flink-hadoop-standalonesession-0-node2.log.4  flink-hadoop-taskexecutor-0-node2.log.4
flink-hadoop-standalonesession-0-node2.log.5  flink-hadoop-taskexecutor-0-node2.out
[hadoop@node2 log]$ cat flink-hadoop-taskexecutor-0-node2.out 
(hello,1)
(flink,1)
(hello,2)
(world,1)
​

取消flink作业

点击Cancel Job取消作业 

停止nc监听

按Ctrl+c停止nc命令

Web UI提交作业

开启nc监听

开启nc监听发送数据

[hadoop@node2 ~]$ nc -lk 7777

Web UI提交作业

浏览器访问

node2:8081

点击Submit New Job

点击Add New

选择flink作业jar包所在路径

点击jar包名称

填写相关内容,点击Submit提交作业

Entry Class填写运行的主类,例如:org.example.wc.SocketStreamWordCount

Parallesim填写作业的并行度,例如:1

提交后,在Running Jobs里看到运行的作业

发送测试数据

往7777端口发送数据

查看结果

继续发送测试数据

刷新结果

取消作业

停止nc监听

按住Ctrl+c停止nc命令

关闭flink集群
[hadoop@node2 ~]$ stop-cluster.sh 
Stopping taskexecutor daemon (pid: 2283) on host node2.
Stopping taskexecutor daemon (pid: 1827) on host node3.
Stopping taskexecutor daemon (pid: 1829) on host node4.
Stopping standalonesession daemon (pid: 1929) on host node2.

上传代码到gitee

登录gitee

https://gitee.com/

注意:如果还没有gitee账号,需要先注册;如果之前没有设置过SSH公钥,需要先设置SSH公钥。

创建仓库

提交代码

使用IDEA提交代码

提示有警告,忽略警告,继续提交

提交成功后,IDEA显示如下

刷新浏览器查看gitee界面,看到代码已上传成功

完成!enjoy it!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/812447.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

线上问题监控 Sentry 接入全过程

背景&#xff1a; 线上偶发问题出现后 &#xff0c;测试人员仅通过接口信息无法复现错误场景&#xff1b;并且线上环境的监控&#xff0c;对于提高系统的稳定性 &#xff08;降低脱发率&#xff09; 至关重要&#xff1b;现在线上监控工具这个多&#xff0c;为什么选择Sentry?…

Java并发(1)--线程,进程,以及缓存

线程和进程是什么&#xff1f; 进程 进程是程序的一次执行过程&#xff0c;系统程序的基本单位。有自己的main方法&#xff0c;并且主要由主方法运行起来的基本上就是进程。 线程 线程与进程相似&#xff0c;但线程是一个比进程更小的执行单位。一个进程在其执行的过程中可以…

vb.net textbox滚动显示到最后一行

调用&#xff1a; Private Sub TextBox18_TextChanged(sender As Object, e As System.EventArgs) Handles TextBox18.TextChanged show_textbox_endline(TextBox18) End Sub 函数&#xff1a; 显示textbox最后一行 Public Sub show_textbox_endline(Tbx As TextB…

MVCC(解决MySql中的并发事务的隔离性)

MVCC 如何保证事务的隔离性&#xff1f; 1.排他锁&#xff1a;如一个事务获取了一个数据行的排他锁&#xff0c;其他事务就不能再获取改行的其他锁。 2.MVCC&#xff1a;多版本并发控制。 MVCC&#xff1a; 1.隐藏字段 1.DB_TRX_ID&#xff1a;最近修改事务的id。默认值从0开…

Selenium自动填写验证码(偏小白版本OCR)

OCR基础示例 我直接 上代码 from PIL import Image import pytesseract# 0 Orientation and script detection (OSD) only. # 1 Automatic page segmentation with OSD. # 2 Automatic page segmentation, but no OSD, or OCR. # 3 Fully automatic page segmentation, but n…

【MYSQL】索引机制概述

由于MySQL是作为存储层部署在业务系统的最后端&#xff0c;所有的业务数据最终都要入库落盘&#xff0c;但随着一个项目在线上运行的时间越来越久&#xff0c;数据库中的数据量自然会越来越多&#xff0c;而数据体积出现增长后&#xff0c;当需要从表查询一些数据时&#xff0c…

symfony框架

Symfony框架是一种流行的PHP框架&#xff0c;用于快速开发高质量的Web应用程序。它是一个开源框架&#xff0c;遵循MVC&#xff08;模型-视图-控制器&#xff09;设计模式&#xff0c;提供了一套强大的工具和组件&#xff0c;帮助开发人员更轻松地构建复杂的Web应用程序。 Sym…

Apache Storm的详细配置

Apache Storm的详细配置主要涉及以下几个方面: Zookeeper配置:Apache Storm使用Zookeeper来进行协调和配置管理。你需要配置Zookeeper集群的连接信息,包括Zookeeper服务器的主机和端口。 Storm Nimbus配置:Nimbus是Storm的主节点,负责分配任务给各个工作节点。你需要配置N…

javaScript设计模式之简单工厂模式

简单工厂模式(Simple Factory):又叫静态工厂方法&#xff0c;由一个工厂对象决定创建某一种产品对象类的实例。主要用来创建同一类对象。 场景一 假设我们需要计算圆形和矩形的面积 function Circle(radius) {this.radius radius;}Circle.prototype.getArea function() {re…

C++猫和老鼠有多重(友元函数初步)

定义猫和老鼠&#xff1a;Cat与Mouse两个类&#xff0c;二者都有weight属性&#xff0c;定义二者的一个友元函数totalweight()&#xff0c;计算二者的重量和。 裁判测试程序样例&#xff1a; #include <iostream> using namespace std;/* 请在这里填写答案 */int main(…

第六周学习笔记DAY.4-方法与方法重载

如何创建和使用对象 创建对象 类名 对象名 new 类名(); 引用对象成员&#xff1a;使用“.”进行以下操作 引用类的属性&#xff1a;对象名.属性 用类的方法&#xff1a;对象名.方法名() 学完本次课程后&#xff0c;要求能够&#xff1a; 方法的参数传递 会使用构造方法…

总结SQL相对常用的几个字符函数

目录 字符的截取 substr() trim()、ltrim()、rtrim() 字符串的拼接 ||、 字符的大小写转换 upper(column_name):大写 lower(column_name):小写 字符替换 replace() 搜索字符 instr(column_name, substring_to_find,start,n_appearence) charindex(substring_to_fi…

【问题解决】ubuntu安装新版vscode报code-insiders相关错误

问题 目前 vscode官网 最新的包为 insiders_1.89.0-1712297812_amd64.deb &#xff0c;双击或者使用sudo dpkg -i code-insiders_1.89.0-1712297812_amd64.deb安装后报错&#xff0c;执行其他命令也报错。 安装环境&#xff1a;ubuntu18.04 dpkg: 处理软件包 code-insiders (…

火绒安全软件:程序员的网络守护天使

目录 前言 系统防护 网络防护 隐私保护 高级设置 软件安全 响应速度 持续更新 总结 前言 在这个充满机遇与挑战的数字时代&#xff0c;程序员们如同探险家&#xff0c;不断探索着代码的新大陆。然而&#xff0c;网络世界也充斥着各种未知的风险和威胁。火绒安全软件&a…

riscv-gnu-toolchain 交叉编译器如何构建?

安装依赖工具 sudo apt-get install git autoconf automake autotools-dev curl python3 libmpc-dev libmpfr-dev libgmp-dev gawk build-essential bison flex texinfo gperf patchutils bc libexpat-dev libglib2.0-dev ninja-build zlib1g-dev pkg-config libboost-all-dev…

基于java+springboot+vue实现的药品管理系统(文末源码+Lw)23-297

摘 要 传统信息的管理大部分依赖于管理人员的手工登记与管理&#xff0c;然而&#xff0c;随着近些年信息技术的迅猛发展&#xff0c;让许多比较老套的信息管理模式进行了更新迭代&#xff0c;药品信息因为其管理内容繁杂&#xff0c;管理数量繁多导致手工进行处理不能满足广…

Linux命令-dpkg-query命令(Debian Linux中软件包的查询工具)

说明 dpkg-query命令 是Debian Linux中软件包的查询工具&#xff0c;它从dpkg软件包数据库中查询并辨识软件包的信息。 语法 dpkg-query(选项)(参数)选项 -l&#xff1a;列出符合匹配模式的软件包&#xff1b; -s&#xff1a;查询软件包的状态信息&#xff1b; -L&#xff1…

llama-factory SFT系列教程 (二),大模型在自定义数据集 lora 训练与部署

文章目录 简介支持的模型列表2. 添加自定义数据集3. lora 微调4. 大模型 lora 权重&#xff0c;部署问题 参考资料 简介 llama-factory SFT系列教程 (一)&#xff0c;大模型 API 部署与使用本文为 llama-factory SFT系列教程的第二篇&#xff1b; 支持的模型列表 模型名模型…

Composer安装与配置

Composer&#xff0c;作为PHP的依赖管理工具&#xff0c;极大地简化了PHP项目中第三方库的安装、更新与管理过程。本文将详细介绍Composer的安装步骤、基本配置方法&#xff0c;以及一些实用的操作示例&#xff0c;帮助读者快速上手并熟练运用Composer。 一、Composer安装 环…

C++内存分布

C代码编译过程 预处理 宏定义展开、头文件展开、条件编译&#xff0c;这里并不会检查语法编译检查语法&#xff0c;将预处理后文件编译生成汇编文件汇编将汇编文件生成目标文件(二进制文件)链接将目标文件链接为可执行程序 进程的内存分布 程序运行起来(没有结束前)就是一个…