流批一体计算引擎-10-[Flink]中的常用算子和DataStream转换

pyflink 处理 kafka数据
在这里插入图片描述

1 DataStream API 示例代码

从非空集合中读取数据,并将结果写入本地文件系统。

from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSinkdef tutorial():env = StreamExecutionEnvironment.get_execution_environment()env.set_parallelism(1)ds = env.from_collection(collection=[(1, 'aaa'), (2, 'bbb')],type_info=Types.ROW([Types.INT(), Types.STRING()]))ds.add_sink(StreamingFileSink.for_row_format('output', Encoder.simple_string_encoder()).build())env.execute("tutorial_job")if __name__ == '__main__':tutorial()

(1)DataStream API应用程序首先需要声明一个执行环境
StreamExecutionEnvironment,这是流式程序执行的上下文。
后续将通过它来设置作业的属性(例如默认并发度、重启策略等)、创建源、并最终触发作业的执行。

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

(2)声明数据源
一旦创建了 StreamExecutionEnvironment 之后,可以使用它来声明数据源。
数据源从外部系统(如Apache Kafka、Rabbit MQ 或 Apache Pulsar)拉取数据到Flink作业里。
为了简单起见,本次使用元素集合作为数据源。
这里从相同类型数据集合中创建数据流(一个带有 INT 和 STRING 类型字段的ROW类型)。

ds = env.from_collection(collection=[(1, 'aaa'), (2, 'bbb')],type_info=Types.ROW([Types.INT(), Types.STRING()]))

(3)转换操作或写入外部系统
现在可以在这个数据流上执行转换操作,或者使用 sink 将数据写入外部系统。
本次使用StreamingFileSink将数据写入output文件目录中。

ds.add_sink(StreamingFileSink.for_row_format('output', Encoder.simple_string_encoder()).build())

(4)执行作业
最后一步是执行真实的 PyFlink DataStream API作业。
PyFlink applications是懒加载的,并且只有在完全构建之后才会提交给集群上执行。
要执行一个应用程序,只需简单地调用env.execute(job_name)。

env.execute("tutorial_job")

在这里插入图片描述

2 自定义转换函数的三种方式

三种方式支持用户自定义函数。

2.1 Lambda函数[简便]

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())mapped_stream.print()
env.execute("tutorial_job")

2.2 python函数[简便]

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironmentdef my_map_func(value):return value + 1def main():env = StreamExecutionEnvironment.get_execution_environment()env.set_parallelism(1)data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())mapped_stream.print()env.execute("tutorial_job")if __name__ == '__main__':main()

2.3 接口函数[复杂]

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, MapFunctionclass MyMapFunction(MapFunction):def map(self, value):return value + 1def main():env = StreamExecutionEnvironment.get_execution_environment()env.set_parallelism(1)data_stream = env.from_collection([1, 2, 3, 41, 5], type_info=Types.INT())mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())mapped_stream.print()env.execute("tutorial_job")if __name__ == '__main__':main()

3 常用算子

参考官网算子

3.1 map【DataStream->DataStream】

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())mapped_stream.print()
env.execute("tutorial_job")

在这里插入图片描述

3.2 flat_map【DataStream->DataStream】

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件data_stream = env.from_collection(collection=['hello apache flink', 'streaming compute'])
out = data_stream.flat_map(lambda x: x.split(' '), output_type=Types.STRING())out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.3 filter【DataStream->DataStream】

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件def my_func(value):if value % 2 == 0:return valuedata_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
filtered_stream = data_stream.filter(my_func)filtered_stream.print()
env.execute("tutorial_job")

3.4 window_all【DataStream->AllWindowedStream】

根据某些特征(例如,最近 100毫秒秒内到达的数据)对所有流事件进行分组。
所有的元素。

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
all_window_stream = data_stream.window_all(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))

3.4.1 apply【AllWindowedStream->DataStream】

将通用 function 应用于整个窗口。

