Flink Python作业快速入门

Flink Python快速入门_实时计算 Flink版(Flink)-阿里云帮助中心


import argparse
# 用于处理命令行参数和选项,使程序能够接收用户通过命令行传递的参数
import logging
import sysfrom pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,RollingPolicy)# WatermarkStrategy: 用于生成水印(watermarks),水印是用于处理事件时间(event time)的数据流中的延迟数据的一种机制。
# Encoder: 用于定义如何将数据编码为字节序列,通常用于数据的序列化和反序列化。
# Types: 包含了 Flink 中各种数据类型的定义,用于指定数据流中数据的类型。
# StreamExecutionEnvironment: 是所有 Flink 流处理程序的入口点,用于配置和启动流处理任务。
# RuntimeExecutionMode: 定义了流处理任务的执行模式,例如批处理模式或流处理模式。
# FileSource: 用于从文件系统中读取数据源。
# StreamFormat: 定义了数据的格式,例如 CSV、JSON 等。
# FileSink: 用于将数据写入文件系统。
# OutputFileConfig: 配置输出文件的相关设置,如前缀和后缀。
# RollingPolicy: 定义了文件滚动策略,即何时创建新的输出文件。word_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):"""计算文本文件中单词的频率,并将结果输出到指定路径。该函数从指定的输入路径读取文本数据,进行单词频率统计,并将结果写入指定的输出路径。如果没有提供输入路径或输出路径,则使用默认数据或直接打印结果。参数:- input_path: 输入文本文件的路径。如果为None,则使用默认数据。- output_path: 输出结果的路径。如果为None,则直接打印结果。"""# 获取流处理环境并设置为流处理模式,设置并行度为1env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)env.set_parallelism(1)# 定义数据源if input_path is not None:# 从文件系统中读取数据ds = env.from_source(source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),input_path).process_static_file_set().build(),watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name="file_source")else:# 使用默认数据ds = env.from_collection(word_count_data)# 定义分割函数,将每行文本分割成单词def split(line):yield from line.split()# 计算单词频率ds = ds.flat_map(split) \.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \.key_by(lambda i: i[0]) \.reduce(lambda i, j: (i[0], i[1] + j[1]))# 定义数据汇if output_path is not None:# 将结果写入文件系统ds.sink_to(sink=FileSink.for_row_format(base_path=output_path,encoder=Encoder.simple_string_encoder()).with_output_file_config(OutputFileConfig.builder().with_part_prefix("prefix").with_part_suffix(".ext").build()).with_rolling_policy(RollingPolicy.default_rolling_policy()).build())else:# 直接打印结果ds.print()# 提交作业以执行env.execute()if __name__ == '__main__':# 配置日志输出到标准输出,设置日志级别为INFO,并格式化日志消息以仅显示消息内容logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")# 创建一个ArgumentParser对象以处理命令行参数parser = argparse.ArgumentParser()# 添加可选的命令行参数,用于指定输入和输出文件parser.add_argument('--input',dest='input',required=False,help='要处理的输入文件。')parser.add_argument('--output',dest='output',required=False,help='要写入结果的输出文件。')# 获取命令行参数,排除脚本名称argv = sys.argv[1:]print("Command line arguments: ", argv)# 解析已知的命令行参数,并忽略未知参数known_args, _ = parser.parse_known_args(argv)print("known_args: ", known_args)# 调用word_count函数,传入从解析参数中获取的输入和输出文件路径word_count(known_args.input, known_args.output)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/63495.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

三菱CNC数采超详细,资料全备教程,后续更新发那科数采教程

三菱数采详细教程 文章目录 三菱数采详细教程一、介绍1.背景2.需要掌握知识3.需要资料①三菱SDK包:A2②三菱com接口文档③C#代码:④VStudio⑤资料存放网盘 二、程序运行1.调试设备①条件②命令 2.运行软件①打开软件②运行程序 三、数据采集1.代码了解2.…

常见限流算法详细解析

常见限流算法详细解析 分布式系统中,由于接口API无法控制上游调用方的行为,因此当瞬时请求量突增时,会导致服务器占用过多资源,发生响应速度降低、超时、乃至宕机,甚至引发雪崩造成整个系统不可用。 限流,…

java+ssm+mysql高校学籍管理系统

项目介绍: 使用javassmmysql开发的高校学籍管理系统,系统包含超级管理员,系统管理员、教师、学生角色,功能如下: 超级管理员:管理员管理(可以新增管理员);专业管理&…

(5)JS-Clipper2之PolyNode

1. 描述 PolyNodes是被封装在PolyTree的容器中,同时提供了一个数据结构来代表由Excute()方法返回的多边形轮廓中的父子关系。 一个PolyNode对象代表一个多边形;它的“IsHole”属性表明它是一个“外轮廓”还是一个“内孔”,PolyNodes可能包含…

