0基础学习PyFlink——使用DataStream进行字数统计

大纲

  • source
  • Map
    • Splitting
    • Mapping
  • Reduce
    • Keying
    • Reducing
  • 完整代码
  • 结构
  • 参考资料

在《0基础学习PyFlink——模拟Hadoop流程》一文中,我们看到Hadoop在处理大数据时的MapReduce过程。
在这里插入图片描述
本节介绍的DataStream API,则使用了类似的结构。

source

为了方便,我们依然使用from_collection从内存中读取数据。
和使用Table API类似,我们给from_collection传递的第二参数是每行数据类型。本例中是String,即“A C B”的类型。

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data = ["A C B","A E B","E C D"]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.STRING()# define the sourcesource = env.from_collection(word_count_data, source_type_info)

可以使用下面指令输出source内容

    source.print()
A C B
A E B
E C D

Map

和上图一样,Map由Splitting和Mapping组成。它们分别将数据切割成做小运算单元,和生成map结构。

Splitting

    def split(line):for s in line.split():yield ssplitted = source.flat_map(split) 

上述splitted的结构输出是

A
C
B
A
E
B
E
C
D

Mapping

Mapping的操作就是将之前的数组结构转换成map结构

mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))

mapped的输出值如下,可以看到它还是按我们输入数据的顺序排列的。

(A,1)
(C,1)
(B,1)
(A,1)
(E,1)
(B,1)
(E,1)
(C,1)
(D,1)

Reduce

Keying

这一步对应于上图中的Shuffling&Sorting,它会将相同key的数据进行分区,以供后面reducing操作使用。

    keyed=mapped.key_by(lambda i: i[0]) 

可以看到keyed数据已经经过排序和聚合了。

(A,1)
(A,1)
(B,1)
(B,1)
(C,1)
(C,1)
(D,1)

Reducing

 reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))

reduce的方法有如下注释

Applies a reduce transformation on the grouped data stream grouped on by the given
key position. The ReduceFunction will receive input values based on the key value.
Only input values with the same key will go to the same reducer.

特别是最后一句非常有用“Only input values with the same key will go to the same reducer”(只有相同Key的输入数据才会进入相同的Reducer中)。这句话意味着上述Keyed的数据会被分组执行,于是就不会出现计算错乱。

(A,2)
(B,2)
(C,2)
(D,1)
(E,2)

完整代码

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data = ["A C B","A E B","E C D"]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.STRING()# define the sourcesource = env.from_collection(word_count_data, source_type_info)# source.print()def split(line):for s in line.split():yield ssplitted = source.flat_map(split) # splitted.print()mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))# mapped.print()keyed=mapped.key_by(lambda i: i[0]) # keyed.print()reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))# define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

结构

在这里插入图片描述

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/python/datastream_tutorial/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/125673.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# Onnx 用于边缘检测的轻量级密集卷积神经网络LDC

效果 项目 代码 using Microsoft.ML.OnnxRuntime; using Microsoft.ML.OnnxRuntime.Tensors; using OpenCvSharp; using System; using System.Collections.Generic; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms;namespace Onnx…

【HTML】HTML基础知识扫盲

1、什么是HTML? HTML是超文本标记语言(Hyper Text Markup Language)是用来描述网页的一种语言 注意: HTML不是编程语言,而是标记语言 HTML文件也可以直接称为网页,浏览器的作用就是读取HTML文件&#xff…

【网络协议】聊聊http协议

当我们输入www.baidu.com的时候,其实是先将baidu.com的域名进行DNS解析,转换成对应的ip地址,然后开始进行基于TCP构建三次握手的连接,目前使用的是1.1 默认是开启了keep-Alive。可以在多次请求中进行连接复用。 HTTP 请求的构建…

Bayes决策:身高与体重特征进行性别分类

代码与文件请从这里下载:Auorui/Pattern-recognition-programming: 模式识别编程 (github.com) 简述 分别依照身高、体重数据作为特征,在正态分布假设下利用最大似然法估计分布密度参数,建立最小错误率Bayes分类器,写出得到的决…

控梦术(一)之什么是清明梦

控梦术 首先,问大家一个问题。在梦中,你知道自己是在做梦吗?科学数据表明,大约23%的人在过去一个月中,至少有一次在梦中意识到自己正在做梦。科学家把这叫做清醒梦或者叫做清明梦。科学家说,每个人都能学会…

springboot的缓存和redis缓存,入门级别教程

一、springboot(如果没有配置)默认使用的是jvm缓存 1、Spring框架支持向应用程序透明地添加缓存。抽象的核心是将缓存应用于方法,从而根据缓存中可用的信息减少执行次数。缓存逻辑是透明地应用的,对调用者没有任何干扰。只要使用…

云计算与ai人工智能对高防cdn的发展

