Akka(19): Stream:组合数据流,组合共用-Graph modular composition

   akka-stream的Graph是一种运算方案,它可能代表某种简单的线性数据流图如:Source/Flow/Sink,也可能是由更基础的流图组合而成相对复杂点的某种复合流图,而这个复合流图本身又可以被当作组件来组合更大的Graph。因为Graph只是对数据流运算的描述,所以它是可以被重复利用的。所以我们应该尽量地按照业务流程需要来设计构建Graph。在更高的功能层面上实现Graph的模块化(modular)。按上回讨论,Graph又可以被描述成一种黑盒子,它的入口和出口就是Shape,而内部的作用即处理步骤Stage则是用GraphStage来形容的。下面是akka-stream预设的一些基础数据流图:

compose_shapes.png

上面Source,Sink,Flow代表具备线性步骤linear-stage的流图,属于最基础的组件,可以用来构建数据处理链条。而Fan-In合并型,Fan-Out扩散型则具备多个输入或输出端口,可以用来构建更复杂的数据流图。我们可以用以上这些基础Graph来构建更复杂的复合流图,而这些复合流图又可以被重复利用去构建更复杂的复合流图。下面就是一些常见的复合流图:

compose_composites.png

注意上面的Composite Flow(from Sink and Source)可以用Flow.fromSinkAndSource函数构建:

def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] =fromSinkAndSourceMat(sink, source)(Keep.none)

这个Flow从流向来说先Sink再Source是反的,形成的Flow上下游间无法协调,即Source端终结信号无法到达Sink端,因为这两端是相互独立的。我们必须用CoupledTermination对象中的fromSinkAndSource函数构建的Flow来解决这个问题:

/*** Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow them them.* Similar to `Flow.fromSinkAndSource` however that API does not connect the completion signals of the wrapped stages.*/
object CoupledTerminationFlow {@deprecated("Use `Flow.fromSinkAndSourceCoupledMat(..., ...)(Keep.both)` instead", "2.5.2")def fromSinkAndSource[I, O, M1, M2](in: Sink[I, M1], out: Source[O, M2]): Flow[I, O, (M1, M2)] =Flow.fromSinkAndSourceCoupledMat(in, out)(Keep.both)
 

从上面图列里的Composite BidiFlow可以看出:一个复合Graph的内部可以是很复杂的,但从外面看到的只是简单的几个输入输出端口。不过Graph内部构件之间的端口必须按照功能逻辑进行正确的连接,剩下的就变成直接向外公开的界面端口了。这种机制支持了层级式的模块化组合方式,如下面的图示:

compose_nested_flow.png

最后变成:

compose_nested_flow_opaque.png

在DSL里我们可以用name("???")来分割模块:

val nestedFlow =Flow[Int].filter(_ != 0) // an atomic processing stage.map(_ - 2) // another atomic processing stage.named("nestedFlow") // wraps up the Flow, and gives it a name

val nestedSink =nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow.named("nestedSink") // wrap it up// Create a RunnableGraph
val runnableGraph = nestedSource.to(nestedSink)

在下面这个示范里我们自定义一个某种功能的流图模块:它有2个输入和3个输出。然后我们再使用这个自定义流图模块组建一个完整的闭合流图:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._import scala.collection.immutableobject GraphModules {def someProcess[I, O]: I => O = i => i.asInstanceOf[O]case class TwoThreeShape[I, I2, O, O2, O3](in1: Inlet[I],in2: Inlet[I2],out1: Outlet[O],out2: Outlet[O2],out3: Outlet[O3]) extends Shape {override def inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Niloverride def outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: out3 :: Niloverride def deepCopy(): Shape = TwoThreeShape(in1.carbonCopy(),in2.carbonCopy(),out1.carbonCopy(),out2.carbonCopy(),out3.carbonCopy())}
//a functional module with 2 input 3 outputdef TwoThreeGraph[I, I2, O, O2, O3] = GraphDSL.create() { implicit builder =>val balancer = builder.add(Balance[I](2))val flow = builder.add(Flow[I2].map(someProcess[I2, O2]))TwoThreeShape(balancer.in, flow.in, balancer.out(0), balancer.out(1), flow.out)}val closedGraph = GraphDSL.create() {implicit builder =>import GraphDSL.Implicits._val inp1 = builder.add(Source(List(1,2,3))).outval inp2 = builder.add(Source(List(10,20,30))).outval merge = builder.add(Merge[Int](2))val mod23 = builder.add(TwoThreeGraph[Int,Int,Int,Int,Int])inp1 ~> mod23.in1inp2 ~> mod23.in2mod23.out1 ~> merge.in(0)mod23.out2 ~> merge.in(1)mod23.out3 ~> Sink.foreach(println)merge ~> Sink.foreach(println)ClosedShape}
}object TailorGraph extends App {import GraphModules._implicit val sys = ActorSystem("streamSys")implicit val ec = sys.dispatcherimplicit val mat = ActorMaterializer()RunnableGraph.fromGraph(closedGraph).run()scala.io.StdIn.readLine()sys.terminate()}

这个自定义的TwoThreeGraph是一个复合的流图模块,是可以重复使用的。注意这个~>符合的使用:akka-stream只提供了对预设定Shape作为连接对象的支持如:

