pyflink 环境测试以及测试案例

1. py 的 环境以来采用Anaconda环境包

安装版本:https://www.anaconda.com/distribution/#download-section
Python3.8.8版本:Anaconda3-2021.05-Linux-x86_64.sh
下载地址
https://repo.anaconda.com/archive/

2. 安装

bash Anaconda3-2021.05-Linux-x86_64.sh

2.1 如图

在这里插入图片描述

在这里插入图片描述

3. 配置配置anaconda的环境变量:

vim /etc/profile
##增加如下配置
export ANACONDA_HOME=/root/anaconda3/bin
export PATH=$PATH:$ANACONDA_HOME/bin
重新加载环境变量: source /etc/profile

4. 修改bashrc文件

sudo vim ~/.bashrc
添加如下内容:
export PATH=~/anaconda3/bin:$PATH

说明:

profile
其实看名字就能了解大概了, profile 是某个用户唯一的用来设置环境变量的地方, 因为用户可以有多个 shell 比如 bash, sh, zsh 之类的, 但像环境变量这种其实只需要在统一的一个地方初始化就可以了, 而这就是 profile.
bashrc
bashrc 也是看名字就知道, 是专门用来给 bash 做初始化的比如用来初始化 bash 的设置, bash 的代码补全, bash 的别名, bash 的颜色. 以此类推也就还会有 shrc, zshrc 这样的文件存在了, 只是 bash 太常用了而已.

5. 启动anaconda并测试

注意: 请将当前连接node1的节点窗口关闭,然后重新打开,否则无法识别
如图:
在这里插入图片描述

如果没有可以重启服务器。

如果大家发现命令行最前面出现了 (base) 信息, 可以通过以下方式, 退出Base环境

vim ~/.bashrc
拉到文件的最后面: 输入 i 进入插入模式
将以下内容添加:
conda deactivate

6. Anaconda相关组件命令

地址:https://www.continuum.io/downloads

安装包:pip install xxx,conda install xxx
卸载包:pip uninstall xxx,conda uninstall xxx
升级包:pip install upgrade xxx,conda update xxx

6.1 功能:

Anaconda自带,无需单独安装
实时查看运行过程
基本的web编辑器(本地)
ipynb 文件分享
可交互式
记录历史运行结果
修改jupyter显示的文件路径:
通过jupyter notebook --generate-config命令创建配置文件,之后在进入用户文件夹下面查看.jupyter隐藏文件夹,修改其中文件jupyter_notebook_config.py的202行为计算机本地存在的路径。

IPython:

命令:ipython,其功能如下
1.Anaconda自带,无需单独安装
2.Python的交互式命令行 Shell
3.可交互式
4.记录历史运行结果
5.及时验证想法

7. Anaconda中的conda命令做详细介绍和配置。

** 7.1. conda命令及pip命令**

conda install  包名    pip install 包名
conda uninstall 包名   pip uninstall 包名
conda install -U 包名   pip install -U 包名

7.2 Anaconda设置为国内下载镜像

conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --set show_channel_urls yes

7.3 conda创建虚拟环境

conda env list
conda create py_env python=3.8.8 #创建python3.8.8环境#现在使用以下命令激活新创建的环境:
source activate py_env   
# 或者
conda activate py_env   
deactivate py_env #退出环境

----------------------------------------------- Pyflink 环境安装-------------------------------------

8. pyflink 环境安装

激活虚拟环境

source ~/Documents/install/miniconda/bin/activate

创建 pyflink 虚拟环境

conda create --name py310_pyflink171_venv -y -q python=3.10.8
conda activate py310_pyflink171_venv
pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple
pip install apache-flink==1.17.1 --no-cache-dir  -i https://mirrors.aliyun.com/pypi/simple --use-pep517

或者 flink 14.5

# 创建 pyflink 虚拟环境
conda create --name py314_pyflink171_venv -y -q python=3.8.8
conda activate py314_pyflink171_venv
pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple
pip install apache-flink==1.14.5 --no-cache-dir  -i https://mirrors.aliyun.com/pypi/simple --use-pep517

