图解Spark Graphx实现顶点关联邻接顶点的collectNeighbors函数原理

image

一、场景案例

在一张社区网络里,可能需要查询出各个顶点邻接关联的顶点集合,类似查询某个人关系比较近的都有哪些人的场景。

在用Spark graphx中,通过函数collectNeighbors便可以获取到源顶点邻接顶点的数据。

下面以一个例子来说明,首先,先基于顶点集和边来创建一个Graph图。

image

该图的顶点集合为——

(1L, "Alice"),
(2L, "Bob"),
(3L, "Charlie"),
(4L, "David"),
(5L, "Eve"),
(6L, "Frank"),
(7L, "Grace"),
(8L, "Henry"),
(9L, "Ivy")

边的集合为——

Edge(1L, 2L, "friend"),
Edge(1L, 5L, "friend"),
Edge(2L, 3L, "friend"),
Edge(2L, 4L, "friend"),
Edge(3L, 4L, "friend"),
Edge(4L, 6L, "friend"),
Edge(5L, 7L, "friend"),
Edge(5L, 8L, "friend"),
Edge(6L, 9L, "friend"),
Edge(7L, 8L, "friend"),
Edge(8L, 9L, "friend")

基于以上顶点和边,分别建立一个顶点RDD 和边RDD,然后通过Graph(vertices, edges, defaultVertex)创建一个Graph图,代码如下——

val conf = new SparkConf().setMaster("local[*]").setAppName("graphx")
val ss = SparkSession.builder().config(conf).getOrCreate()// 创建顶点RDD
val vertices = ss.sparkContext.parallelize(Seq((1L, "Alice"),(2L, "Bob"),(3L, "Charlie"),(4L, "David"),(5L, "Eve"),(6L, "Frank"),(7L, "Grace"),(8L, "Henry"),(9L, "Ivy")
))// 创建边RDD
val edges = ss.sparkContext.parallelize(Seq(Edge(1L, 2L, "friend"),Edge(1L, 5L, "friend"),Edge(2L, 3L, "friend"),Edge(2L, 4L, "friend"),Edge(3L, 4L, "friend"),Edge(4L, 6L, "friend"),Edge(5L, 7L, "friend"),Edge(5L, 8L, "friend"),Edge(6L, 9L, "friend"),Edge(7L, 8L, "friend"),Edge(8L, 9L, "friend")
))val graph = Graph(vertices, edges, null)

在成功创建图之后,就可以基于已有的图,通过collectNeighbors方法,分别得到每个顶点关联邻接顶点的数据——

val neighborVertexs = graph.mapVertices{case (id,(label)) => (label)
}.collectNeighbors(EdgeDirection.Either)

最终得到的neighborVertexs是一个VertexRDD[Array[(VertexId, VD)]]类型的RDD,可以通过neighborVertexs.foreach(println)打印观察一下,发现数据里,是每一个【顶点,元组】的结构,注意看,大概就能猜出来,通过neighborVertexs得到的RDD其实就是每个顶点关联了邻接顶点集合元组的数据——

