PyFlink使用教程,Flink,Python,Java

环境准备

环境要求

Java 11
Python 3.7, 3.8, 3.9 or 3.10

文档:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/installation/

打开 Anaconda3 Prompt

> java -version
java version "11.0.22" 2024-01-16 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.22+9-LTS-219)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.22+9-LTS-219, mixed mode)> python --version
Python 3.8.8> conda env list
# conda environments:
#
base                  *  d:\ProgramData\Anaconda3
tensorflow2.4            d:\ProgramData\Anaconda3\envs\tensorflow2.4> conda create -n PyFlink-1.17.1 python==3.8.8> conda activate PyFlink-1.17.1> python -m pip install apache-flink==1.17.1> conda list
# packages in environment at d:\ProgramData\Anaconda3\envs\PyFlink-1.17.1:
#
# Name                    Version                   Build  Channel
apache-beam               2.43.0                   pypi_0    pypi
apache-flink              1.17.1                   pypi_0    pypi
apache-flink-libraries    1.17.1                   pypi_0    pypi
avro-python3              1.9.2.1                  pypi_0    pypi
ca-certificates           2023.12.12           haa95532_0    defaults
certifi                   2023.11.17               pypi_0    pypi
charset-normalizer        3.3.2                    pypi_0    pypi
cloudpickle               2.2.0                    pypi_0    pypi
crcmod                    1.7                      pypi_0    pypi
dill                      0.3.1.1                  pypi_0    pypi
docopt                    0.6.2                    pypi_0    pypi
fastavro                  1.4.7                    pypi_0    pypi
fasteners                 0.19                     pypi_0    pypi
grpcio                    1.60.0                   pypi_0    pypi
hdfs                      2.7.3                    pypi_0    pypi
httplib2                  0.20.4                   pypi_0    pypi
idna                      3.6                      pypi_0    pypi
numpy                     1.21.6                   pypi_0    pypi
objsize                   0.5.2                    pypi_0    pypi
openssl                   1.1.1w               h2bbff1b_0    defaults
orjson                    3.9.12                   pypi_0    pypi
pandas                    1.3.5                    pypi_0    pypi
pip                       23.3.1           py38haa95532_0    defaults
proto-plus                1.23.0                   pypi_0    pypi
protobuf                  3.20.3                   pypi_0    pypi
py4j                      0.10.9.7                 pypi_0    pypi
pyarrow                   8.0.0                    pypi_0    pypi
pydot                     1.4.2                    pypi_0    pypi
pymongo                   3.13.0                   pypi_0    pypi
pyparsing                 3.1.1                    pypi_0    pypi
python                    3.8.0                hff0d562_2    defaults
python-dateutil           2.8.2                    pypi_0    pypi
pytz                      2023.3.post1             pypi_0    pypi
regex                     2023.12.25               pypi_0    pypi
requests                  2.31.0                   pypi_0    pypi
setuptools                68.2.2           py38haa95532_0    defaults
six                       1.16.0                   pypi_0    pypi
sqlite                    3.41.2               h2bbff1b_0    defaults
typing-extensions         4.9.0                    pypi_0    pypi
urllib3                   2.1.0                    pypi_0    pypi
vc                        14.2                 h21ff451_1    defaults
vs2015_runtime            14.27.29016          h5e58377_2    defaults
wheel                     0.41.2           py38haa95532_0    defaults
zstandard                 0.22.0                   pypi_0    pypi

下载的包存储在Anaconda3\envs\PyFlink-1.17.1\Lib\site-packages

PyFlink 案例

从Flink 1.11版本开始, PyFlink 作业支持在 Windows 系统上运行,因此您也可以在 Windows 上开发和调试 PyFlink 作业了。

打开 VSCode 切换到 PyFlink-1.17.1 环境,按照 教程 写一个 Table API 的示例

learn_pyflink/tableAPIJob.py

