twitter storm源码走读(五)

TridentTopology创建过程详解

从用户层面来看TridentTopology,有两个重要的概念一是Stream,另一个是作用于Stream上的各种Operation。在实现层面来看,无论是stream,还是后续的operation都会转变成为各个Node,这些Node之间的关系通过重要的数据结构来维护具体到TridentTopology,实现图的各种操作的组件是jgrapht。

说到图,两个基本的概念会闪现出来,一是结点,二是描述结点之间关系的边。要想很好的理解TridentTopology就需要紧盯图中结点和边的变化。

TridentTopology在转换成为普通的StormTopology时,需要将原始的图分成各个group,每个group将运行于一个独立的bolt中。TridentTopology又是如何知道哪些node应该在同一个group,哪些应该处在另一个group中的呢;如何来确定每个group的并发度(parallismHint)的呢。这些问题的解决都与jgrapht分不开。

关于jgrapht的更多信息,请参考其官方网站 http://jgrapht.org

概要

在TridentTopology中向图中添加结点的api有三种:

  1. addNode
  2. addSourcedNode
  3. addSourcedStateNode

其中addNode在创建stream是使用,addSourcedStateNode在partitionPersist时使用到,其它的operation使用到的是addSourcedNode.

addNode与其它两个方法的一个重要区别还在于,addNode是不需要添加边(Edge),而其它两个API需要往图中添加edge,以确定该node的源是哪个。

TridentTopology

1
2
3
4
public TridentTopology() {
        _graph = new DefaultDirectedGraph(new ErrorEdgeFactory());
        _gen = new UniqueIdGen();
    }

 在TridentTopology的构造函数中,创建了DAG(有向无环图)。利用这个_graph来作为容器以存储后续过程中创建的各个node及它们之间的关系。

newStream

 newStream会为DAG(有向无环图)中创建源结点,其调用关系如下所示。

  • newStream
    • addNode
      • registerNode
复制代码
 1 protected void registerNode(Node n) {
 2         _graph.addVertex(n);
 3         if(n.stateInfo!=null) {
 4             String id = n.stateInfo.id;
 5             if(!_colocate.containsKey(id)) {
 6                 _colocate.put(id, new ArrayList());
 7             }
 8             _colocate.get(id).add(n);
 9         }
10     }
复制代码

 

each

作用于stream上的Operation有很多,以each为例来看新的operation是如何转换成为node添加到_graph中的。

复制代码
//Stream.java
public Stream each(Fields inputFields, Function function, Fields functionFields) {projectionValidation(inputFields);return _topology.addSourcedNode(this,new ProcessorNode(_topology.getUniqueStreamId(),_name,TridentUtils.fieldsConcat(getOutputFields(), functionFields),functionFields,new EachProcessor(inputFields, function)));}
复制代码

调用关系描述如下

  • Stream::each
  • TridentTopology::addSourcedNode
  • TridentTopology::registerSourcedNode

registerSourcedNode的实现如下

复制代码
protected void registerSourcedNode(List<Stream> sources, Node newNode) {registerNode(newNode);int streamIndex = 0;for(Stream s: sources) {_graph.addEdge(s._node, newNode, new IndexedEdge(s._node, newNode, streamIndex));streamIndex++;}        }
复制代码

注意此处添加edge是,是有索引的,这样可以区别处理的先后顺序。

在Stream中含有成员变量_node,表示stream最近停泊的node,有了该变量添加edge才成为了可能。

 

partitionPersist

复制代码
public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater, Fields functionFields) {projectionValidation(inputFields);String id = _topology.getUniqueStateId();ProcessorNode n = new ProcessorNode(_topology.getUniqueStreamId(),_name,functionFields,functionFields,new PartitionPersistProcessor(id, inputFields, updater));n.committer = true;n.stateInfo = new NodeStateInfo(id, stateSpec);return _topology.addSourcedStateNode(this, n);}
复制代码

调用关系

  • Stream::partitionPersist
  • TridentTopology::addSourcedStateNode
  • TridentTopology::registerSourcedNode

与addNode及addSourcedNode不同的是,addSourcedStateNode返回的是TridentState而非Stream

既然谈到了TridentState就不得不谈到其另一面Stream::stateQuery,

复制代码
public Stream stateQuery(TridentState state, Fields inputFields, QueryFunction function, Fields functionFields) {projectionValidation(inputFields);String stateId = state._node.stateInfo.id;Node n = new ProcessorNode(_topology.getUniqueStreamId(),_name,TridentUtils.fieldsConcat(getOutputFields(), functionFields),functionFields,new StateQueryProcessor(stateId, inputFields, function));_topology._colocate.get(stateId).add(n);return _topology.addSourcedNode(this, n);}
复制代码

从此处可以看出stateQueryNode最起码有两个inputStream,一是从TridentState而来表示状态已经改变,另一个是处于drpcStream这个方面的上一跳结点。

build