from typing import Iterablefrom pyflink.common import Time
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import AllWindowFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows, TimeWindowenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件class MyAllWindowFunction(AllWindowFunction[tuple, int, TimeWindow]):def apply(self, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[int]:sum = 0for input in inputs:sum += input[0]yield sumdata_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
all_window_stream = data_stream.window_all(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
out = all_window_stream.apply(MyAllWindowFunction())out.print()
env.execute("tutorial_job")

3.5 key_by【DataStream->KeyedStream】

需要结合reduce或window算子使用。

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())

3.6 reduce【KeyedStream->DataStream】增量

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
out = key_stream.reduce(lambda a, b: (a[0]+b[0], a[1]))out.print()
env.execute("tutorial_job")

在这里插入图片描述
在相同 key 的数据流上“滚动”执行 reduce。
将当前元素与最后一次 reduce 得到的值组合然后输出新值。

3.7 window【KeyedStream->WindowedStream】

在已经分区的 KeyedStreams 上定义 Window。

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))

3.7.1 apply【WindowedStream->DataStream】

将通用 function 应用于整个窗口。

from typing import Iterablefrom pyflink.common import Time, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import WindowFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows, TimeWindowenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件class MyWindowFunction(WindowFunction[tuple, int, int, TimeWindow]):def apply(self, key: int, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[int]:sum = 0for input in inputs:sum += input[0]yield key, sumdata_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(10)))
out = window_stream.apply(MyWindowFunction())out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.7.2 reduce【WindowedStream->DataStream】

from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import TumblingEventTimeWindows,TumblingProcessingTimeWindowsenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(10)))
out = window_stream.reduce(lambda a, b: (a[0]+b[0], a[1]))out.print()
env.execute("tutorial_job")

在这里插入图片描述
方式二

from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, ReduceFunction
from pyflink.datastream.window import TumblingProcessingTimeWindowsenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件class MyReduceFunction(ReduceFunction):def reduce(self, value1, value2):return value1[0] + value2[0], value1[1]data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(10)))
out = window_stream.reduce(MyReduceFunction())out.print()
env.execute("tutorial_job")

3.8 union【DataStream*->DataStream】

将两个或多个数据流联合来创建一个包含所有流中数据的新流。
注意:如果一个数据流和自身进行联合,这个流中的每个数据将在合并后的流中出现两次。

from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件data_stream1 = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
data_stream2 = env.from_collection(collection=[(1, 'a'), (3, 'b'), (2, 'a'), (4,'a')])
out = data_stream2.union(data_stream1)out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.9 connect【DataStream,DataStream->ConnectedStream】

stream_1 = ...
stream_2 = ...
connected_streams = stream_1.connect(stream_2)

3.9.1 CoMap【ConnectedStream->DataStream】

from pyflink.datastream import StreamExecutionEnvironment, CoMapFunctionenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件data_stream1 = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
data_stream2 = env.from_collection(collection=[(1, 'a'), (3, 'b'), (2, 'a'), (4,'a')])
connected_stream = data_stream1.connect(data_stream2)class MyCoMapFunction(CoMapFunction):def map1(self, value):return value[0] *100, value[1]def map2(self, value):return value[0], value[1] + 'flink'out = connected_stream.map(MyCoMapFunction())out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.9.2 CoFlatMap【ConnectedStream->DataStream】

from pyflink.datastream import StreamExecutionEnvironment, CoFlatMapFunctionenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件data_stream1 = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
data_stream2 = env.from_collection(collection=[(1, 'a'), (3, 'b'), (2, 'a'), (4,'a')])
connected_stream = data_stream1.connect(data_stream2)class MyCoFlatMapFunction(CoFlatMapFunction):def flat_map1(self, value):for i in range(value[0]):yield value[0]*100def flat_map2(self, value):yield value[0] + 10out = connected_stream.flat_map(MyCoFlatMapFunction())out.print()
env.execute("tutorial_job")

在这里插入图片描述

4 对接kafka输入json输出json

输入{“name”:“中文”}
输出{“name”:“中文结果”}

from pyflink.common import SimpleStringSchema, WatermarkStrategy, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, KafkaSink, \KafkaRecordSerializationSchema
import jsonenv = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_parallelism(1)brokers = "IP:9092"# 读取kafka
source = KafkaSource.builder() \.set_bootstrap_servers(brokers) \.set_topics("flink_source") \.set_group_id("my-group") \.set_starting_offsets(KafkaOffsetsInitializer.latest()) \.set_value_only_deserializer(SimpleStringSchema()) \.build()ds1 = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
ds1.print()# 处理流程
def process_fun(line):data_dict = json.loads(line)result_dict = {"result": data_dict.get("name", "无")+"结果"}return json.dumps(result_dict, ensure_ascii=False)ds2 = ds1.map(process_fun, Types.STRING())
ds2.print()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/849503.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[网鼎杯 2020 青龙组]jocker