      def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}def ~>[Out](junction: UniformFanOutShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}def ~>[Out](flow: FlowShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}def ~>(to: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit =b.addEdge(importAndGetPort(b), b.add(to).in)def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit =b.addEdge(importAndGetPort(b), to.in)
...

所以对于我们自定义的TwoThreeShape就只能使用直接的端口连接了:

   def ~>[U >: T](to: Inlet[U])(implicit b: Builder[_]): Unit =b.addEdge(importAndGetPort(b), to)

以上的过程显示:通过akka的GraphDSL,对复合型Graph的构建可以实现形象化,大部分工作都在如何对组件之间的端口进行连接。我们再来看个较复杂复合流图的构建过程,下面是这个流图的图示:

compose_graph.png

可以说这是一个相对复杂的数据处理方案,里面甚至包括了数据流回路(feedback)。无法想象如果用纯函数数据流如scalaz-stream应该怎样去实现这么复杂的流程,也可能根本是没有解决方案的。但用akka GraphDSL可以很形象的组合这个数据流图;

  import GraphDSL.Implicits._RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>val A: Outlet[Int]                  = builder.add(Source.single(0)).outval B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2))val C: UniformFanInShape[Int, Int]  = builder.add(Merge[Int](2))val D: FlowShape[Int, Int]          = builder.add(Flow[Int].map(_ + 1))val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2))val F: UniformFanInShape[Int, Int]  = builder.add(Merge[Int](2))val G: Inlet[Any]                   = builder.add(Sink.foreach(println)).inC     <~      FA  ~>  B  ~>  C     ~>      FB  ~>  D  ~>  E  ~>  FE  ~>  GClosedShape})

另一个端口连接方式的版本如下:

RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>val B = builder.add(Broadcast[Int](2))val C = builder.add(Merge[Int](2))val E = builder.add(Balance[Int](2))val F = builder.add(Merge[Int](2))Source.single(0) ~> B.in; B.out(0) ~> C.in(1); C.out ~> F.in(0)C.in(0) <~ F.outB.out(1).map(_ + 1) ~> E.in; E.out(0) ~> F.in(1)E.out(1) ~> Sink.foreach(println)ClosedShape
})

如果把上面这个复杂的Graph切分成模块的话,其中一部分是这样的:

compose_graph_partial.png

这个开放数据流复合图可以用GraphDSL这样构建:
val partial = GraphDSL.create() { implicit builder =>val B = builder.add(Broadcast[Int](2))val C = builder.add(Merge[Int](2))val E = builder.add(Balance[Int](2))val F = builder.add(Merge[Int](2))C  <~  FB  ~>                            C  ~>  FB  ~>  Flow[Int].map(_ + 1)  ~>  E  ~>  FFlowShape(B.in, E.out(1))}.named("partial")
模块化的完整Graph图示如下:
compose_graph_flow.png
这部分可以用下面的代码来实现:
// Convert the partial graph of FlowShape to a Flow to get
// access to the fluid DSL (for example to be able to call .filter())
val flow = Flow.fromGraph(partial)// Simple way to create a graph backed Source
val source = Source.fromGraph( GraphDSL.create() { implicit builder =>val merge = builder.add(Merge[Int](2))Source.single(0)      ~> mergeSource(List(2, 3, 4)) ~> merge// Exposing exactly one output portSourceShape(merge.out)
})// Building a Sink with a nested Flow, using the fluid DSL
val sink = {val nestedFlow = Flow[Int].map(_ * 2).drop(10).named("nestedFlow")nestedFlow.to(Sink.head)
}// Putting all together
val closed = source.via(flow.filter(_ > 1)).to(sink)
和scalaz-stream不同的还有akka-stream的运算是在actor上进行的,除了大家都能对数据流元素进行处理之外,akka-stream还可以通过actor的内部状态来维护和返回运算结果。这个运算结果在复合流图中传播的过程是可控的,如下图示:
compose_mat.png

