PyODPS DataFrame 处理笛卡尔积的几种方式

PyODPS 提供了 DataFrame API 来用类似 pandas 的接口进行大规模数据分析以及预处理,本文主要介绍如何使用 PyODPS 执行笛卡尔积的操作。

笛卡尔积最常出现的场景是两两之间需要比较或者运算。以计算地理位置距离为例,假设大表 Coordinates1 存储目标点经纬度坐标,共有 M 行数据,小表 Coordinates2 存储出发点经纬度坐标,共有 N 行数据,现在需要计算所有离目标点最近的出发点坐标。对于一个目标点来说,我们需要计算所有的出发点到目标点的距离,然后找到最小距离,所以整个中间过程需要产生 M * N 条数据,也就是一个笛卡尔积问题。

haversine 公式

首先简单介绍一下背景知识,已知两个地理位置的坐标点的经纬度,求解两点之间的距离可以使用 haversine 公式,使用 Python 的表达如下:

def  haversine(lat1,  lon1,  lat2,  lon2):#  lat1,  lon1  为位置  1  的经纬度坐标#  lat2,  lon2  为位置  2  的经纬度坐标import  numpy  as  npdlon  =  np.radians(lon2  -  lon1)dlat  =  np.radians(lat2  -  lat1)a  =  np.sin(  dlat  /2  )  **2  +  np.cos(np.radians(lat1))  *  np.cos(np.radians(lat2))  *  np.sin(  dlon  /2  )  **2c  =  2  *  np.arcsin(np.sqrt(a))r  =  6371  #  地球平均半径,单位为公里return  c  *  r

MapJoin

目前最推荐的方法就是使用 mapjoin,PyODPS 中使用 mapjoin 的方式十分简单,只需要两个 dataframe join 时指定 mapjoin=True,执行时会对右表做 mapjoin 操作。

In  [3]:  df1  =  o.get_table('coordinates1').to_df()                                                                                                                                                                                        In  [4]:  df2  =  o.get_table('coordinates2').to_df()                                                                                                                                                                                        In  [5]:  df3  =  df1.join(df2,  mapjoin=True)                                                                                                                                                                                                        In  [6]:  df1.schema                                                                                                                                                                                                                                                      
Out[6]:  
odps.Schema  {latitude                    float64              longitude                  float64              id                                string                
}In  [7]:  df2.schema                                                                                                                                                                                                                                                      
Out[7]:  
odps.Schema  {latitude                    float64              longitude                  float64              id                                string                
}In  [8]:  df3.schema                                                                                                                                                                                                                                                      
Out[8]:  
odps.Schema  {latitude_x                        float64              longitude_x                      float64              id_x                                    string                latitude_y                        float64              longitude_y                      float64              id_y                                    string                
}

可以看到在执行 join 时默认会将重名列加上 _x 和 _y 后缀,可通过在 suffixes 参数中传入一个二元 tuple 来自定义后缀,当有了 join 之后的表后,通过 PyODPS 中 DataFrame 的自建函数就可以计算出距离,十分简洁明了,并且效率很高。

In  [9]:  r  =  6371  ...:  dis1  =  (df3.latitude_y  -  df3.latitude_x).radians()  ...:  dis2  =  (df3.longitude_y  -  df3.longitude_x).radians()  ...:  a  =  (dis1  /  2).sin()  **  2  +  df3.latitude_x.radians().cos()  *  df3.latitude_y.radians().cos()  *  (dis2  /  2).sin()  **  2  ...:  df3['dis']  =  2  *  a.sqrt().arcsin()  *  r                                                                                                                                                                                              In [12]: df3.head(10)                                                                                                                        
Out[12]: latitude_x  longitude_x id_x  latitude_y   longitude_y id_y       dis
0   76.252432    59.628253    0   84.045210     6.517522    0  1246.864981
1   76.252432    59.628253    0   59.061796     0.794939    1  2925.953147
2   76.252432    59.628253    0   42.368304    30.119837    2  4020.604942
3   76.252432    59.628253    0   81.290936    51.682749    3   584.779748
4   76.252432    59.628253    0   34.665222   147.167070    4  6213.944942
5   76.252432    59.628253    0   58.058854   165.471565    5  4205.219179
6   76.252432    59.628253    0   79.150677    58.661890    6   323.070785
7   76.252432    59.628253    0   72.622352   123.195778    7  1839.380760
8   76.252432    59.628253    0   80.063614   138.845193    8  1703.782421
9   76.252432    59.628253    0   36.231584    90.774527    9  4717.284949In [13]: df1.count()                                                                                                                         
Out[13]: 2000In [14]: df2.count()                                                                                                                         
Out[14]: 100In [15]: df3.count()                                                                                                                         
Out[15]: 200000

