Flink流批一体计算(16):PyFlink DataStream API

目录

概述

Pipeline Dataflow

代码示例WorldCount.py

执行脚本WorldCount.py


概述

Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。

用户实现的Flink程序是由Stream和Transformation这两个基本构建块组成。

Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。

当一个Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。

 FlinkKafkaConsumer是一个Source OperatorMapKeyByTimeWindowApplyTransformation OperatorRollingSink是一个Sink Operator

Pipeline Dataflow

Flink中,程序是并行和分布式的方式运行。一个Stream可以被分成多个Stream分区(Stream Partitions),一个Operator可以被分成多个Operator Subtask

Flink内部有一个优化的功能,根据上下游算子的紧密程度来进行优化。

紧密度低的算子则不能进行优化,而是将每一个Operator Subtask放在不同的线程中独立执行。一个Operator的并行度,等于Operator Subtask的个数,一个Stream的并行度(分区总数)等于生成它的Operator的并行度。

 紧密度高的算子可以进行优化,优化后可以将多个Operator Subtask串起来组成一个Operator Chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行。

图中上半部分表示的是将Source和map两个紧密度高的算子优化后串成一个Operator Chain,实际上一个Operator Chain就是一个大的Operator的概念。图中的Operator Chain表示一个Operator,keyBy表示一个Operator,Sink表示一个Operator,它们通过Stream连接,而每个Operator在运行时对应一个Task,也就是说图中的上半部分有3个Operator对应的是3个Task。

图中下半部分是上半部分的一个并行版本,对每一个Task都并行化为多个Subtask,这里只是演示了2个并行度,Sink算子是1个并行度。

代码示例WorldCount.py

在本章中,你将学习如何使用 PyFlink 和 DataStream API 构建一个简单的流式应用程序。

编写一个简单的 Python DataStream 作业。

该程序读取一个 csv 文件,计算词频,并将结果写到一个结果文件中。

import argparse
import logging
import sys
from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,RollingPolicy)word_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)# define the sourceif input_path is not None:ds = env.from_source(source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),input_path).process_static_file_set().build(),watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name="file_source")else:print("Executing word_count example with default input data set.")print("Use --input to specify file input.")ds = env.from_collection(word_count_data)def split(line):yield from line.split()# compute word countds = ds.flat_map(split) \.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \.key_by(lambda i: i[0]) \.reduce(lambda i, j: (i[0], i[1] + j[1]))# define the sinkif output_path is not None:ds.sink_to(sink=FileSink.for_row_format(base_path=output_path,encoder=Encoder.simple_string_encoder()).with_output_file_config(OutputFileConfig.builder().with_part_prefix("prefix").with_part_suffix(".ext").build()).with_rolling_policy(RollingPolicy.default_rolling_policy()).build())else:print("Printing result to stdout. Use --output to specify output path.")ds.print()# submit for executionenv.execute()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')parser.add_argument('--output',dest='output',required=False,help='Output file to write results to.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input, known_args.output)

执行脚本WorldCount.py

python word_count.py

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/58477.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu安装RabbitMQ

一、安装 更新系统软件包列表: sudo apt update安装RabbitMQ的依赖组件和GPG密钥: sudo apt install -y curl gnupg curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo gpg --dearmo…

暴力递归转动态规划(二)

上一篇已经简单的介绍了暴力递归如何转动态规划,如果在暴力递归的过程中发现子过程中有重复解的情况,则证明这个暴力递归可以转化成动态规划。 这篇帖子会继续暴力递归转化动态规划的练习,这道题有点难度。 题目 给定一个整型数组arr[]&…

用心维护好电脑,提高学习工作效率

文章目录 一、我的电脑1.1 如何查看自己的电脑硬件信息呢? 二、电脑标准保养步骤和建议2.1 保持清洁2.2 定期升级系统和软件2.3 安全防护2.4 清理磁盘空间2.5 备份重要数据2.6 优化启动项2.7 散热管理2.8 硬件维护2.9 电源管理2.10 注意下载和安装2.11 定期维护 三、…

C++语法基础

这里写目录标题 基础语法第一个程序变量常量的定义关键字标识符命名 (变量命名)sizeof的使用实型(浮点型)字符型转义字符字符串的定义 一级目录二级目录二级目录二级目录 一级目录二级目录二级目录二级目录 基础语法 第一个程序 …

用 PHP 和 JavaScript 显示地球卫星照片

向日葵 8 号气象卫星是日本宇宙航空研究开发机构设计制造的向日葵系列卫星之一,重约 3500 公斤,设计寿命 15 年以上。该卫星于 2014 年 10 月 7 日由 H2A 火箭搭载发射成功,主要用于监测暴雨云团、台风动向以及持续喷发活动的火山等防灾领域。…