Java项目实战II基于微信小程序的无中介租房系统(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、核心代码 五、源码获取 全栈码农以及毕业设计实战开发,CSDN平台Java领域新星创作者,专注于大学生项目实战开发、讲解和毕业答疑辅导。 一、前言 随着城市化进程的加速,租房市场日益繁荣&a…

MATLAB稀疏感知图像和体数据恢复的系统对象研究

稀疏感知图像和体数据恢复是一种用于恢复损坏、噪声或不完整的图像和体数据的技术。它利用了信号的稀疏性,即信号在某种基础下可以用较少的非零系数表示,从而实现高质量的恢复。 在进行稀疏感知图像和体数据恢复的研究时,需要定义一些系统对…

安卓调试环境搭建

前言 前段时间电脑重装了系统,最近准备调试一个apk,没想到装环境的过程并不顺利,很让人火大,于是记录一下。 反编译工具下载 下载apktool.bat和apktool.jar 官网地址:https://ibotpeaches.github.io/Apktool/install…

【工具】音频文件格式转换工具

找开源资源、下载测试不同库的效果,然后找音频、下载音频、编写代码、测试转换、流程通畅。写一个工具花的时间越来越多了!这个 5 天 这个工具是一个音频文件格式转换工具,支持对 mp3.aac.wav.caf.flac.ircam.mp2.mpeg.oga.opus.pcm.ra.spx.…

在ARM Linux应用层下使用SPI驱动WS2812

文章目录 1、前言2、结果展示3、接线4、SPI驱动WS2812原理4.1、0码要发送的字节4.2、1码要发送的字节4.3、SPI时钟频率 5、点亮RGB5.1、亮绿灯5.2、亮红灯5.3、亮蓝灯5.4、完整程序 6、RGB呼吸灯7、总结 1、前言 事情是这样的,前段时间,写了一个基于RK3…

BERT:用于语言理解的深度双向 Transformer 的预训练。

文章目录 0. 摘要1. 介绍2. 相关工作2.1 无监督的基于特征的方法2.3 无监督微调方法2.3 从受监督数据中迁移学习 3. BERT3.1 预训练 BERT3.2 微调 BERT 4. 实验4.1 GLUE4.2 SQuAD v1.14.3 SQuAD v2.04.4 SWAG 5. 消融研究5.1 预训练任务的影响5.2 模型大小的影响5.3 使用 BERT …

在算网云平台云端在线部署stable diffusion (0基础小白超详细教程)

Stable Diffusion无疑是AIGC领域中的AI绘画利器,具有以下显著优势: 1、开源性质,支持本地部署 2、能够实现对图像生成过程的精确控制 虽然SD在使用上有很多的有点,但缺点也是不言而喻的,由于AI绘画的整个过程以及现…

设计模式——Chain(责任链)设计模式

摘要 责任链设计模式是一种行为设计模式,通过链式调用将请求逐一传递给一系列处理器,直到某个处理器处理了请求或所有处理器都未能处理。它解耦了请求的发送者和接收者,允许动态地将请求处理职责分配给多个对象,支持请求的灵活传…

macOS 15.1.1 (24B2091) 系统中快捷键符号及其代表的按键的对照表

以下是 macOS 15.1.1 (24B2091) 系统中快捷键符号及其代表的按键的对照表: 符号按键名称描述⌘Command (Cmd)常用的功能键,用于执行大多数快捷操作。⌥Option (Alt)Option 键,常用于辅助操作和特殊字符输入。⇧ShiftShift 键,常用…

el-table一键选择全部行,切换分页后无法勾选

el-table一键全选,分页的完美支持 问题背景尝试解决存在问题问题分析 解决方案改进思路如下具体代码实现如下 问题背景 现在有个需求,一个表格有若干条数据(假设数量大于20,每页10条,保证有2个以上分页即可)。 现在需要在表格上方…

【55 Pandas+Pyecharts | 实习僧网Python岗位招聘数据分析可视化】

文章目录 🏳️‍🌈 1. 导入模块🏳️‍🌈 2. Pandas数据处理2.1 读取数据2.2 查看数据信息2.3 去除重复数据2.4 调整部分城市名称 🏳️‍🌈 3. Pyecharts数据可视化3.1 招聘数量前20岗位3.2 各城市招聘数量3…

【赵渝强老师】PostgreSQL的控制文件

PostgreSQL数据库的物理存储结构主要是指硬盘上存储的文件,包括:数据文件、日志文件、参数文件、控制文件、WAL预写日志文件等等。 下面重点讨论一下PostgreSQL的控制文件。 视频讲解如下 【赵渝强老师】PostgreSQL的控制文件 控制文件记录了数据库运行…

在做题中学习(79):最小K个数

解法:快速选择算法 说明:堆排序也是经典解决问题的算法,但时间复杂度为:O(NlogK),K为k个元素 而将要介绍的快速选择算法的时间复杂度为: O(N) 先看我的前两篇文章,分别学习:数组分三块&#…

连续大涨,汉王科技跑步进入AI应用舒适区

OpenAI正在进行的“12天12场直播”让行业再次沸腾,二级市场也在寻找AI应用的机会。这刺激了12月首周同花顺sora概念涨超11%,远超同期大盘指数涨幅。 截至目前,“满血版”推理模型o1和月收费高达200美元的ChatGPT Pro订阅服务&…

[MySQL基础](三)SQL--图形化界面+DML

本专栏内容为:MySQL学习专栏 💓博主csdn个人主页:小小unicorn ⏩专栏分类:MySql 🚚代码仓库:小小unicorn的代码仓库🚚 🌹🌹🌹关注我带你学习编程知识 目录 图…

基于单片机的智能灯光控制系统

摘要 现在的大部分的大学,都是采用了一种“绿色”的教学方式,再加上现在的大学生缺乏环保意识,所以在学校里很多的教室,在白天的时候灯都会打开,这是一种极大的浪费,而且随时都有可能看到,这是…