df3 已经是有 M * N 条数据了,接下来如果需要知道最小距离,直接对 df3 调用 groupby 接上 min 聚合函数就可以得到每个目标点的最小距离。


In [16]: df3.groupby('id_x').dis.min().head(10)                                                                                              
Out[16]: dis_min
0   323.070785
1    64.755493
2  1249.283169
3   309.818288
4  1790.484748
5   385.107739
6   498.816157
7   615.987467
8   437.765432
9   272.589621

DataFrame 自定义函数

如果我们需要知道对应最小距离的点的城市,也就是表中对应的 id ,可以在 mapjoin 之后调用 MapReduce,不过我们还有另一种方式是使用 DataFrame 的 apply 方法。要对一行数据使用自定义函数,可以使用 apply 方法,axis 参数必须为 1,表示在行上操作。

表资源

要注意 apply 是在服务端执行的 UDF,所以不能在函数内使用类似于df=o.get_table('table_name').to_df() 的表达式去获得表数据,具体原理可以参考PyODPS DataFrame 的代码在哪里跑。以本文中的情况为例,要想将表 1 与表 2 中所有的记录计算,那么需要将表 2 作为一个资源表,然后在自定义中引用该表资源。PyODPS 中使用表资源也十分方便,只需要将一个 collection 传入 resources 参数即可。collection 是个可迭代对象,不是一个 DataFrame 对象,不可以直接调用 DataFrame 的接口,每个迭代值是一个 namedtuple,可以通过字段名或者偏移来取对应的值。

## use dataframe udfdf1 = o.get_table('coordinates1').to_df()
df2 = o.get_table('coordinates2').to_df()def func(collections):import pandas as pdcollection = collections[0]ids = []latitudes = []longitudes = []for r in collection:ids.append(r.id)latitudes.append(r.latitude)longitudes.append(r.longitude)df = pd.DataFrame({'id': ids, 'latitude':latitudes, 'longitude':longitudes})def h(x):        df['dis'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude)return df.iloc[df['dis'].idxmin()]['id']return hdf1[df1.id, df1.apply(func, resources=[df2], axis=1, reduce=True, types='string').rename('min_id')].execute(libraries=['pandas.zip', 'python-dateutil.zip', 'pytz.zip', 'six.tar.gz'])

在自定义函数中,将表资源通过循环读成 pandas DataFrame,利用 pandas 的 loc 可以很方便的找到最小值对应的行,从而得到距离最近的出发点 id。另外,如果在自定义函数中需要使用到三方包(例如本例中的 pandas)可以参考这篇文章。

全局变量

当小表的数据量十分小的时候,我们甚至可以将小表数据作为全局变量在自定义函数中使用。

df1 = o.get_table('coordinates1').to_df()
df2 = o.get_table('coordinates2').to_df()
df = df2.to_pandas()def func(x):df['dis'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude)return df.iloc[df['dis'].idxmin()]['id']df1[df1.id, df1.apply(func, axis=1, reduce=True, types='string').rename('min_id')].execute(libraries=['pandas.zip', 'python-dateutil.zip', 'pytz.zip', 'six.tar.gz'])

在上传函数的时候,会将函数内使用到的全局变量(上面代码中的 df) pickle 到 UDF 中。但是注意这种方式使用场景很局限,因为 ODPS 的上传的文件资源大小是有限制的,所以数据量太大会导致 UDF 生成的资源太大从而无法上传,而且这种方式最好保证三方包的客户端与服务端的版本一致,否则很有可能出现序列化的问题,所以建议只在数据量非常小的时候使用。

总结