TridentTopology::build是将TridentTopology转变为StormTopology的过程,这一过程中最重要的一环就是将_graph中含有的node进行分组。

grouping

算法逻辑概述

  • 将boltNodes中的每个boltNode作为一个group加入全部加入initialGroups
  • 以graph和initialGroups作为入参创建GraphGrouper
  • 分组的过程其实就是进行合并的过程,详见GraphGrouper::mergeFully()
    • 如果从当前group1的输出目的地都是属于group2,则将group1,group2合并
    • 如果当前group1的所有输入源都是来自于group2,则将group1,group2合并
    • 将需要合并的group1,group2作为入参创建新的group,同时将group1,group2从已有的集合出移除
复制代码
   public void mergeFully() {boolean somethingHappened = true;while(somethingHappened) {somethingHappened = false;for(Group g: currGroups) {Collection<Group> outgoingGroups = outgoingGroups(g);if(outgoingGroups.size()==1) {Group out = outgoingGroups.iterator().next();if(out!=null) {merge(g, out);somethingHappened = true;break;}}Collection<Group> incomingGroups = incomingGroups(g);if(incomingGroups.size()==1) {Group in = incomingGroups.iterator().next();if(in!=null) {merge(g, in);somethingHappened = true;break;}}                }}}
复制代码

GraphGrouper::merge()

复制代码
  private void merge(Group g1, Group g2) {Group newGroup = new Group(g1, g2);currGroups.remove(g1);currGroups.remove(g2);currGroups.add(newGroup);for(Node n: newGroup.nodes) {groupIndex.put(n, newGroup);}}
复制代码

在group之间添加partitionNode

复制代码
// add identity partitions between groupsfor(IndexedEdge<Node> e: new HashSet<IndexedEdge>(graph.edgeSet())) {if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {                Group g1 = grouper.nodeGroup(e.source);Group g2 = grouper.nodeGroup(e.target);// g1 being null means the source is a spout nodeif(g1==null && !(e.source instanceof SpoutNode))throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");if(g1==null || !g1.equals(g2)) {graph.removeEdge(e);PartitionNode pNode = makeIdentityPartition(e.source);graph.addVertex(pNode);graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index));                    }}}
复制代码


_graph中所有的node在变换过后,变成两组元素,一是spoutNodes,另一个是合并后的mergedGroup.

spoutNodes中的每个元素作为spout添加到TridentTopologyBuilder的_spouts数组中,mergedGroup中的每个group添加到TridentTopologyBuilder的_bolt数组中。在TridentTopologyBuilder::build()中最主要的事情是为每个_spouts和_bolts数组中的成员添加grouping关系。

小结

到目前为止,通过两篇文章分析了TridentTopology的创建过程及其运行时在每个TridentBoltExecutor中的消息传递情况。接下来会分析TridentTopology提供的API实现及其作用场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/292964.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言宏使用常见问题

代码&#xff1a; #include<stdio.h> #define MAX(a,b) a>b?a:b #define MIN(a,b) a>b?b:a //#define M (xY) #define M1(m) m*m #define M2(m) (m)*(m) #define M3(m) ((m)*(m)) int main(){int x,y,max,min;printf("Input one numbers:");int sum, m…

ad域管理与维护_AD域管理员账号下发

大家好&#xff0c;最近比较忙&#xff0c;好久没发文章了&#xff0c;这次继续讲AD域的相关内容。AD域运行在Windows Server服务器&#xff0c;用于集中管理网内的所有Windows客户端主机&#xff0c;其中最重要的管理手段便是「域组策略」&#xff0c;可管理的条目非常多&…

java中main函数解析

作者&#xff1a;xwdreamer出处&#xff1a;http://www.cnblogs.com/xwdreamer欢迎任何形式的转载&#xff0c;但请务必注明出处。从写java至今&#xff0c;写的最多的可能就是主函数 public static void main(String[] args) {} 但是以前一直都没有问自己&#xff0c;为什么要…

逻辑回归算法原理

http://ihoge.cn/2018/LR.html 逻辑回归模型 逻辑回归也被称为对数几率回归&#xff0c;算法名虽然叫做逻辑回归&#xff0c;但是该算法是分类算法&#xff0c;个人认为这是因为逻辑回归用了和回归类似的方法来解决了分类问题。 逻辑回归模型是一种分类模型&#xff0c;用条…

.net core入门之web应用

2019独角兽企业重金招聘Python工程师标准>>> 其实铺垫了那么久&#xff0c;终于到重点了&#xff0c;迫不及待了吧&#xff0c;那么我们用重量级工具Visual Studio 2015&#xff0c;安装Update3&#xff0c; 安装DotNetCore.1.0.1-VS2015Tools.Preview2.0.2.exe&…

python里split_python中split()的用法

原博文 2018-10-19 15:15 − Python split() 通过指定分隔符对字符串进行切片&#xff0c;如果参数 num 有指定值&#xff0c;则仅分隔 num 个子字符串。 语法&#xff1a; str.split(str"", numstring.count(str)) str -- 分隔符&#xff0c;默认为所有的空字符&…