(5,[Lscala.Tuple2;@bb793d7)
(8,[Lscala.Tuple2;@6d5786e6)
(1,[Lscala.Tuple2;@398cb9ea)
(9,[Lscala.Tuple2;@61c4eeb2)
(2,[Lscala.Tuple2;@d7d0256)
(6,[Lscala.Tuple2;@538f0156)
(7,[Lscala.Tuple2;@77a17e3d)
(3,[Lscala.Tuple2;@1be2a4fb)
(4,[Lscala.Tuple2;@1e0153f9)

可以进一步验证,将元组里的数据进行展开打印,通过以下代码进行验证——先通过coalesce(1)将分区设置为一个分区,多个分区打印难以确定打印顺序。然后再通过foreach遍历RDD里每一个元素,这里的元素结构如(5,[Lscala.Tuple2;@bb793d7),x._1表示是顶点5,x._2表示[Lscala.Tuple2;@bb793d7,既然是元组,那就可以进一步进行遍历打印,即 x._2.foreach(y => {...})——

neighborVertexs.coalesce(1).foreach(x => {print("顶点:" + x._1 + "关联的邻居顶点集合->{" )var str = "";x._2.foreach(y => {str += y + ","})print(str.substring(0, str.length - 1 ) +"}")println()
})

可以观察一下最后打印结果——

顶点:8关联的邻居顶点集合->{(5,Eve),(7,Grace),(9,Ivy)}
顶点:1关联的邻居顶点集合->{(2,Bob),(5,Eve)}
顶点:9关联的邻居顶点集合->{(6,Frank),(8,Henry)}
顶点:2关联的邻居顶点集合->{(1,Alice),(3,Charlie),(4,David)}
顶点:3关联的邻居顶点集合->{(2,Bob),(4,David)}
顶点:4关联的邻居顶点集合->{(2,Bob),(3,Charlie),(6,Frank)}
顶点:5关联的邻居顶点集合->{(1,Alice),(7,Grace),(8,Henry)}
顶点:6关联的邻居顶点集合->{(4,David),(9,Ivy)}
顶点:7关联的邻居顶点集合->{(5,Eve),(8,Henry)}

结合文章开始的那一个图验证一下,顶点1关联的邻接顶点是(2,Bob),(5,Eve),正确;顶点8关联的邻接顶点是(5,Eve),(7,Grace),(9,Ivy),正确。其他验证都与下图情况符合。可见,通过collectNeighbors(EdgeDirection.Either)确实可以获取网络里每个顶点关联邻接顶点的数据。

image

二、函数代码原理解析

以上就是顶点关联邻接顶点的用法案例,接下来,让我们分析一下collectNeighbors(EdgeDirection.Either)源码,该函数实现了收集顶点邻居顶点的信息——

def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {val nbrs = edgeDirection match {//聚合本顶点出度指向的邻居顶点和入度指向本顶点的邻居顶点case EdgeDirection.Either =>graph.aggregateMessages[Array[(VertexId, VD)]](ctx => {ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))},(a, b) => a ++ b, TripletFields.All)//聚合本顶点出度指向的邻居顶点case EdgeDirection.In =>graph.aggregateMessages[Array[(VertexId, VD)]](ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))),(a, b) => a ++ b, TripletFields.Src)//聚合入度指向本顶点的邻居顶点case EdgeDirection.Out =>graph.aggregateMessages[Array[(VertexId, VD)]](ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))),(a, b) => a ++ b, TripletFields.Dst)case EdgeDirection.Both =>throw new SparkException("collectEdges does not support EdgeDirection.Both. Use" +"EdgeDirection.Either instead.")}graph.vertices.leftJoin(nbrs) { (vid, vdata, nbrsOpt) =>nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])}
} // end of collectNeighbor

该函数用match做了一个类似Java的switch匹配,匹配有四种结果,其中,最后一种EdgeDirection.Both已经不支持,故而这里就不解读了,只讲仍然有用的三种。

用一个图来说明吧,假如有以下边指向的图——

Edge(2L, 1L),
Edge(2L, 4L),
Edge(3L, 2L),
Edge(2L, 5L),

image

  • EdgeDirection.Either表示本顶点的出度邻居和入度邻居。若本顶点为2,那么它得到邻居顶点包括(1,4,3,5),该参数表示只要与顶点2一度边关联的,都会聚集成邻居顶点。

  • EdgeDirection.In表示指向本顶点的邻居,即本顶点的入度邻居。若本顶点为2,图里邻居顶点只有3是指向2的,那么顶点2得到邻居顶点包括(3)。

  • EdgeDirection.Out表示本顶点的出度指向的邻居顶点。若本顶点为2,图里从顶点2指向邻居顶点的,将得到(1,4,5)。

由此可知,顶点关联邻居顶点的函数collectNeighbors(EdgeDirection.Either)里面的参数,就是可以基于该参数得到不同情况的邻居顶点。

这里以collectNeighbors(EdgeDirection.Either)说明函数核心逻辑——

 graph.aggregateMessages[Array[(VertexId, VD)]](ctx => {ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))},(a, b) => a ++ b, TripletFields.All)

该代码做了聚合,表示会对图里的所有边做处理。

图里有一种边结构,叫三元组(Triplet),这种结构由以下三个部分组成——

  1. 源顶点(Source Vertex):图中的一条边的起始点或源节点。
  2. 目标顶点(Destination Vertex):图中的一条边的结束点或目标节点。
  3. 边属性(Edge Attribute):连接源顶点和目标顶点之间的边上的属性值。

在graph.aggregateMessages[Array[(VertexId, VD)]]( ctx => {......})聚合函数里,就是基于三元组去做聚合统计的。

该聚合函数有两个参数,第一个参数是一个函数(ctx) => { ... },里面定义了每个顶点如何发送消息给邻居顶点。

注意看,这里的ctx正是一个三元组对象,基于该对象,可以获取一下信息——

  • ctx.srcId:获取源顶点的ID。

  • ctx.srcAttr:获取源顶点的属性。

  • ctx.dstId:获取目标顶点的ID。

  • ctx.dstAttr:获取目标顶点的属性。

ctx作为一个知道源顶点、目标顶点的三元组对象,就像一个邮差一样,负责给两边顶点发送消息。

1、ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))函数,这里顶点A是作为目标顶点,邻居节点B是源顶点,ctx对象就会将目标顶点B的顶点ID和属性组成的元组(ctx.dstId, ctx.dstAttr)当作消息传给源顶点A,A会将收到的消息保存下来,这样就知道EdgeDirection.Either无向边情况下,它有一个邻居B了。

image

2、 ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))函数,这时A成为了源顶点,C成为了目标顶点,ctx对象就会将源顶点A的顶点ID和属性组成的元组(ctx.dstId, ctx.dstAttr)当作消息传给源顶点B。B会将收到的消息以数组格式Array((ctx.dstId, ctx.dstAttr))保存下来,这样B以后就知道EdgeDirection.Either无向边情况下,它有一个邻居A了。

image

这里ctx.sendToDst()用Array((ctx.dstId, ctx.dstAttr))数组形式发送,是方便后面的(a, b) => a ++ b 合并函数操作,最后每个顶点可以将它收到的邻居顶点数组合并到一个大的数组,即所有邻居顶点聚集到一个数组里返回。

还有一个TripletFields枚举需要了解下——

TripletFields.All表示本顶点将聚合包括源顶点以及目标顶点发送顶点消息。

TripletFields.Src表示本顶点只聚合源顶点发送过来的顶点消息。

TripletFields.Dst表示本顶点只聚合目标顶点发送过来的顶点消息。

EdgeDirection.Either参数对应的是TripletFields.All,表示需要将本顶点接收到的所有源顶点以及目标顶点发送的顶点消息进行聚合。

接下来,就是做聚合了——

整个图里会有许多类似邮差角色的ctx对象,只需要处理完这些对象,那么,每个顶点就会收到通过ctx对象传送过来的邻居顶点信息。

例如,A收到的ctx对象发过来的邻居消息如下——

Array((B,属性))

Array((C,属性))

Array((D,属性))

......

这时,就可以基于顶点A作为分组key,将组内的Array((B,属性))、Array((C,属性))、Array((D,属性))都合并到一个组里,即通过(a, b) => a ++ b将分组各个数据合并成一个大数组{(B,属性),(C,属性),(D,属性)},这个分组group的key是收到各个ctx对象发送邻居消息过来的顶点A。

各个顶点聚合完后,返回一个nbrs,该RDD的每一个元素,即(顶点,顶点属性,Array(邻居顶点))——

val nbrs = edgeDirection match {case EdgeDirection.Either =>graph.aggregateMessages[Array[(VertexId, VD)]](ctx => {ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))},(a, b) => a ++ b, TripletFields.All)......  
}

接着将原图graph的顶点vertices的rdd与聚合结果nbrs做左连接,返回一个新的 VertexRDD 对象,其中每个顶点都附带了它的邻居信息。如果某个顶点没有邻居信息(在 nbrs 中不存在对应的条目),则使用空数组来表示它的邻居。

graph.vertices.leftJoin(nbrs) { (vid, vdata, nbrsOpt) =>nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
}

最后,得到的顶点关联邻居顶点的RDD情况,就如前文打印的那样——

(5,[Lscala.Tuple2;@bb793d7) 顶点5展开邻居顶点=> 顶点:5关联的邻居顶点集合->{(1,Alice),(7,Grace),(8,Henry)}
(8,[Lscala.Tuple2;@6d5786e6) 顶点8展开邻居顶点=> 顶点:8关联的邻居顶点集合->{(5,Eve),(7,Grace),(9,Ivy)}
(1,[Lscala.Tuple2;@398cb9ea) 顶点1展开邻居顶点=> 顶点:1关联的邻居顶点集合->{(2,Bob),(5,Eve)}
(9,[Lscala.Tuple2;@61c4eeb2) 顶点9展开邻居顶点=> 顶点:9关联的邻居顶点集合->{(6,Frank),(8,Henry)}
(2,[Lscala.Tuple2;@d7d0256) 顶点2展开邻居顶点=> 顶点:2关联的邻居顶点集合->{(1,Alice),(3,Charlie),(4,David)}
(6,[Lscala.Tuple2;@538f0156) 顶点6展开邻居顶点=> 顶点:6关联的邻居顶点集合->{(4,David),(9,Ivy)}
(7,[Lscala.Tuple2;@77a17e3d) 顶点7展开邻居顶点=> 顶点:7关联的邻居顶点集合->{(5,Eve),(8,Henry)}
(3,[Lscala.Tuple2;@1be2a4fb) 顶点3展开邻居顶点=> 顶点:3关联的邻居顶点集合->{(2,Bob),(4,David)}
(4,[Lscala.Tuple2;@1e0153f9) 顶点4展开邻居顶点=> 顶点:4关联的邻居顶点集合->{(2,Bob),(3,Charlie),(6,Frank)}

image

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/200963.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言--每日选择题--Day37

第一题 1. 有以下说明语句:则下面引用形式错误的是() struct Student {int num;double score; };struct Student stu[3] {{1001,80}, {1002,75}, {1003,91}} struct Student *p stu; A:p->num B:(p).num C&#…

[论文精读]序列建模使大视觉模型的规模化学习成为可能

本博客是一篇最新论文的精读,论文为UC伯克利大学及约翰霍普金斯大学相关研究者新近(2023.12.1)在arxiv上上传的《Sequential Modeling Enables Scalable Learning for Large Vision Models》 。知名科技新媒体“新智元”给与本文极高评价,并以《计算机视…

STM32(PWM、ADC)

1、PWM 定义 PWM,全称为脉冲宽度调制(Pulse Width Modulation),它通过改变信号的高电平和低电平的持续时间比例来控制输出信号的平均功率或电压。 PWM,全称为脉冲宽度调制(Pulse Width Modulation&#xff…

FPGA实现电机位置环、速度环双闭环PID控制

一、设计思路 主要设计思路就是根据之前写的一篇FPGA实现电机转速PID控制,前面已经实现了位置环的控制,思想就是通过电机编码器的当前位置值不断地修正PID去控制速度。 那为了更好的实现控制,可以在位置环后加上速度环,实现电机位…

scipy

scipy 是什么常用方法 是什么 scipy是Python语言的一个开源数值计算库,主要目的是为科学、工程、计算等领域提供有用的数学算法和函数,包括线性代数、优化、信号处理、傅里叶变换、统计函数等。它是Python科学计算环境的重要组成部分,通常与N…

2021年GopherChina大会-核心PPT资料下载

一、峰会简介 自 Go 语言诞生以来,中国便是其应用最早和最广的国家之一,根据 Jetbrains 在 2021 年初做的调查报告,总体来说目前大概有 110 万专业的开发者 选择 Go 作为其主要开发语言。就其全球分布而言, 居住在亚洲的开发者最多&#xff…

go学习之goroutine和channel

文章目录 一、goroutine(协程)1.goroutine入门2.goroutine基本介绍-1.进程和线程说明-2.程序、进程和线程的关系示意图-3.Go协程和Go主线程 3.案例说明4.小结5.MPG模式基本介绍6.设置Golang运行的CPU数7.协程并发(并行)资源竞争的问题8.全局互斥锁解决资…

MySQL 8 update语句更新数据表里边的数据

数据重新补充 这里使用alter table Bookbought.bookuser add userage INT after userphone;为用户表bookuser在userphone列后边添加一个类型为INT的新列userage。 使用alter table Bookbought.bookuser add sex varchar(6) after userage ;为用户表bookuser在userage 列后边添…

Oracle-数据库连接数异常上涨问题分析

问题: 用户的数据库在某个时间段出现连接数异常上涨问题,时间持续5分钟左右,并且问题期间应用无法正常连接请求数据库 从连接数的监控上可以看到数据库平常峰值不到100个连接,在问题时间段突然上涨到400以上 问题分析:…

unity | 动画模块之循环滚动选项框

一、作者的话 评论区有人问,有没有竖排循环轮播选项框,我就写了一个 二、效果动画 如果不是你们想要的,就省的你们继续往下看了 三、制作思路 把移动分成里面的方块,还有背景(父物体),方块自…

网络模拟与网络仿真

目录 一、概念界定 二、模拟(simulation)与仿真(emulation) 2.1 模拟(simulation) 2.2 仿真(emulation) 2.3 区分 三、网络模拟与网络仿真 3.1 网络模拟 3.2 网络仿真 3.…

【算法】算法题-20231206

这里写目录标题 一、非自身以外数字的乘积二、最大数三、奇数排序 一、非自身以外数字的乘积 给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀…

VA05销售报表屏幕增强

业务需求:在VA05报表界面增强两个字段(BELNR1/BELNR2). 第一步:扩展VA05相关表结构 由于新增的字段是按照销售订单行维度展示的,所以本篇加在VBAP表里(不扩展表字段,直接写增强,会…

融云 Global IM UIKit,灵活易用的即时通讯组件设计思路和最佳实践

(全网都在找的《社交泛娱乐出海作战地图》,点击获取👆) 融云近期推出的 Global IM UIKit,支持开发者高效满足海外用户交互体验需求,且保留了相当的产品张力赋予开发者更多自由和灵活性,是实现全…

现货黄金会面临哪些风险?

进行现货黄金投资,我们除了要了解怎么找到交易机会以外,也要知道我们交易会面临哪些风险,了解风险就是做到知己知彼,了解风险才能控制风险。控制住风险,才能为我们稳定盈利打好基础,那么下面我们就来看看在…

ESP32-Web-Server编程-在网页中插入图片

ESP32-Web-Server编程-在网页中插入图片 概述 图胜与言,在网页端显示含义清晰的图片,可以使得内容更容易理解。 需求及功能解析 本节演示在 ESP32 Web 服务器上插入若干图片。在插入图片时还可以对图片设置一个超链接,用户点击该图片时&a…

Oracle merge into语句(merge into Statement)

在Oracle中,常规的DML语句只能完成单一功能,,例如insert/delete/update只能三选一,而merge into语句可以同时对一张表进行更新/插入/删除。 目录 一、基本语法 二、用法示例 2.1 同时更新和插入 2.2 where子句 2.3 delete子句 2.4…

Gitee项目推荐-HasChat

最近由于使用的局域网通信工具总是出问题,就在考虑有没有好的替代品。搜索了一番,发现这个还不错: HasChat: 一款极简聊天应用,比较完整,略好看 页面简洁,功能也比较齐全, 感兴趣的小伙伴可以…

【Redis】redis 高性能--线程模型以及epoll网络框架

目录 一.前言 二.多线程的弊端 2.1 锁的开销问题 2.2 多线程上下文切换带来的额外开销 2.3 多线程占用内存成本增高 三.基本IO模型与epoll 模式 3.1 基本IO模型 3.2 单线程处理机制 四.总结 一.前言 我们经常讨论到,redis 是单线程,那为什么单线…

sizeof()、strlen()、length()、size()的区别(笔记)

​ 上面的笔记有点简陋,可以看一下下面这个博主的: c/c中sizeof()、strlen()、length()、size()详解和区别_csize,sizeof,length_xuechanba的博客-CSDN博客