返回运算结果是通过viaMat, toMat来实现的。简写的via,to默认选择流图左边运算产生的结果。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

转载于:https://www.cnblogs.com/tiger-xc/p/7421514.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/428851.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS-posiziton

1. 想要实现&#xff0c;”返回顶部”永远位于页面的右下角。需要用到position函数。CSS:层叠样式表。用到了分层的功能。 position:fixed; 永远固定在一个地方。 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8">&…

怎么用树莓派制作web服务器,用树莓派做web服务器,靠谱吗?

有点想入门树莓派&#xff0c;然后做一个小web服务器&#xff0c;放在学校内网。大家有做过类似的事情吗&#xff1f;做过&#xff0c;自己用做测试的话是没什么问题的&#xff0c;而且非常小巧&#xff0c;携带方便。买的时候注意还要搭配这三个配件1 可以用的无线网卡&#x…

笔记本如何与其他计算机共享,笔记本电脑怎么和手机共享文件

假如想要用手机打开电脑上大容量的视频或其他文件&#xff0c;但是手机的容量又比较小&#xff0c;该怎么办呢?这个时候&#xff0c;我们就可以在电脑上设置共享文件夹&#xff0c;然后在手机上通过局域网来查看该共享文件夹就可以解决这个问题。那么笔记本电脑怎么和手机共享…

服务器系统崩了能pe,系统崩溃了无法正常重装系统?教你用PE虚拟盘来解决!...

如果电脑系统损坏开不了机怎么办&#xff1f;安全模式啥的都进入不了怎么办&#xff1f;不用怕&#xff0c;小编教你用PE重装系统&#xff0c;十分简单哦。用PE系统镜像还原重装系统&#xff1a;工具&#xff1a;U盘(最好有8G及以上的容量&#xff0c;因为一个windows7以上的系…

群晖218 修改服务器名称,一次换群晖引发的各种事情——论如何榨干218+的价值【不完全版】...

一次换群晖引发的各种事情——论如何榨干218的价值【不完全版】2020-04-08 16:40:0117点赞100收藏29评论创作立场声明&#xff1a;期中考试爸妈送的……购买理由大概用了两年的DS115j&#xff0c;性能实在受不了(ARM的想啥呢)然后就换了个218然后特么发现x64的就是舒服&#xf…

文件服务器上传文件的过程,文件服务器上传文件实现过程【分享】

写本帖的目的是&#xff0c;论坛经常有咨询或反馈文件服务器问题。本帖准备把文件服务器整个的实现过程讲清楚。以方便相关问题的或达到问题自查的目的。1、文件服务器 其实是IIS的一个站点。前端(说的前端是Silverlight及WPF等的前端)是直接与文件服务器站点进行交互。其中&am…

linux下搭建mongodb副本集

1.搭建三台mongodb服务器 cd /usr/local mkdir mongodb tar -zxvf mongodb-linux-x86_64-2.6.7.tgz cd mongodb-linux-x86_64-2.6.7 mv * /usr/local/mongodb cd /usr/local/mongodb mkdir data touch logs cd bin ./mongod -dbpath/usr/local/mongodb/data -logpath/usr/local…

ajax轮询模拟websocket,Ajax轮询和SSE服务器推送数据与websocket模式的区别性学习

我们试想一下我们做个实时聊天的窗口有几种方法&#xff1f;在我们不刷新页面并且可以试试更新页面内容的方法 你这时候是不是想到了ajax没错确实可以Ajax轮询什么是轮询&#xff1f;顾名思义就是我轮着问你&#xff0c;规定一个时间然后我就问你 有新数据了吗&#xff1f; 有新…

vue项目前端服务器,【前端技术】vue自动部署项目到服务器

想要的功能前端打包之后自动上传文件夹到服务器在不提交代码的前提下&#xff0c;也可以完成上述功能经过各种百度谷歌&#xff0c;最后有两种方案可以选择第一种是写一个shell&#xff0c;通过lftp上传文件夹&#xff0c;但是会有个权限的问题&#xff0c;需要更改nginx配置才…