使用 PyODPS 解决笛卡尔积的问题主要分为两种方式,一种是 mapjoin,比较直观,性能好,一般能用 mapjoin 解决的我们都推荐使用 mapjoin,并且最好使用内建函数计算,能到达最高的效率,但是它不够灵活。另一种是使用 DataFrame 自定义函数,比较灵活,性能相对差一点(可以使用 pandas 或者 numpy 获得性能上的提升),通过使用表资源,将小表作为表资源传入 DataFrame 自定义函数中,从而完成笛卡尔积的操作。

原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/516709.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker JFrog Artifactory 7.27.10 maven私服(仓库配置篇)

文章目录一、二、 建立远程仓库2.1. 仓库列表2.2. 仓库创建演示2.3. 阿里云仓里调整三、建立本地仓库3.1. 仓库列表3.2. 仓库创建演示四、创建虚拟库4.1. 仓库列表4.2. 仓库创建演示五、创建用户/组/权限5.1. 创建用户5.2. 创建用户组5.3. 创建权限5.4. 获取密文密码5.5. 生成配…

ODPS2.0重装上阵,优化提升SQL语言表达能力

MaxCompute(原ODPS)是阿里云自主研发的具有业界领先水平的分布式大数据处理平台, 尤其在集团内部得到广泛应用,支撑了多个BU的核心业务。 MaxCompute除了持续优化性能外,也致力于提升SQL语言的用户体验和表达能力,提高…

野鸡大学怎么知道考生电话的?

来源 | 隐小卫责编 | 晋兆雨封图 | CSDN 下载自视觉中国当你在某度频繁搜索“高考”、“志愿”、“大学”、“本科”等关键词时,你的手机号码等信息有可能被非法抓取。并且打包进行交易,卖给培训班、网校、医院等机构进行所谓的“精准获客”。当然&#…

快速入门ECS快照功能,助力大数据容灾保护

阿里云快照是云盘数据在某个时刻完整的只读拷贝,是一种便捷高效的数据容灾手段,常用于数据备份、制作自定义镜像、应用容灾等。 应用场景 推荐您在以下场景中使用快照: 容灾备份:为云盘创建快照,再使用快照创建云盘获…

Docker JFrog Artifactory 7.27.10 maven私服(IDEA 实战篇01) linux

文章目录一、私服配置1. 账户密码2. 本地仓库3. ip/port二、IntelliJ IDEA2.1. 创建项目2.2. 指定配置2.3. 下载依赖2.4. 依赖查看2.5. 注意事项一、私服配置 将服务端生成的配置复制下来&#xff0c;进行修改 1. 账户密码 2. 本地仓库 3. ip/port 内容&#xff1a; <?xml…

一文看懂专有网络和交换机的定义及关系

在专有网络&#xff08;Virtual Private Cloud&#xff0c;简称VPC&#xff09;中使用云资源前&#xff0c;您必须先创建一个专有网络和交换机。您可以在一个专有网络中创建多个交换机来划分子网。一个专有网络内的子网默认私网互通。 专有网络和交换机 专有网络VPC是您独有的…

2020中关村论坛未来青年论坛:聚焦科技与产业数字化转型,让创新成果落地开花

8月26日&#xff0c;由中关村科技园区管理委员会指导&#xff0c;朝阳区人民政府、未来论坛联合主办&#xff0c;中关村朝阳园管委会承办的“2020中关村论坛未来青年论坛”&#xff0c;在北京举行。 作为2020中关村论坛的首场先锋论坛&#xff0c;2020中关村论坛未来青年论坛聚…

JFrog Artifactory 7.27 上传应用到私服和从maven私服下载制品

文章目录一、上传微服务应用1. 生成配置2. 拷贝配置3. 执行上传4. 验证5. 自定义配置二、下载制品2.1. 获取密文密码2.2. 执行下载一、上传微服务应用 1. 生成配置 生成Artifactory仓库上传配置文件&#xff0c;选择仓库&#xff0c;点击‘Set Me Up’查看部署配置 2. 拷…

TS安装和配置

安装• 全局安装TypeScript语言的编译器: npm i -g typescript • 用vscode打开项目文件夹&#xff0c;右键选择在终端中打 开&#xff0c;在终端中输入: tsc -init • 说明: tsc是ts语言的编译器, c是compile的意思&#xff0c; 编译。 结果: • 在当前项目文件夹中生成了tsco…

阿里云2020上云采购季,你适合买什么云产品?

