流计算又称实时计算,是继以Map-Reduce为代表的批处理之后的又一重要计算模型。随着互联网业务的发展以及数据规模的持续扩大,传统的批处理计算难以有效地对数据进行快速低延迟处理并返回结果。由于数据几乎处于不断增长的状态中,及时处理计算大批量数据成为了批处理计算的一大难题。在此背景之下,流计算应运而生。相比于传统的批处理计算,流计算具有低延迟、高响应、持续处理的特点。在数据产生的同时,就可以进行计算并获得结果。更可以通过Lambda架构将即时的流计算处理结果与延后的批处理计算结果结合,从而较好地满足低延迟、高正确性的业务需求。
Twitter由于本身的业务特性,对实时性有着强烈的需求。因此在流计算上投入了大量的资源进行开发。第一代流处理系统Storm发布以后得到了广泛的关注和应用。根据Storm在实践中遇到的性能、规模、可用性等方面的问题,Twitter又开发了第二代流处理系统——Heron[1],并在2016年将它开源。
重要概念定义
在开始了解Heron的具体架构和设计之前,我们首先定义一些流计算以及在Heron设计中用到的基本概念:
-
Tuple:流计算任务中处理的最小单元数据的抽象。
-
Stream:由无限个Tuple组成的连续序列。
-
Spout:从外界数据源获得数据并生成Tuple的计算任务。
-
Bolt:处理上游Spout或者Bolt生成的Tuple的计算任务。
-
Topology:一个通过Stream将Spout和Bolt相连的处理Tuple的逻辑计算任务。
-
Grouping:流计算中的Tuple分发策略。在Tuple通过Stream传递到下游Bolt的过程中,Grouping策略决定了如何将一个Tuple路由给一个具体的Bolt实例。典型的Grouping策略有:随机分配、基于Tuple内容的分配等。
-
Physical Plan:基于Topology定义的逻辑计算任务以及所拥有的计算资源,生成的实际运行时信息的集合。
在以上流处理基本概念的基础上,我们可以构建出流处理的三种不同处理语义:
-
至多一次(At-Most-Once): 尽可能处理数据,但不保证数据一定会被处理。吞吐量大,计算快但是计算结果存在一定的误差。
-
至少一次(At-Least-Once):在外部数据源允许Replay(重演)的情况下,保证数据至少被处理一次。在出现错误的情况下会重新处理该数据,可能会出现重复处理多次同一数据的情况。保证数据的处理但是延迟升高。
-
仅有一次(Exactly-Once):每一个数据确保被处理且仅被处理一次。结果精确但是所需要的计算资源增多并且还会导致计算效率降低。
从上可知,三种不同的处理模式有各自的优缺点,因此在选择处理模式的时候需要综合考量一个Topology对于吞吐量、延迟、结果误差、计算资源的要求,从而做出最优的选择。目前的Heron已经实现支持至多一次和至少一次语义,并且正在开发对于仅有一次语义的支持。
Heron系统概览
保持与Storm接口(API)兼容是Heron的设计目标之一。因此,Heron的数据模型与Storm的数据模型基本保持一致。每个提交给Heron的Topology都是一个由Spout和Bolt这两类结点(Vertex)组成的,以Stream为边(Edge)的有向无环图(Directed acyclic graph)。其中Spout结点是Topology的数据源,它从外部读取Topology所需要处理的数据,常见的如kafka-spout,然后发送给后续的Bolt结点进行处理。Bolt节点进行实际的数据计算,常见的运算如Filter、Map以及FlatMap等。
我们可以把Heron的Topology类比为数据库的逻辑查询计划。这种逻辑上的计划最后都要变成实质上的处理计划才能执行。用户在编写Topology时指定每个Spout和Bolt任务的并行度和Tuple在Topology中结点间的分发策略(Grouping)。所有用户提供的信息经过打包算法(Pakcing)的计算,这些Spout和Bolt任务(task)被分配到一批抽象容器中。最后再把这些抽象容器映射到真实的容器中,就可以生成一个物理上可执行的计划(Physical plan),它是所有逻辑信息(拓扑图、并行度、计算任务)和运行时信息(计算任务和容器的对应关系、实际运行地址)的集合。
整体结构
总体上,Heron的整体架构如图1所示。用户通过命令行工具(Heron-CLI)将Topology提交给Heron Scheduler。再由Scheduler对提交的Topology进行资源分配以及运行调度。在同一时间,同一个资源平台上可以运行多个相互独立Topology。
图1 Heron架构
与Storm的Service架构不同,Heron是Library架构。Storm在架构设计上是基于服务的,因此需要设立专有的Storm集群来运行用户提交的Topology。在开发、运维以及成本上,都有诸多的不足。而Heron则是基于库的,可以运行在任意的共享资源调度平台上。最大化地降低了运维负担以及成本开销。
目前的Heron支持Aurora、YARN、Mesos以及EC2,而Kubernetes和Docker等目前正在开发中。通过可扩展插件Heron Scheduler,用户可以根据不同的需求及实际情况选择相应的运行平台,从而达到多平台资源管理器的支持[2]。
而被提交运行Topology的内部结构如图2所示,不同的计算任务被封装在多个容器中运行。这些由调度器调度的容器可以在同一个物理主机上,也可分布在多个主机上。其中每一个Topology的第一个容器(容器0)负责整个Topology的管理工作,主要运行一个Topology Master进程;其余各个容器负责用户提交的计算逻辑的实现,每个容器中主要运行一个Stream Manager进程,一个Metrics Manager进程,以及多个Instance进程。每个Instance都负责运行一个Spout或者Bolt任务(task)。对于Topology Master、Stream Manager以及Instance进程的结构及重要功能,我们会在本文的后面章节进行详细的分析。
图2 Topology结构
状态(State)存储和监控
Heron的State Manager是一个抽象的模块,它在具体实现中可以是ZooKeeper或者是文件系统。它的主要作用是保存各个Topology的各种元信息:Topology的提交者、提交时间、运行时生成的Physical Plan以及Topology Master的地址等,从而为Topology的自我恢复提供帮助。
每个容器中的Metrics Manager负责收集所在容器的运行时状态指标(Metrics),并上传给监控系统。当前Heron版本中,简化的监控系统集成在Topology Master中。将来这一监控模块将会成为容器0中的一个独立进程。Heron还提供Heron-Tracker和Heron-UI 这两个工具来查看和监测一个数据中心中运行的所有Topology。
启动过程
在一个Topology中,Topology Master是整个Topology的元信息管理者,它维护着完整的Topology元信息。而Stream Manager是每个容器的网关,它负责各个Instance之间的数据通信,以及和Topology Master之间的控制信令。
当用户提交Topology之后,Scheduler便会开始分配资源并运行容器。每个容器中启动一个Heron Executor的进程,它区分容器0和其他容器,分别启动Topology Master或者Stream Manager等进程。在一个普通容器中,Instance进程启动后会主动向本地容器的Stream Manager进行注册。当Stream Manager收到所有Instance的注册请求后,会向Topology Master发送包含了自己的所负责的Instance的注册信息。当Topology Master收到所有Stream Manager的注册信息以后,会生成一个各个Instance,Stream Manager的实际运行地址的Physical Plan并进行广播分发。收到了Physical Plan的各个Stream Manager之间就可以根据这一Physical Plan互相建立连接形成一个完全图,然后开始处理数据。
Instance进行具体的Tuple数据计算处理。Stream Manager则不执行具体的计算处理任务,只负责中继转发Tuple。从数据流网络的角度,可以把Stream Manager理解为每个容器的路由器。所有Instance之间的Tuple传递都是通过Stream Manager中继。因此容器内的Instance之间通信是一跳(hop)的星形网络。所有的Stream Manager都互相连接,形成Mesh网络。容器之间的通信也是通过Stream Manager中继的,是通过两跳的中继完成的。
核心组件分析
TMaster
TMaster是Topology Master的简写。与很多Master-Slave模式分布式系统中的Master单点处理控制逻辑的作用相同,TMaster作为Master角色提供了一个全局的接口来了解Topology的运行状态。同时,通过将重要的状态信息(Physical Plan)等记录到ZooKeeper中,保证了TMaster在崩溃恢复之后能继续运行。
实际产品中的TMaster在启动的时候,会在ZooKeeper的某一约定目录中创建一个Ephemeral Node来存储自己的IP地址以及端口,让Stream Manager能发现自己。Heron使用Ephemeral Node的原因包括:
-
避免了一个Topology出现多个TMaster的情况。这样就使得这个Topology的所有进程都能认定同一个TMaster;
-
同一Topology内部的进程能够通过ZooKeeper来发现TMaster所在的位置,从而与其建立连接。
TMaster主要有以下三个功能:
-
构建、分发并维护Topology的Physical Plan;
-
收集各个Stream Manager的心跳,确认Stream Manager的存活;
-
收集和分发Topology部分重要的运行时状态指标(Metrics)。
由于Topology的Physical Plan只有在运行时才能确定,因此TMaster就成为了构建、分发以及维护Physical Plan的最佳选择。在TMaster完成启动和向ZooKeeper注册之后,会等待所有的Stream Manager与自己建立连接。在Stream Manager与TMaster建立连接之后,Stream Manager会报告自己的实际IP地址、端口以及自己所负责的Instance地址与端口。TMaster在收到所有Stream Manager报告的地址信息之后就能构建出Physical Plan并进行广播分发。所有的Stream Manager都会收到由TMaster构建的Physical Plan,并且根据其中的信息与其余的Stream Manager建立两两连接。只有当所有的连接都建立完成之后,Topology才会真正开始进行数据的运算和处理。当某一个Stream Manager丢失并重连之后,TMaster会检测其运行地址及端口是否发生了改变;若改变,则会及时地更新Physical Plan并广播分发,使Stream Manager能够建立正确的连接,从而保证整个Topology的正确运行。
TMaster会接受Stream Manager定时发送的心跳信息并且维护各个Stream Manager的最近一次心跳时间戳。心跳首先能够帮助TMaster确认Stream Manager的存活,其次可以帮助其决定是否更新一个Stream Manager的连接并且更新Physical Plan。
TMaster还会接受由Metrics Manager发送的一部分重要Metrics并且向Heron-Tracker提供这些Metrics。Heron-Tracker可以通过这些Metrics来确定Topology的运行情况并使得Heron-UI能够基于这些重要的Metrics来进行监控检测。典型的Metrics有:分发Tuple的次数,计算Tuple的次数以及处于backpressure状态的时间等。
非常值得注意的一点是,TMaster本身并不参与任何实际的数据处理。因此它也不会接受和分发任何的Tuple。这一设计使得TMaster本身逻辑清晰,也非常轻量,同时也为以后功能的拓展留下了巨大的空间。
Stream Manager 和反压(Back pressure)机制
Stmgr是Stream Manager的简写。Stmgr管理着Tuple的路由,并负责中继Tuple。当Stmgr拿到Physical Plan以后就能根据其中的信息知道与其余的Stmgr建立连接形成Mesh网络,从而进行数据中继以及Backpressure控制。Tuple传递路径可以通过图3来说明,图3中容器1的Instance D(1D)要发送一个Tuple给容器4中的Instance C(4C),这个Tuple经过的路径为:容器1的1D,容器1的Stmgr,容器4的Stmgr,容器4的4C。又比如从3A到3B的Tuple经过的路径为:3A,容器3的Stmgr,3B。与Internet的路由机制对比,Heron的路由非常简单,这得益于Stmgr之间两两相连,使得所有的Instance之间的距离不超过2跳。
图3 Tuple发送路径示例
Acking
Stmgr除了路由中继Tuple的功能以外,它还负责确认(Acking)Tuple已经被处理。Acking的概念在Heron的前身Storm中已经存在。Acking机制的目的是为了实现At-Least-Once的语义。原理上,当一个Bolt实例处理完一个Tuple以后,这个Bolt实例发送一个特殊的Acking Tuple给这个bolt的上游Bolt实例或者Spout实例,向上游结点确认Tuple已经处理完成。这个过程层层向上游结点推进,直到Spout结点。实现上,当Acking Tuple经过Stmgr时候由异或(xor)操作标记Tuple,由异或操作的特性得知是否处理完成。当一个Spout实例在一定时间内还没有收集到Acking Tuple,那么它将重发对应的数据Tuple。Heron的Acking机制的实现与它的前任Storm一致。
Back Pressure
Heron引入了反压(Back Pressure)机制,来动态调整Tuple的处理速度以避免系统过载。一般来说,解决系统过载问题有三种策略:1. 放任不管;2. 丢弃过载数据;3. 请求减少负载。Heron采用了第三种策略,通过Backpressure机制来进行过载恢复,保证系统不会在过载的情况下崩溃。
Backpressure机制触发过程如下:当某一个Bolt Instance处理速度跟不上Tuple的输入速度时,会造成负责向该Instance转发Tuple的Stmgr缓存不断堆积。当缓存大小超过一个上限值(Hight Water Mark)时,该Stmgr会停止从本地的Spout中读取Tuple并向Topology中的其他所有Stmgr发送一个“开始Backpressure”的信息。而其余的Stmgr在接收到这一消息时也会停止从他们所负责的Spout Instance处读取并转发Tuple。至此,整个Topology就不再从外界读入Tuple而只处理堆积在内部的未处理Tuple。而处理的速度则由最慢的Instance来决定。在经过一定时间的处理以后,当缓存的大小减低到一个下限值(Low Water Mark)时,最开始发送“开始Backpressure”的Stmgr会再次发送“停止Backpressure”的信息,从而使得所有的Stmgr重新开始从Spout Instance读取分发数据。而由于Spout通常是从具有允许重演(Replay)的消息队列中读取数据,因此即使冻结了也不会导致数据的丢失。
注意在Backpressure的过程中两个重要的数值:上限值(High Water Mark)和下限值(Low Water Mark)。只有当缓存区的大小超过上限值时才会触发Backpressure,然后一直持续到缓存区的大小减低到下限值时。这一设计有效地避免了一个Topology不停地在Backpressure状态和正常状态之间震荡变化的情况发展,一定程度上保证了Topology的稳定。
Instance
Instance是整个Heron处理引擎的核心部分之一。Topology中不论是Spout类型结点还是Bolt类型结点,都是由Instance来实现的。不同于Storm的Worker设计,在当前的Heron中每一个Instance都是一个独立的JVM进程,通过Stmgr进行数据的分发接受,完成用户定义的计算任务。独立进程的设计带来了一系列的优点:便于调试、调优、资源隔离以及容错恢复等。同时,由于数据的分发传送任务已经交由Stmgr来处理,Instance可以用任何编程语言来进行实现,从而支持各种语言平台。
Instance采用双线程的设计,如图4所示。一个Instance的进程包含Gateway以及Task Execution这两个线程。Gateway线程主要控制着Instance与本地Stmgr和Metrics Manager之间的数据交换。通过TCP连接,Gateway线程:1. 接受由Stmgr分发的待处理Tuple;2. 发送经Task Execution处理的Tuple给Stmgr;3. 转发由Task Execution线程产生的Metrics给Metrics Manager。不论是Spout还是Bolt,Gateway线程完成的任务都相同。
Task Execution线程的职责是执行用户定义的计算任务。对于Spout和Bolt,Task Execution线程会相应地去执行open()和prepare()方法来初始化其状态。如果运行的Instance是一个Bolt实例,那么Task Execution线程会执行execute()方法来处理接收到的Tuple;如果是Spout,则会重复执行nextTuple()方法来从外部数据源不停地获取数据,生成Tuple,并发送给下游的Instance进行处理。经过处理的Tuple会被发送至Gateway线程进行下一步的分发。同时在执行的过程中,Task Execution线程会生成各种Metrics(tuple处理数量,tuple处理延迟等)并发送给Metrics Manager进行状态监控。
图4 Instance结构
Gateway线程和Task Execution线程之间通过三个单向的队列来进行通信,分别是数据进入队列、数据发送队列以及Metrics发送队列。Gateway线程通过数据进入队列向Task Execution线程传入Tuple;Task Execution通过数据发送队列将处理完的Tuple发送给Gateway线程;Task Execution线程通过Metrics发送队列将收集的Metric发送给Gateway线程。
总结
在本文中,我们介绍了流计算的背景和重要概念,并且详细分析了Twitter目前的流计算引擎—— Heron的结构及重要组件。希望能借此为大家提供一些在设计和构建流计算系统时的经验,也欢迎大家向我们提供建议和帮助。如果大家对Heron的开发和改进感兴趣,可以在Github上进行查看。
【1】Kulkarni, Sanjeev, Nikunj Bhagat, Maosong Fu, Vikas Kedigehalli, Christopher Kellogg, Sailesh Mittal, Jignesh M. Patel, Karthik Ramasamy, and Siddarth Taneja. "Twitter heron: Stream processing at scale." In Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data, pp. 239-250. ACM, 2015.
【2】Maosong Fu, Ashvin Agrawal, Avrilia Floratou, Bill Graham, Andrew Jorgensen, Mark Li, Neng Lu, Karthik Ramasamy, Sriram Rao and Cong Wang. "Twitter Heron: Towards Extensible Streaming Engines." In 2017 International Conference on Data Engineering (ICDE). IEEE, 2017.
吕能,Twitter实时计算平台团队成员。专注于分布式系统,曾参与过Twitter的Manhattan键值存储系统,Obs监控警报系统的开发,目前负责Heron的开发研究。曾在国际顶级期刊和会议发表多篇学术论文。
吴惠君,Twitter软件工程师,致力于实时流处理引擎Heron的研究和开发。他毕业于Arizona State University,专攻大数据处理和移动云计算,曾在国际顶级期刊和会议发表多篇学术论文,并有多项专利。
符茂松,Twitter实时计算平台团队主管,负责Heron, Presto等服务。Heron的原作者之一。专注于分布式系统,在SIGMOD、ICDE等会议期刊发表多篇论文。本科毕业于华中科技大学,研究生毕业于Carnegie Mellon University。
本文为《程序员》原创文章,未经允许禁止转载