官狼服务器临时维护,官狼三分钟新模式,跑跑狼人杀如何边跑边杀

作为风靡全球的桌游&#xff0c;狼人杀一度成为了国内白领、学生党的国民游戏。随着网络的普及&#xff0c;很多玩家开始转求线上网杀&#xff0c;网易《狼人杀官方》因为玩法最多画风好看&#xff0c;一直是大多数人的选择。而就在最近&#xff0c;《狼人杀官方》重磅推出了年…

三星s6 信号无服务器,手机没信号的原因以及解决方法

现在手机是一件很普遍的通讯工具&#xff0c;因为现在手机功能的强大&#xff0c;所以基本上人人一部手机。目前在社会上比较流行的牌子就是苹果或者三星手机。不过想必大家在用手机的时候都会出现手机没有信号这个问题。这个问题是一件比较麻烦的问题&#xff0c;有的时候急需…

不同型号服务器如何做双击热备,服务器做双机热备教程

服务器做双机热备教程 内容精选换一换将GaussDB(DWS)提供的ODBC DRIVER(psqlodbcw.so)配置到数据源中便可使用。配置数据源需要配置“odbc.ini”和“odbcinst.ini”两个文件(在编译安装unixODBC过程中生成且默认放在“/usr/local/etc”目录下)&#xff0c;并在服务器端进行配置…

文档上传到服务器上,将文件上传到服务器上

将文件上传到服务器上 内容精选换一换为了实现NAT Server、SAP HANA主备节点和SAP S/4HANA主备节点互相通过SSH协议跳转的功能&#xff0c;需要配置云服务器之间的互信。在本地PC上&#xff0c;生成登录NAT Server的密钥文件。在创建NAT Server时&#xff0c;指定了NAT Server的…

win10配置JAVA和tomacat环境变量

一、配置JAVA_HOME 二、配置CATALINA_HOME &#xff08;tomacat的环境变量&#xff09; 三、配置PATH 四、命令行检验配置结果&#xff1a; 1、命令行输入java 2、命令行输入javac 3、命令行输入startup.bat 注&#xff1a;如果以上三个命令无报错表示配置成功 转载于:https://…

设计模式 策略模式2 c++11

根据需求的不同 选择不同的策略算法 之前是保存的各种策略类的指针 这里直接使用 function bind 选择对应的算法 代码 // 005.cpp: 定义控制台应用程序的入口点。 //#include "stdafx.h" #include <functional> #include <iostream>using namespace std…

python学习(八)定制类和枚举

python定制类主要是实现特定功能&#xff0c;通过在类中定义特定的函数完成特定的功能。 class Student(object):def __init__(self, name):self.name namestudent Student("lilei") print(student) 实现定制类 class Student(object):def __init__(self, name):sel…

架构实战:(一)Redis采用主从架构的原因

架构实战 &#xff08;一&#xff09;Redis采用主从架构的原因 &#xff08;二&#xff09; 如果系统的QPS超过10W&#xff0c;甚至是百万以上的访问&#xff0c;则光是Redis是不够的&#xff0c;但是Redis是整个大型缓存架构中&#xff0c;支撑高并发的架构非常重要的环节。 首…

《JavaWeb从入门到改行》注册时向指定邮箱发送邮件激活

javaMail API javaMail是SUN公司提供的针对邮件的API 。 两个jar包 mail.jar 和 activation.jar java mail中主要类&#xff1a;javax.mail.Session、javax.mail.internet.MimeMessage、javax.mail.Transport。 Session 表示会话&#xff0c;即客户端与邮件服务器之…

HTTP之报文

HTTP 报文 用于 HTTP 协议交互的信息被称为 HTTP 报文。请求端&#xff08;客户端&#xff09;的 HTTP 报文叫做请求报文&#xff0c;响应端&#xff08;服务器端&#xff09;的叫做响应报文。HTTP 报文本身是由多行&#xff08;用 CRLF 作换行符&#xff09;数据构成的字符串文…

python 内置函数

一 print( ) flush的应用——模拟进度条 import time for i in range(1,101):time.sleep(0.1)print(\r{}%:{}.format(i,**i),end,flushTrue) #\r &#xff08;return&#xff09; 表示回车 \n &#xff08;new line&#xff09;表示换行&#xff0c;实际上是回车换…