如果不想进入虚拟环境 在执行 就直接 在 服务器最外面执行:

pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple
pip install apache-flink==1.14.5 --no-cache-dir  -i https://mirrors.aliyun.com/pypi/simple --use-pep517

9. 官网测试例子

地址:https://nightlies.apache.org/flink/flink-docs-release-1.16/api/python/examples/table/word_count.html

vi word_count2.py

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import argparse
import logging
import sysfrom pyflink.table import TableEnvironment, EnvironmentSettings, TableDescriptor, Schema,\DataTypes, FormatDescriptor
from pyflink.table.expressions import col, lit
from pyflink.table.udf import udfwords = ["flink", "window", "timer", "event_time", "processing_time", "state","connector", "pyflink", "checkpoint", "watermark", "sideoutput", "sql","datastream", "broadcast", "asyncio", "catalog", "batch", "streaming"]max_word_id = len(words) - 1def streaming_word_count(output_path):t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())# define the source# randomly select 5 words per second from a predefined listt_env.create_temporary_table('source',TableDescriptor.for_connector('datagen').schema(Schema.new_builder().column('word_id', DataTypes.INT()).build()).option('fields.word_id.kind', 'random').option('fields.word_id.min', '0').option('fields.word_id.max', str(max_word_id)).option('rows-per-second', '5').build())tab = t_env.from_path('source')# define the sinkif output_path is not None:t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())else:print("Printing result to stdout. Use --output to specify output path.")t_env.create_temporary_table('sink',TableDescriptor.for_connector('print').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).build())@udf(result_type=DataTypes.STRING())def id_to_word(word_id):return words[word_id]# compute word counttab.select(id_to_word(col('word_id'))).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()# remove .wait if submitting to a remote cluster, refer to# https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster# for more detailsif __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--output',dest='output',required=False,help='Output file to write results to.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)streaming_word_count(known_args.output)

10.执行命令

 python word_count2.py

11. 执行结果

在这里插入图片描述

12 实例2

12.1 安装mysql docker 安装
https://blog.csdn.net/wudonglianga/article/details/133927305
12.2 创建表语句