高防CDN(Content Delivery Network)作为网络安全领域的一项关键技术,致力于保护在线内容免受各种网络攻击,包括分布式拒绝服务攻击(DDoS)等。然而,随着人工智能(AI)和大数…

C#__委托delegate

委托存储的是函数的引用(把某个函数赋值给一个委托类型的变量,这样的话这个变量就可以当成这个函数来进行使用了) 委托类型跟整型类型、浮点型类型一样,也是一种类型,是一种存储函数引用的类型 using System.Reflec…

Linux网络基础2 -- 应用层相关

一、协议 引例:编写一个网络版的计算器 1.1 约定方案:“序列化” 和 “反序列化” 方案一:客户端发送形如“11”的字符串,再去解析其中的数字和计算字符,并且设限(如数字和运算符之间没有空格; 运算符只…

RIS辅助MIMO广播信道容量

RIS辅助MIMO广播信道容量 摘要RIS辅助的BC容量矩阵形式的泰勒展开学习舒尔补 RIS-Aided Multiple-Input Multiple-Output Broadcast Channel Capacity论文阅读记录 基于泰勒展开求解了上行容量和最差用户的可达速率,学习其中的展开方法。 摘要 Scalable algorithm…

什么是神经网络,它的原理是啥?(1)

参考:https://www.youtube.com/watch?vmlk0rddP3L4&listPLuhqtP7jdD8CftMk831qdE8BlIteSaNzD 视频1: 简单介绍神经网络的基本概念,以及一个训练好的神经网络是怎么使用的 分类算法中,神经网络在训练过程中会学习输入的 pat…

Pmdarima实现单变量时序预测与交叉验证

目录 1. pmdarima实现单变量时间序列预测 2. 时间序列交叉验证 2.1 滚动交叉验证(RollingForecastCV) 2.2 滑窗交叉验证(SildingWindowForecastCV) 1. pmdarima实现单变量时间序列预测 Pmdarima是以statsmodel和autoarima为基础、封装研发出的Python时序分析库、也是现在市…

故障诊断模型 | Maltab实现GRU门控循环单元故障诊断

文章目录 效果一览文章概述模型描述源码设计参考资料效果一览 文章概述 故障诊断模型 | Maltab实现GRU门控循环单元故障诊断 模型描述 利用各种检查和测试方法,发现系统和设备是否存在故障的过程是故障检测;而进一步确定故障所在大致部位的过程是故障定位。故障检测和故障定位…

3ds Max2022安装教程(最新最详细)

目录 一.简介 二.安装步骤 网盘资源见文末 一.简介 3DS Max是由Autodesk公司开发的一款专业三维建模、动画和渲染软件,广泛应用于影视、游戏、建筑和工业设计等领域。 3DS Max的主要特点和功能包括: 三维建模:3DS Max提供了各种强大的建…

如何用思维导图开会

在办公室和会议室使用思维导图会有无数好处。今天我们就聊聊思维导图在开会中的作用? 为什么要在会议中使用思维导图? 思维导图可以帮助我们整理思路。会议通常涉及到复杂的议题和讨论,使用思维导图可以帮助整合和梳理参与者的思路和观点。通…

Vue项目搭建及使用vue-cli创建项目、创建登录页面、与后台进行交互,以及安装和使用axios、qs和vue-axios

目录 1. 搭建项目 1.1 使用vue-cli创建项目 1.2 通过npm安装element-ui 1.3 导入组件 2 创建登录页面 2.1 创建登录组件 2.2 引入css(css.txt) 2.3 配置路由 2.5 运行效果 3. 后台交互 3.1 引入axios 3.2 axios/qs/vue-axios安装与使用 3.2…

priority_queue 的模拟实现

priority_queue 的底层结构 我们已经学习过栈和队列了,他们都是用一种容器适配出来的。今天我们要学习的 prority_queue 也是一个容器适配器。在 priority_queue 的使用部分我们已经知道想要适配出 priority_queue,这个底层的容器必须有以下接口&#x…

安装Python环境

Python 安装包下载地址:https://www.python.org/downloads/ 打开该链接,可以看到有两个版本的 Python,分别是 Python 3.x 和 Python 2.x,如下图所示: Python下载页面截图 图 1 Python 下载页面截图(包含…

基于单片机设计的防煤气泄漏装置

一、前言 煤气泄漏是一个严重的安全隐患,可能导致火灾、爆炸以及对人体健康的威胁。为了提高家庭和工业环境中煤气泄漏的检测和预防能力,设计了一种基于单片机的防煤气泄漏装置。 单片机选择STC89C52作为主控芯片。为了检测煤气泄漏,采用了…

cocosCreator 调用wxAPI 及后台授权设置、获取用户昵称和头像

版本: 3.8.0 语言: TypeScript 环境: Mac 官方文档: 微信官方文档 - 开放能力 微信 API 小游戏环境 在cocosCreator的3.x版本项目开发中,TypeScript最终会被转换为JavaScript语言。 JavaScript的运行时调用的API…