运行程序,发现是要我们自己输入 那么肯定是拿到enc慢慢还原 32位,无壳 进来就红一下报错 这里可以看见长度为24 动调一下看看 这里进行了大量的异或 这里是对地址开始的硬编码进行异或,从而达到smc的效果 所以你也可以发现在进行这一步操作之前 encry函数全是报错 你点开…

黄金猛涨周大福却狂跌600亿搬厂裁员,年轻人血脉觉醒?

今年以来,有两样东西一直被吃瓜群众们津津乐道,一是AI的进化速度,二是黄金涨价的速度,并且时常霸占社交媒体热搜。‍‍‍‍‍‍‍‍‍ 尤其是黄金市场,更是一路上涨,快窜出天际了,不少吃瓜群众…

[CLIP] Learning Transferable Visual Models From Natural Language Supervision

通过在4亿图像/文本对上训练文字和图片的匹配关系来预训练网络,可以学习到SOTA的图像特征。预训练模型可以用于下游任务的零样本学习 ​​​​​​​ ​​​​​​​ 1、网络结构 1)simplified version of ConVIRT 2)linear …

麒麟v10系统arm64架构openssh9.7p1的rpm包

制作openssh 说明 理论上制作的多个rpm在arm64架构(aarch64)都适用 系统信息:4.19.90-17.ky10.aarch64 GNU/Linux 升级前备份好文件/etc/ssh、/etc/pam.d等以及开启telnet 升级后确认正常后关闭telnet 在之前制作过openssh-9.5p1基础上继续…

1.Linux入门

文章目录 一、介绍1.1 操作系统1.2 Linux1.3 虚拟机1.4 安装 CentOS7 二、远程连接 Linux2.1 FinalShell2.2 远程连接Linux 三、扩展3.1 WSL3.2 虚拟机快照 一、介绍 1.1 操作系统 我们平常所用的电脑是个人桌面操作系统,也就是Windows或者是macOS 目前我们要学的…

【YOLOv10改进[CONV]】使用DualConv二次创新C2f模块实现轻量化 + 含全部代码和详细修改方式 + 手撕结构图 + 全网首发

本文将使用DualConv二次创新C2f模块实现轻量化,助力YOLOv10目标检测效果的实践,文中含全部代码、详细修改方式以及手撕结构图。助您轻松理解改进的方法。 改进前和改进后的参数对比: 目录 一 DualConv 1 结合33卷积和11卷积核 2 DualConv 3 可视化 二 C2f_DualConv助…

el-dialog给弹框标题后加图标,鼠标悬停显示详细内容

效果&#xff1a; 代码&#xff1a; <div slot"title" class"el-dialog__title">标题<el-tooltip effect"dark" placement"right"><div slot"content">鼠标悬停显示</div><i class"el-icon…

水务设备数字化管理

在数字化浪潮席卷全球的今天&#xff0c;水务行业也迎来了数字化转型的重要契机。传统水务管理模式中&#xff0c;设备监控、数据收集、运行维护等环节往往存在效率低下、成本高昂、安全隐患多等问题。而HiWoo Cloud平台的出现&#xff0c;以其强大的设备接入能力、高效的数据处…

外贸自动化脚本编写会用到的源代码!

随着全球化的加速推进&#xff0c;外贸行业正迎来前所未有的发展机遇&#xff0c;为了提高工作效率、减少人为错误&#xff0c;并更好地把握市场机遇&#xff0c;越来越多的外贸企业开始关注自动化脚本的编写与应用。 自动化脚本不仅可以帮助企业实现业务流程的自动化&#xf…

派单软件,改变服务业未来的神秘武器!

随着人们生活质量的提升&#xff0c;对于日常生活、工作中的售后维修服务响应时间、服务质量十分的在意。即使现在信息化时代快速发展&#xff0c;但还是有不少人们面临着以下问题。 你是否曾经因为等待维修服务而焦急万分&#xff1f; 你是否曾经因为繁琐的报修流程而倍感烦恼…