import argparse
import logging
import sysfrom pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtfword_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())# write all the data to one filet_env.get_config().set("parallelism.default", "1")# define the sourceif input_path is not None:t_env.create_temporary_table('source',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).build()).option('path', input_path).format('csv').build())tab = t_env.from_path('source')else:print("Executing word_count example with default input data set.")print("Use --input to specify file input.")tab = t_env.from_elements(map(lambda i: (i,), word_count_data),DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))# define the sinkif output_path is not None:t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())else:print("Printing result to stdout. Use --output to specify output path.")t_env.create_temporary_table('sink',TableDescriptor.for_connector('print').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).build())@udtf(result_types=[DataTypes.STRING()])def split(line: Row):for s in line[0].split():yield Row(s)# compute word counttab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()# remove .wait if submitting to a remote cluster, refer to# https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster# for more detailsif __name__ == '__main__':logging.basicConfig(stream=sys.stdout,level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')parser.add_argument('--output',dest='output',required=False,help='Output file to write results to.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input, known_args.output)

要在PyFlink-1.17.1环境下运行

> python tableAPIJob.pyUsing Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
Executing word_count example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
+I[To, 1]
+I[be,, 1]
+I[or, 1]
+I[not, 1]
+I[to, 1]
+I[be,--that, 1]
+I[is, 1]
+I[the, 1]
+I[question:--, 1]
+I[Whether, 1]
+I['tis, 1]
+I[nobler, 1]
+I[in, 1]
-U[the, 1]
+U[the, 2]
+I[mind, 1]
-U[to, 1]
+U[to, 2]
.
.
.

提交 PyFlink 作业到 Flink

参考:https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/deployment/cli/#submitting-pyflink-jobs

我的 Flink 是安装在 WSL 上面的,因此也要准备环境。

下载 java-11-linux:https://download.oracle.com/otn/java/jdk/11.0.22%2B9/8662aac2120442c2a89b1ee9c67d7069/jdk-11.0.22_linux-x64_bin.tar.gz

> tar -zxf jdk-11.0.22_linux-x64_bin.tar.gz -C /usr/lib/jdk# 生成 jre
> bin/jlink --module-path jmods --add-modules java.desktop --output jre> vi /etc/profileexport JAVA_HOME=/usr/lib/jdk/jdk-11.0.22
export JRE_HOME=${JAVA_HOME}/jre    
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib    
export PATH=${JAVA_HOME}/bin:$PATH> source /etc/profile > ls -l /usr/bin/python*lrwxrwxrwx 1 root root       9 Mar 26  2019 /usr/bin/python3 -> python3.7
lrwxrwxrwx 1 root root      16 Mar 26  2019 /usr/bin/python3-config -> python3.7-config
-rwxr-xr-x 1 root root    1018 Mar  4  2018 /usr/bin/python3-jsondiff
-rwxr-xr-x 1 root root    3661 Mar  4  2018 /usr/bin/python3-jsonpatch
-rwxr-xr-x 1 root root    1342 May  2  2016 /usr/bin/python3-jsonpointer
-rwxr-xr-x 1 root root     398 Nov 22  2018 /usr/bin/python3-jsonschema
-rwxr-xr-x 2 root root 4877888 Apr  3  2019 /usr/bin/python3.7
lrwxrwxrwx 1 root root      33 Apr  3  2019 /usr/bin/python3.7-config -> x86_64-linux-gnu-python3.7-config
-rwxr-xr-x 2 root root 4877888 Apr  3  2019 /usr/bin/python3.7m
lrwxrwxrwx 1 root root      34 Apr  3  2019 /usr/bin/python3.7m-config -> x86_64-linux-gnu-python3.7m-config
lrwxrwxrwx 1 root root      10 Mar 26  2019 /usr/bin/python3m -> python3.7m
lrwxrwxrwx 1 root root      17 Mar 26  2019 /usr/bin/python3m-config -> python3.7m-config> python --versionCommand 'python' not found, but can be installed with:apt install python3         # version 3.7.3-1, or
apt install python          # version 2.7.16-1
apt install python-minimal  # version 2.7.16-1You also have python3 installed, you can run 'python3' instead.> python3 --version
Python 3.7.3# 已经安装了 python-3.7.3,创建一个软连接即可
> ln -s /usr/bin/python3.7 /usr/local/bin/python> python --version
Python 3.7.3# 设置镜像源,否则会非常慢
> python -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade pip
> pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple# 启动 Flink-1.17.1
> bin/start-cluster.sh

除此之外,在 WSL 上还需要安装有此python脚本依赖的库,也就是 apache-flink 库。因为 Flink 需要调用 python 命令来解析 pytion 脚本,这里面涉及到 python 和 java 之间的通讯。这一块还只是在 Flink 客户端上面(bin/flink run ...),而 Flink 的 TaskManager 在运行此任务的时候还需要调用 python 解释器,因为上面代码中有UDF函数,这个函数在Java中是不存在的,关于 Flink 支持 Python 任务的内部原理后面再写一篇。

> python -m pip install apache-flink==1.17.1
> pip list

然后将代码中的.wait()调用删掉

tab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink')

提交任务

> ./bin/flink run --python /mnt/d/dev/php/magook/trunk/server/learn-python/learn_pyflink/tableAPIJob.pyExecuting word_count example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID a18e581d16785a9872336073efdf5df0

来到 webUI 查看任务

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/656205.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Nginx 1.25配置QUIC和HTTP3

Nginx 1.25配置QUIC和HTTP/3 Nginx在编译时需要配置相应的SSL库,以确保能够支持HTTP3.0和HTTP2.0等基于HTTPS的协议。这些加密算法主要由OpenSSL提供。另外,BoringSSL是谷歌创建的OpenSSL分支,专门用于支持TLS 1.3的UDP协议的0-RTT数据传输加…

2024 年人工智能(AI)会有哪些新趋势和新突破?无际Ai分享

随着科技的不断发展,人工智能领域正在以惊人的速度迈向前所未有的高度。在 2024 年,我们可以期待看到多个领域的重大突破,这将为人工智能技术带来新的应用和可能性。 以下是一些可能出现的新领域和突破性进展: 强化学习应用拓展&…

MySQL前百分之N问题--percent_rank()函数

PERCENT_RANK()函数 PERCENT_RANK()函数用于将每行按照(rank - 1) / (rows - 1)进行计算,用以求MySQL中前百分之N问题。其中,rank为RANK()函数产生的序号,rows为当前窗口的记录总行数 PERCENT_RANK()函数返回介于 0 和 1 之间的小数值 selectstudent_…

MicrosoftEdge浏览器打开网页出现“此网站被人举报不安全”问题时解决办法

1:有时候不知怎么回事用电脑自带的微软浏览器进行搜索会出现以下的问题 这可能是由于我们的浏览器安全审查过于严格引起的 Windows10正式版系统下,使用Edge浏览器浏览网页时候,发现整个页面突然变成了红色,显示“已有人举报此网站…

【高效开发工具系列】markdown转HTML

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

打击者H5小游戏

欢迎来到程序小院 打击者 玩法&#xff1a;点击飞机上下左右移动躲过子弹射击&#xff0c;打掉上方敌人飞机&#xff0c; 遇到药包会增加能量&#xff0c;弹药包会升级武器&#xff0c;快去射击吧^^。开始游戏https://www.ormcc.com/play/gameStart/262 html <div id"…

java8 Duration类学习

Duration类 官网地址 基于时间的时间量&#xff0c;例如“34.5秒”。 此类以秒和纳秒为单位对时间的量或量进行建模。它可以使用其他基于持续时间的单位访问&#xff0c;如分钟和小时。此外&#xff0c;可以使用DAYS单位&#xff0c;并将其视为完全等于24小时&#xff0c;从…

C语言第十三弹---VS使用调试技巧

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】 VS调试技巧 1、什么是bug 2、什么是调试&#xff08;debug&#xff09;&#xff1f; 3、Debug和Release​编辑​ 4、VS调试快捷键 4.1、环境准备 4.2、调试…

计算机毕业设计 | SpringBoot+vue学生成绩管理系统教务管理系统

1&#xff0c;项目背景 随着我国高等教育的发展&#xff0c;数字化校园将成为一种必然的趋势&#xff0c;国内高校迫切需要提高教育工作的质量与效率&#xff0c;学生成绩管理工作是高校信息管理工作的重要组成部分&#xff0c;与国外高校不同&#xff0c;他们一般具有较大规模…

快来建服组队一起捕捉帕鲁

2024年初最火的steam游戏《幻兽帕鲁》&#xff0c;大家都已经玩上了吧&#xff1f; 如何跟朋友组队一起在广阔的世界中捕捉神奇的生物“帕鲁”&#xff0c;快来金山云解锁吧~ 第一步&#xff1a;创建游戏服务器 部署一台幻兽帕鲁云服务器&#xff1a;在控制台上选择离您更近…

在Windows11的WSL上运行Llama2-7b-chat 下

上一篇博客讲了我跑Llama的demo的心路历程&#xff08;上一篇博客传送门&#xff09;&#xff0c;这篇我们主要是讲下怎么配置。 快速开始 使用Linux、Linux、Linux&#xff0c;重要的事情说三遍&#xff0c;如果你和我一样懒得安装双系统&#xff0c;那么在Windows下安装一个…

二百二十一、HiveSQL报错:return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask

一、目的 在运行HiveSQL时&#xff0c;执行报错 tatement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask 二、在yarn上查看任务报错 The required MAP capability is more than the supported max container capability in t…

使用流服务器m7s对接gb28181

优:sip品牌兼容性比较好,大华,海康都稳定可以,srs的5.0 sip品牌兼容性大华没反应,akstream-sip 大华也有问题,wvp也还可以 缺:目前最新的4.7.4版本,,sip协议用udp正常,TCP不正常(估计不支持),移动、事件,预警不支持 一、下载对应的m7s的执行文件 官网:快速起步…

(五)MySQL的备份及恢复

1、MySQL日志管理 在数据库保存数据时&#xff0c;有时候不可避免会出现数据丢失或者被破坏&#xff0c;这样情况下&#xff0c;我们必须保证数据的安全性和完整性&#xff0c;就需要使用日志来查看或者恢复数据了 数据库中数据丢失或被破坏可能原因&#xff1a; 误删除数据…

GitHub 开启 2FA 双重身份验证的方法

为什么要开启 2FA 自2023年3月13日起,我们登录 GitHub 都会看到一个要求 Enable 2FA 的重要提示,具体如下: GitHub users are now required to enable two-factor authentication as an additional security measure. Your activity on GitHub includes you in this requi…

vivado 硬块规划器

硬块规划器 Versal自适应SoC的硬块规划GT组件从通用/通道更新为AMD的GT_QUAD粒度Versal™ 自适应SoC。为了启用某些GT共享用例&#xff0c;对GT向导流进行了修改使用Vivado IP集成商。使用Vivado IP集成商构建使用单个或多个GT_ QUAD。连接到GT_QUAD的自定义IP的设计条目为通过…

认知篇:什么是逆转诅咒?一个提问GPT的错误姿势

本系列文章主要是分享一些关于大模型的一些学术研究或者实验性质的探索&#xff0c;为大家更新一些针对大模型的认知。所有的结论我都会附上对应的参考文献&#xff0c;有理有据&#xff0c;也希望这些内容可以对大家使用大模型的过程有一些启发。 注&#xff1a;本系列研究关注…

养猫家庭如何挑选宠物空气净化器?猫用空气净化器品牌推荐!

家里的猫咪真的太可爱了&#xff0c;但它们的毛发总是无处不在。而且猫砂盆一天不清理&#xff0c;整个屋子都会弥漫着臭味。每天打扫也很费时费力&#xff0c;虽然享受着猫咪带来的快乐&#xff0c;但也不得不面对这些困扰。 一直以来&#xff0c;我都想购买一台空气净化器&a…

宠物处方单子怎么开,宠物门诊处方管理软件教程

宠物处方单子怎么开&#xff0c;宠物门诊处方管理软件教程 一、前言 宠物店电子处方软件操作教程以 佳易王宠物店电子处方管理系统V16.0为例说明。 如图&#xff0c;在开处方的时候&#xff0c;点击导航栏菜单&#xff0c;兽医处方按钮 点击 增加新单&#xff0c;填写宠物及…

Security ❀ HTTP/HTTPS逐包解析交互过程细节

文章目录 1. TCP三次握手机制2. HTTP Request 请求报文3. HTTP Response 响应报文4. SSL/TLS协议4.1. ClientHello 客户端Hello报文4.2 ServerHello 服务器Hello报文4.3. *ServerKeyExchange 服务公钥交换4.4. ClientKeyExchange 客户端公钥交换4.5. *CertificateVerify 证书验…