线下IDC机房成本高? 自建数据库卡、慢&#xff0c;延迟高? 被黑客攻击了怎么办&#xff1f; 今年IT预算没多少? 不知道怎么过等保2.0&#xff1f; 你遇到了哪些问题&#xff1f;来阿里云2020上云采购季&#xff01;主会场请戳&#xff1a;https://www.aliyun.com/sale-…

docker 查看实时日志

# 查看实时日志 docker logs -f 容器ID/容器name docker logs -f 0403377b5719 docker logs -f jfrog-oss

从零开始入门 K8s | 理解 CNI 和 CNI 插件

作者 | 溪恒 阿里巴巴高级技术专家 本文整理自《CNCF x Alibaba 云原生技术公开课》第 26 讲&#xff0c;点击直达课程页面。 关注“阿里巴巴云原生”公众号&#xff0c;回复关键词“入门”&#xff0c;即可下载从零入门 K8s 系列文章 PPT。 导读&#xff1a;网络架构是 K8s…

老码农:这段代码绝了,切勿模仿!

作为一名老码农&#xff0c;我的心这次凉透了&#xff01;事情起因很简单&#xff1a;我在某Hub上浏览时候&#xff0c;发现这样的一条信息&#xff1a;Python 超过 C、JS 薪酬排行第一&#xff08;最大招聘网站Indeed.com数据&#xff09;噗&#xff0c;996刚下班的我&#xf…

揭秘 RocketMQ 新特性以及在金融场景下的实践

2019 年末&#xff0c; RocketMQ 正式发布了 4.6.0 版本&#xff0c;增加了“ Request-Reply ”的同步调用的新特性。“ Request-Reply ”这个新特性是由微众银行的开发者们总结实践经验&#xff0c;并反馈给社区的。接下来本文会详细介绍此新特性。 “ Request-Reply ”是什么…

docker 安装部署 Jenkins 2.322

文章目录一、镜像容器1. 安装docker2. 镜像搜索3. 远程拉取镜像4. 创建挂载目录5. 修改权限6. 启动容器二、配置修改2.1. 镜像地址2.2. 核查url2.3. 重启容器2.4. 效果访问2.5. 密码获取2.6. 插件选择2.7. 创建用户2.8. 核查URL一、镜像容器 1. 安装docker yum install docke…

闲鱼的云原生故事:靠什么支撑起万亿的交易规模?

来源 | 阿里巴巴中间件作者 | 王树彬&#xff0c;阿里巴巴闲鱼架构负责人责编 | Carol2014年6月28日&#xff0c;阿里即将赴美上市的这一年&#xff0c;西溪园区的一个茶水间里&#xff0c;28个人日夜赶工了三个月后&#xff0c;上线了一个闲置交易平台——闲鱼。今年5月份&…

VUE_2脚手架

1. 什么是: 一套已经包含核心功能和标准文件夹结构的半成品项目。 2. 为什么: 标准化&#xff01;极其便于协作&#xff0c;降低学习成本。 3. 何时: 今后所有项目&#xff0c;所有新技术&#xff0c;都是在脚手架基础上开发的. 4. 如何: 2大步: (1). 安装可以反复生成脚手架…

Kubernetes operator 模式开发实践

0. 前言 近日我们在开发符合我们业务自身需求的微服务平台时&#xff0c;使用了 Kubernetes 的 Operator Pattern 来实现其中的运维系统&#xff0c;在本文&#xff0c;我们将实现过程中积累的主要知识点和技术细节做了一个整理。 读者在阅读完本文之后&#xff0c;会对 Oper…

Jenkins 2.322 安装 自定义插件

文章目录Jenkins自定义安装插件1. Rebuilder2. Safe Restart3. Artifactory4. Build Timeout5. Credentials Binding6. Email Extension7. Git8. Pipeline9. SonarQube Scanner10. SSH Build Agents11. Workspace Cleanup12. TimestamperJenkins自定义安装插件 在jenkins管理页…

从国际站 - M 站建设谈开发者产品思维

前言 作为一个开发者我们在持续不断地交付我们负责的需求&#xff0c;可我们很少从产品的角度来看待我们交付的需求&#xff0c;比方说一直被强调的需求类型、需求价值、需求目标。作为产品需要具备的能力&#xff1a;看到用户、倾听用户、判断用户、与用户连接、在用户的交互…