苍穹外卖笔记-08-套餐管理-增加,删除,修改,查询和起售停售套餐(上)

套餐管理 1 任务2 新增套餐2.1 需求分析和设计页面原型和业务规则接口设计setmeal和setmeal_dish表设计 2.2 代码开发2.2.1 根据分类id查询菜品DishControllerDishServiceDishServiceImplDishMapperDishMapper.xml 2.2.2 新增套餐接口SetmealControllerSetmealServiceSetmealSe…

【自定义View】Android圆饼进度条

源码 自定义属性 <?xml version"1.0" encoding"utf-8"?> <resources><declare-styleable name"ArcProgressView"><attr name"android:textSize" /><attr name"bgBorderWidth" format"d…

计算机毕业设计基于YOLOv8的头盔检测系统

1、安装Anaconda 官网下载或者哔哩哔哩有的up分享 https://www.anaconda.com/download 版本无所谓&#xff0c;安装位置不要有中文就行 2、创建环境yolov8 winR打开命令行 conda create -n yolov8 python3.9 3、打开源码 下载下来放到你想放的目录&#xff0c;直接用pyCharm或者…

【香橙派】Orange Pi AIpro体验——国产AI赋能

文章目录 &#x1f354;开箱&#x1f6f8;烧录镜像⭐启动系统&#x1f388;本机登录&#x1f388;远程登陆 &#x1f386;AI功能体验&#x1f50e;总结 &#x1f354;开箱 可以看到是很精美的开发组件 这里是香橙派官网 http://www.orangepi.cn/ 我们找到下面图片的内容&#…

“冻干”凭什么好吃不肥喵?既能当零食又可做主食的冻干分享

近年来&#xff0c;冻干猫粮因其高品质而备受喜爱&#xff0c;吸引了无数猫主人的目光&#xff0c;像我这样的资深养猫人早已开始选择冻干喂养。但新手养猫的人&#xff0c;可能会感到迷茫&#xff1a;冻干猫粮到底是什么&#xff1f;冻干可以一直当主食喂吗&#xff1f; 一、…

算法003:快乐数

这道题采用快慢双指针的方法。 为了弄清楚这个题到底是要我们干嘛&#xff0c;我们把整个过程类比一下&#xff1a; 不管是n19还是n2&#xff0c;我们都把它当成一种判断链表是否有环的方式。 对于n19&#xff0c;题干是这样解释的&#xff1a; 我们把它当成链表&#xff0c…

【STL源码剖析】priority_queue 优先队列的简单实现

水到绝处是风景 人到绝境是重生 目录 priority_queue的模拟实现 源码剖析&#xff1a; 代码测试&#xff1a; 契子✨ 我们之前不仅讲过 队列queue 还有 双端队列deque 而我们今天所讲的依旧是队列家族的成员 -- 优先队列priority_queue 顾名思义&#xff0c;priority_queue是…

空间搜索geohash概述

概述 通常在一些2C业务场景中会根据用户的位置来搜索一些内容。通常提供位置搜索的都是直接通过redis/mongodb/es等中间件实现的。 但是这些中间件又是怎么实现位置搜索的呢&#xff1b; 查了一番资料&#xff0c;发现背后一个公共的算法Geohash。 Geohash 经度和纬度是2个…

Amesim示例篇-案例2:液体循环回路

前文已完成流体库常用的元件参数与使用方法简单的介绍。本文将对液体回路系统管路的压降标定仿真方法与注意事项进行讨论。首先&#xff0c;本案例应用到的元件有膨胀水壶、水泵、阻力管、常规管路等元件。将上述元件进行串联组成液冷循环回路。 图1 膨胀水壶 图2 水泵 1…

如何让tracert命令的显示信息显示*星号

tracert命令如果在中间某一个节点超时&#xff0c;只会在显示信息中标识此节点信息超时“ * * * ”&#xff0c;不影响整个tracert命令操作。 如上图所示&#xff0c;在DeviceA上执行tracert 10.1.2.2命令&#xff0c;缺省情况下&#xff0c;DeviceA上的显示信息为&#xff1a;…