hadoop 学习:mapreduce 入门案例一:WordCount 统计一个文本中单词的个数

一 需求 这个案例的需求很简单 现在这里有一个文本wordcount.txt,内容如下 现要求你使用 mapreduce 框架统计每个单词的出现个数 这样一个案例虽然简单但可以让新学习大数据的同学熟悉 mapreduce 框架 二 准备工作 (1)创建一个 maven 工…

Win11共享文件,能发现主机但无法访问,提示找不到网络路径

加密长度选择如下: 参考以下链接: Redirectinghttps://answers.microsoft.com/zh-hans/windows/forum/all/win11%E8%AE%BE%E7%BD%AE%E6%96%87%E4%BB%B6%E5%A4%B9/554343a9-d963-449a-aa59-ce1e6f7c8982?tabAllReplies#tabs

STM32驱动SD卡(SPI)方式

外观 代码(免费分享) 接线 5V供电 CS接PA3 剩下如图按照硬件SPI1接线 注意事项 使用杜邦线接线非常不稳定!!! 使用杜邦线接线非常不稳定!!! 使用杜邦线接线非常不稳定!!&#…

图的存储:十字链表,邻接多重表

1.十字链表存储有向图 1.存储方式 分为顶点结点和弧结点两种结构体 顶点结点使用数组顺序存储,结构体包括:数据域,作为顶点弧头的第一条弧,作为顶点弧尾的第一条弧。 弧结点,结构体包括:弧头相同的下一…

机械臂+2d相机实现复合机器人定位抓取

硬件参数 机械臂:艾利特 相机:海康相机 2d识别库:lindmod,github可以搜到 光源:磐鑫光源 软件参数 系统:windows / Linux 开发平台:Qt 开发语言:C 开发视觉库:OpenCV …

nlp系列(7)三元组识别(Bert+CRF)pytorch

模型介绍 在实体识别中:使用了Bert模型,CRF模型 在关系识别中:使用了Bert模型的输出与实体掩码,进行一系列变化,得到关系 Bert模型介绍可以查看这篇文章:nlp系列(2)文本分类&…

gitlab-runner安装和部署项目

目录 1.安装gitlab-runner 1.1 添加官方仓库 1.2.1 安装最新版本 1.2.2 安装指定版本(可选) 1.2.3 更新runner(可选) 1.3 随便点开gitlab上的一个项目 1.4 gitlab-runner的注册 2.配置gitlab-runner 3.runner一些命令 gi…

通达OAV12版本,表单及流程,定制开发总结

通达OA-V12版本,表单及流程,定制开发总结 触发器金蝶系统对接 日期:2023年8月29日 触发器 一键转交操作,不会调用触发器。 解决办法:可以按需要按步骤,关闭一键转交按钮。这里会隐藏一键转交、一键结束按钮…

立创EDA专业版的原理图上器件有一个虚线框

立创EDA专业版的原理图上器件有一个虚线框解决方法 问题分析: 在使用立创EDA专业版 设计电路原理图时,中途莫名其妙就给我的元件添加了下面图片所示的虚线外框。看着就很别扭的样子,而且工程大了和器件稍微布局比较密的时候就导致整体很难看…

图像分类学习笔记(六)——ResNeXt

一、要点 ResNeXt是ResNet的小幅升级,更新了block 左边(ResNet的block/50/101/152层): 对于输入通道为256的特征矩阵,首先使用64个11的卷积核进行降维,再通过64个33的卷积核处理,再通过256个1…

项目进度与实施计划汇报实践样例模板

一、IT项目实施步骤 项目启动 项目启动 项目启动 项 项目启动 | 需求调研 | 解决方案设计与系统实现 | UAT测试与培训 | 上线与运维支持

nlp大模型课程笔记

自然语言处理基础和应用 👆说明之前的大模型其实有很多都是基于迁移学习的方法。 attention机制的总结,解决了信息瓶颈的问题。 处理词组时BPE的过程 👆pos表示的是token所在的位置 👆技巧是layer normalization。

Nexus2迁移升级到Nexus3

与 Nexus 2.x 相比,Nexus 3.x 为我们提供了更多实用的新特性。SonaType 官方建议我们,使用最新版本 Nexus 2.x 升级到最新版本 Nexus 3.x,并在 Nexus 升级兼容性 一文中为我们提供了各个版本 Nexus 升级到最新版本 Nexus 3.x 的流程&#xff…

Cloudpods 私有云平台有哪些优势?

作为一套完整的私有云管理软件,我们经常会被问到 Cloudpods 和其他的同类产品相比,有哪些优势?我总结了 2 个方面,供大家参考。 功能方面 产品化,开箱即用,易用性较高,基本上都可以傻瓜式的操…