CREATE TABLE `bigdatauser` (`id` int(32) NOT NULL AUTO_INCREMENT,`name` varchar(64) DEFAULT NULL,`age` int(32) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4

12.3 python 脚本

from pyflink.table import EnvironmentSettings, TableEnvironmentprint('step 01')
# 1. create a TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
print('step 02')table_env.get_config().get_configuration().set_string("pipeline.jars", "file:/opt/flink/lib/flink-connector-jdbc_2.12-1.14.5.jar;file:/opt/flink/lib/mysql-connector-java-5.1.49.jar")
print('step 03')# 2. create source Table=
table_env.execute_sql("""CREATE TABLE products (id INT,name STRING,age INT,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'jdbc',  'url' = 'jdbc:mysql://192.168.43.185:3306/wudl','username'='root','password'='123456','table-name' = 'bigdatauser')
""")# 3. create sink Table
table_env.execute_sql("""CREATE TABLE dproducts (id int,name STRING,age int) WITH ('connector' = 'print')
""")print('step 04')
table_env.execute_sql("INSERT INTO dproducts SELECT  p.id, p.name, p.age FROM products AS p").wait()
print('step 06')

12.4 执行结果
在这里插入图片描述

13. 任务提交服务器运行

打包运行环境

# 找到 minconda(安装路径 envs目录下) 或者对应虚拟环境安装目录
# 打包 py310_pyflink171_venv 虚拟环境
cd ~/Documents/install/miniconda/env
zip -r py310_pyflink171_venv.zip py310_pyflink171_venv

** 1.提交至 jobmanager**

./flink run \
--jobmanager localhost:8081 \
-pyarch file:///workplace/py310_pyflink171_venv.zip \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-py /workplace/src/word_count.py

2. 带目录,指定入口模块提交

./flink run \
--jobmanager localhost:8081 \
-pyarch file:///workplace/py310_pyflink171_venv.zip \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyfs /workplace/src \
-pym word_count

3. 提交至 yarn 集群管理
提交运行
3.1.本地 py虚拟环境

./flink run -m yarn-cluster \
-pyarch file:///workplace/py310_pyflink171_venv.zip \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-py word_count.py

3.2. hdfs py虚拟环境

./flink run  -m yarn-cluster \
-pyarch hdfs://dae-ns/py_env/py310_pyflink171_venv.zip \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv \
-py word_count.py

3.3.带目录 src

./bin/flink run-application -t yarn-application \
-Dyarn.application.name=wordcount \
-Dyarn.ship-files=/workplace/src \
-pyarch shipfiles/py310_pyflink171_venv.zip \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyfs src \
-pym word_count

注意:

虚拟环境打包,该虚拟环境创建方式建议使用 conda,或者virtualenv --always-copy 方式创建,这样打的虚拟环境更全
提交虚拟环境地址:py310_pyflink171_venv.zip/py310_pyflink171_venv 注意这个地址是双层

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/115706.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于深度学习网络的蔬菜水果种类识别算法matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1数据集准备 4.2构建深度学习模型 4.3模型训练 4.4模型评估 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 clc; clear; close all; wa…

java基础面试题

java后端面试题大全 1.java基础1.1 java中和equals的区别1.2 String、StringBuffer、StringBuilder的区别1.3 intern方法的作用及原理1.4 String不可变的含义1.5 static用法、使用位置、实例1.6 为什么静态方法不能调用非静态方法和变量1.7 异常/Exception1.7 try/catch/finall…

【CNN-LSTM预测】基于卷积神经网络-长短期记忆网络的数据分类预测研究(Matlab代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

头脑风暴之约瑟夫环问题

一 问题的引入 约瑟夫问题的源头完全可以命名为“自杀游戏”。本着和谐友爱和追求本质的目的,可以把问题描述如下: 现有n个人围成一桌坐下,编号从1到n,从编号为1的人开始报数。报数也从1开始,报到m人离席&#xff0c…

YOLOv5项目实战(1)— 如何去训练模型

前言:Hello大家好,我是小哥谈。YOLOv5基础知识入门系列、YOLOv5源码中的参数超详细解析系列、YOLOv5入门实践系列、YOLOv5论文作图教程系列和YOLOv5算法改进系列学习完成之后,接着就进入YOLOv5项目实战系列了。🎉为了让大家能够牢固地掌握YOLOv5算法,本系列文章就通过一个…

计算机算法分析与设计(18)---回溯法(介绍、子集和问题C++代码)

文章目录 一、回溯法介绍二、子集和问题2.1 知识概述2.2 代码编写 一、回溯法介绍 1. 回溯法(back tracking)是一种选优搜索法,又称为试探法,有“通用的解题法”之称,按选优条件向前搜索,以达到目标。但当探…

AIGC笔记--基于DDPM实现图片生成

目录 1--扩散模型 2--训练过程 3--损失函数 4--生成过程 5--参考 1--扩散模型 完整代码:ljf69/DDPM 扩散模型包含两个过程,前向扩散过程和反向生成过程。 前向扩散过程对一张图像逐渐添加高斯噪声,直至图像变为随机噪声。 反向生成过程…

C语言求 3*3 矩阵对角线之和

完整代码&#xff1a; // 求 3*3 矩阵对角线之和 #include<stdio.h>int main() {int n3;int arr[3][3];// 输入矩阵printf("请输入矩阵的元素:\n");for (int i 0; i < n; i){for (int j 0; j < n; j){scanf("%d", &arr[i][j]);}}int su…

Python+requests+pytest+excel+allure 接口自动化测试实战

定义&#xff1a; Unittest是Python标准库中自带的单元测试框架&#xff0c;Unittest有时候也被称为PyUnit&#xff0c;就像JUnit是Java语言的标准单元测试框架一样&#xff0c;Unittest则是Python语言的标准单元测试框架。 Pytest是Python的另一个第三方单元测试库。它的目的…

day03_pandas_demo

文章目录 pandas介绍为什么使用pandasDataFrameDataFrame属性DataFrame的索引修改行列的索引值重设索引值以某列设置新索引 MultiIndexSerias索引操作直接索引按名字索引按数值索引 赋值操作排序对内容排序按索引排序 DataFrame的运算算术运算逻辑运算逻辑运算符号 < > |…

代码随想录 Day26 贪心 01 全集 LeetCode455 分发饼干 LeetCodeT346摆动序列 LeetCdoe T53 最大子数组和

前言:贪心无套路 本质: 局部最优去推导全局最优 两个极端 贪心算法的难度一般要么特别简单,要么特别困难,所以我们只能多见识多做题,记住无需数学证明,因为两道贪心基本上毫无关系,我们只需要去思考局部最优即可 贪心的小例子 比如有一堆钞票&#xff0c;你可以拿走十张&#x…

SpringBoot AOP + Redis 延时双删功能实战

一、业务场景 在多线程并发情况下&#xff0c;假设有两个数据库修改请求&#xff0c;为保证数据库与redis的数据一致性&#xff0c;修改请求的实现中需要修改数据库后&#xff0c;级联修改Redis中的数据。 请求一&#xff1a;A修改数据库数据 B修改Redis数据 请求二&#xff…

谷歌真的不喜欢 Node.js ?

有人在 Quora 上提问&#xff0c;为什么谷歌不喜欢 Node.js 呢&#xff0c;Google 的 UX 工程师和来自 Node.js 团队的开发者分别回答了他们对这个问题的看法&#xff0c;对于编程语言来说&#xff0c;每一门语言都有它自己的优势&#xff0c;重要的是如何用它去解决问题。 谷…

驱动开发LED灯绑定设备文件

头文件 #ifndef __HEAD_H__ #define __HEAD_H__typedef struct {unsigned int MODER;unsigned int OTYPER;unsigned int OSPEEDR;unsigned int PUPDR;unsigned int IDR;unsigned int ODR; }gpio_t;#define PHY_LED1_ADDR 0x50006000 #define PHY_LED2_ADDR 0x50007000 #defin…

【软考-中级】系统集成项目管理工程师-项目收尾管理历年案例

持续更新。。。。。。。。。。。。。。。 目录 2017 下 试题三(17分)背诵整理1. 项目总结会议一般讨论的内容2. 系统文档验收所涉及的文档都有哪些 系列文章 2017 下 试题三(17分) 阅读下列说明&#xff0c;回答问题 1至问题 4&#xff0c;将解答填入答题纸的对应栏内     …

大二第三周总结(算法+生活)

算法&#xff1a; 题目&#xff1a;有效的括号 这个题目也是做过很多回了。主要就是数据结构中”栈“的应用&#xff0c;先进后出。 解题思路&#xff1a; 1.创建 Map 哈希表形成键值对映射 2.进行遍历字符串 在遍历过程中 如果 遍历到的字符c 是左括号&#xff0c;则入栈 pu…

基于PHP的宠物爱好者交流平台管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09; 代码参考数据库参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…

Mysql第三篇---响应太慢?数据库卡顿?如何优化?

Mysql第三篇—响应太慢&#xff1f;数据库卡顿&#xff1f;如何优化&#xff1f; 统计SQL的查询成本&#xff1a;last_query_cost 一条SQL查询语句在执行前需要确定查询执行计划&#xff0c;如果存在多种执行计划的话&#xff0c;MySQL会计算每个执行计划所需要的成本&#x…

CDN+Nginx反向代理来隐藏c2地址

思路&#xff1a;通过借助CDN和Nginx反向代理和HTTPS来隐藏真实c2服务器Nginx反向代理&#xff1a;通过Nginx对外部流量转发到本地&#xff0c;再设置防火墙只允许localhost访问cs端口达到IP白名单的效果 准备 在这个实验环境中&#xff0c;我们需要准备服务器两台(一台服务端…

day40

今日内容概要 针对记录的SQL语句 配置文件的介绍 存储引擎的使用(存储的方式) 数据类型(重点) 整型 浮点型 字符串 日期 枚举 针对记录的sql语句 针对库的增删改查&#xff08;文件夹&#xff09; 1.创建库 create databases db1; # 设置库的默认编码 create databa…