设计模式之-命令模式(Command Pattern)

命令模式&#xff08;Command Pattern&#xff09;是用来实现在一个请求 - 响应模型松耦合。在命令模式中&#xff0c;请求被发送给调用者和调用它传递给被封装的命令对象。 Command对象将请求传递到接收器的适当的方法来执行特定操作。客户端程序创建接收器对象&#xff0c;然…

android -上传文件到服务器

android上传文件到服务器 重点:最好是设置好content-type这些参数的配置&#xff01; package com.spring.sky.image.upload.network; import java.io.DataOutputStream;import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.io.Inp…

梯度下降法、随机梯度下降法、批量梯度下降法及牛顿法、拟牛顿法、共轭梯度法

http://ihoge.cn/2018/GradientDescent.html http://ihoge.cn/2018/newton1.html 引言 李航老师在《统计学习方法》中将机器学习的三要素总结为&#xff1a;模型、策略和算法。其大致含义如下&#xff1a; 模型&#xff1a;其实就是机器学习训练的过程中所要学习的条件概率…

浅谈.NET 6 中 gRPC 的最新功能

gRPC 是一个现代的、跨平台的、高性能的 RPC 框架。gRPC 是构建在 ASP.NET Core 之上&#xff0c;也是我们推荐的使用 .NET 构建 RPC 服务的方法。.NET 6 进一步提高了 gRPC 已经非常出色的性能&#xff0c;并添加了一系列新功能&#xff0c;使 gRPC 在现代云原生应用程序中比以…

如果你没有时间读书,至少要保留这个习惯

全世界只有3.14 % 的人关注了爆炸吧知识快节奏的生活中&#xff0c;如何花费少量的时间&#xff0c;就可以让自己的生活发生好的改变&#xff1f;知乎上有一个高赞回答&#xff1a;大量阅读优质文章。见识决定了你的眼界&#xff0c;站得越高&#xff0c;看得越多&#xff0c;你…

C++ 标准模板库

转载于:https://www.cnblogs.com/Flyzhcong/p/3977865.html

.NET Conf 2021 回顾

.NET Conf 2021是有史以来规模最大的.NET Conf,全球演讲者举办了80多场会议!我们非常感谢所有收看直播、在分享会上的提问、参与我们的乐趣和游戏的人。学习将持续到1月底&#xff0c;社区活动将持续进行&#xff0c;所以一定要查看这些活动&#xff0c;并关注我们的会议 GitHu…

男厕改女厕能多敷衍......

1 搓澡时最好不要闲聊&#xff08;素材来源网络&#xff0c;侵删&#xff09;▼2 弟弟需要充电多久才能回家&#xff1f;&#xff08;素材来源网络&#xff0c;侵删&#xff09;▼3 这都什么野史&#xff1f;▼4 脚崴了怎么办&#xff1f;▼5 钥匙在老地方▼6 被红绿灯伤透…

《Single Image Haze Removal Using Dark Channel Prior》一文中图像去雾算法的原理、实现、效果(速度可实时)...

最新的效果见 &#xff1a;http://video.sina.com.cn/v/b/124538950-1254492273.html 可处理视频的示例&#xff1a;视频去雾效果 在图像去雾这个领域&#xff0c;几乎没有人不知道《Single Image Haze Removal Using Dark Channel Prior》这篇文章&#xff0c;该文是2009年C…

python 运算符重载_Python3面向对象-运算符重载

1&#xff1a;运算符重载介绍运算符重载&#xff0c;就是在某个类的方法中&#xff0c;拦截其内置的操作(比如&#xff1a;&#xff0c;-&#xff0c;*&#xff0c;/,比较&#xff0c;属性访问&#xff0c;等等)&#xff0c;使其实例的行为接近内置类型。当类的实例出现在内置操…

docker Failed to get D-Bus connection 报错

在centos7的容器里面出现了一个BUG&#xff0c;就是serveice启动服务的时候出现报错&#xff0c;不能用service启动服务。[roote13c3d3802d0 /]# service httpd startRedirecting to /bin/systemctl start httpd.serviceFailed to get D-Bus connection: Operation not permit…

牛顿法、拟牛顿法、高斯-牛顿法、共轭梯度法推导总结

原文&#xff1a;http://ihoge.cn/2018/newton1.html 前言&#xff1a; 线性最小二乘问题&#xff0c;我们可以通过理论推导可以得到其解析解&#xff0c;但是对于非线性最小二乘问题&#xff0c;则需要依赖迭代优化的方法&#xff0c;牛顿算法是解决非线性最优的常见算法之一…

不用变量交换2个值

题目: 不使用变量交换2个值 代码如下: #include <iostream> using namespace std;int main() { int a = 3;int b = 4;cout<<"a="<<a<<endl;cout<<"b="<<b<<endl;a = a ^ b;b = a ^ b;a = a ^ b;cout<&…