一、引入
在大数据场景中,开发者追求高效与灵活,Linux 系统以其稳定性成为众多组件的首选,但在资源有限的情况下,在本机搭建一个 Linux 虚拟机集群却显得过于笨重,启动、运行占资源,需要配置网络,无法和windows共享资源,尤其是对只有 8GB 内存的 Windows 系统用户来说,内存压力显而易见。
尽管如此,也挡不住一个学习者的“热情”。
本文介绍怎么在 Windows 本地搭建 Flink 环境,注意这是一个伪分布式的测试环境。
二、下载安装
下载链接:Apache 分发网站、国内清华镜像网站。
Apache 分发网站支持下载各个版本的 Flink,不过下载速度慢,清华镜像网站速度快,但是只支持最近几个大版本下载,当前是支持 1.16 至 1.19 版本的下载。
下载安装包,比如我下载的是 1.13.1 版本的,只能到 Apache 分发网站下载安装包“flink-1.13.1-bin-scala_2.11.tgz”,下载完解压就行了。
我把解压好的文件放到 D 盘下,具体路径为:“D:\flink-1.13.1”,bin 目录是“D:\flink-1.13.1\bin”。
三、配置
为了方便访问,可以加下环境变量,不加也可以,每次切换路径到安装路径下执行文件也行。
3.1 如何添加环境变量?
右键此电脑,选择属性>弹窗中单击高级系统设置>弹窗中单击环境变量。
然后在用户变量新建一个“FLINK_HOME”变量,赋值你的安装路径。
最后在 path 中添加两个变量值:
%FLINK_HOME%\bin
%FLINK_HOME%
%FLINK_CONF_DIR%
3.2 新建 bat 文件
添加了环境变了之后,可以直接在命令行输入flink -v
查看版本,不过有一个前提,必须先配置下“flink.bat”文件,在“bin”目录下只有“flink.sh”文件,该文件适用于 Linux 系统,在 Windows 系统执行会报错。【具体查看问题:找不到主类。】
早期版本有“bat”文件,但是后面的版本都没有了,不过直接通过网络搜素,可以获取到相关的文件(毕竟Windows用户还是多~)。
通过网友提供两份文件,如下:
- start-cluster.bat:启动 flink 集群
- flink.bat:执行 flink 文件
::###############################################################################
:: Licensed to the Apache Software Foundation (ASF) under one
:: or more contributor license agreements. See the NOTICE file
:: distributed with this work for additional information
:: regarding copyright ownership. The ASF licenses this file
:: to you under the Apache License, Version 2.0 (the
:: "License"); you may not use this file except in compliance
:: with the License. You may obtain a copy of the License at
::
:: http://www.apache.org/licenses/LICENSE-2.0
::
:: Unless required by applicable law or agreed to in writing, software
:: distributed under the License is distributed on an "AS IS" BASIS,
:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
:: See the License for the specific language governing permissions and
:: limitations under the License.
::###############################################################################@echo off
setlocal EnableDelayedExpansionSET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
SET FLINK_CONF_DIR=%FLINK_HOME%\conf
SET FLINK_LOG_DIR=%FLINK_HOME%\logSET JVM_ARGS=-Xms1024m -Xmx1024mSET FLINK_CLASSPATH=%FLINK_LIB_DIR%\*SET logname_jm=flink-%username%-jobmanager.log
SET logname_tm=flink-%username%-taskmanager.log
SET log_jm=%FLINK_LOG_DIR%\%logname_jm%
SET log_tm=%FLINK_LOG_DIR%\%logname_tm%
SET outname_jm=flink-%username%-jobmanager.out
SET outname_tm=flink-%username%-taskmanager.out
SET out_jm=%FLINK_LOG_DIR%\%outname_jm%
SET out_tm=%FLINK_LOG_DIR%\%outname_tm%SET log_setting_jm=-Dlog.file="%log_jm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties"
SET log_setting_tm=-Dlog.file="%log_tm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties":: Log rotation (quick and dirty)
CD "%FLINK_LOG_DIR%"
for /l %%x in (5, -1, 1) do (
SET /A y = %%x+1
RENAME "%logname_jm%.%%x" "%logname_jm%.!y!" 2> nul
RENAME "%logname_tm%.%%x" "%logname_tm%.!y!" 2> nul
RENAME "%outname_jm%.%%x" "%outname_jm%.!y!" 2> nul
RENAME "%outname_tm%.%%x" "%outname_tm%.!y!" 2> nul
)
RENAME "%logname_jm%" "%logname_jm%.0" 2> nul
RENAME "%logname_tm%" "%logname_tm%.0" 2> nul
RENAME "%outname_jm%" "%outname_jm%.0" 2> nul
RENAME "%outname_tm%" "%outname_tm%.0" 2> nul
DEL "%logname_jm%.6" 2> nul
DEL "%logname_tm%.6" 2> nul
DEL "%outname_jm%.6" 2> nul
DEL "%outname_tm%.6" 2> nulfor %%X in (java.exe) do (set FOUND=%%~$PATH:X)
if not defined FOUND (echo java.exe was not found in PATH variablegoto :eof
)echo Starting a local cluster with one JobManager process and one TaskManager process.echo You can terminate the processes via CTRL-C in the spawned shell windows.echo Web interface by default on http://localhost:8081/.start /b java %JVM_ARGS% %log_setting_jm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint --configDir "%FLINK_CONF_DIR%" > "%out_jm%" 2>&1
start /b java %JVM_ARGS% %log_setting_tm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir "%FLINK_CONF_DIR%" > "%out_tm%" 2>&1endlocal
::###############################################################################
:: Licensed to the Apache Software Foundation (ASF) under one
:: or more contributor license agreements. See the NOTICE file
:: distributed with this work for additional information
:: regarding copyright ownership. The ASF licenses this file
:: to you under the Apache License, Version 2.0 (the
:: "License"); you may not use this file except in compliance
:: with the License. You may obtain a copy of the License at
::
:: http://www.apache.org/licenses/LICENSE-2.0
::
:: Unless required by applicable law or agreed to in writing, software
:: distributed under the License is distributed on an "AS IS" BASIS,
:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
:: See the License for the specific language governing permissions and
:: limitations under the License.
::###############################################################################@echo off
setlocalSET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\pluginsSET JVM_ARGS=-Xmx512mSET FLINK_JM_CLASSPATH=%FLINK_LIB_DIR%\*java %JVM_ARGS% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.client.cli.CliFrontend %*endlocal
将这两份文件添加到 Flink 安装目录下的“bin”目录即可,我的配置参考如下:
“start-cluster.bat”关键语法解析,该文件可以自动获取路径信息,不需要设置环境变量也可执行。(当然,配置环境变量是使得可以在任意位置调用上面这两个文件,开启服务和跑 Flink 代码。)set bin=%~dp0
:将当前批处理文件所在目录的完整路径(不包含文件名)赋值给了 bin 环境变量。
- set:这是用于设置或修改环境变量的命令。
- bin=:这里是在定义一个名为 bin 的环境变量,并准备给它赋值。
- %~dp0:这是批处理中的特殊语法,用于获取调用脚本的路径。具体来说:
- %0 表示当前批处理文件的完整路径和文件名。
- ~d 表示从 %0 中提取驱动器盘符(例如:d:)。
- ~p 表示从 %0 中提取路径(例如:\flink-1.13.1\bin\)。
- dp 结合起来使用时,就表示获取 %0 的驱动器盘符和路径,但不包括文件名。
四、测试
4.1 查看 Flink 版本
配置好之后,打开 CMD,输入flink -v
,可以看到显示了 Flink 的版本信息,可以开始跑代码了。
PS:这里有一个报错,说找不到 Log4j 2 的配置文件。【具体查看问题:找不到 Log4j 2】
4.2 启动 Flink 服务
打开 CMD,输入以下命令,启动服务
start-cluster
# 或者
start-cluster.bat
然后输入jps
可以查看 java 进程,出现“TaskManagerRunner”和“StandaloneSessionClusterEntrypoint”则是开启 Flink 服务成功,可以打开“http://localhost:8081/”查看 UI 界面。
注意点1:**可能“TaskManagerRunner”会无法开启,参考【问题:TaskManagerRunner 无法启动】。**如果输入start-cluster
之后,马上输入jps
可以看到“TaskManagerRunner”,但是过一会再输入jps
就会发现没了,这就是启动不成功。
注意点2:如果打开的是 PowerShell 终端,可能会无法执行,可参考【问题:无法在 PowerShell 执行 bat 脚本】。
4.3 执行 Flink 代码
安装包中自带了一些测试 jar 包,以WordCount.jar
为例来测试下。
直接执行以下命令即可,官方有默认给了一个