Apache Spark源码走读之4 -- DStream实时流数据处理

欢迎转载,转载请注明出处,徽沪一郎。

Spark Streaming能够对流数据进行近乎实时的速度进行数据处理。采用了不同于一般的流式数据处理模型,该模型使得Spark Streaming有非常高的处理速度,与storm相比拥有更高的吞能力。

本篇简要分析Spark Streaming的处理模型,Spark Streaming系统的初始化过程,以及当接收到外部数据时后续的处理步骤。

系统概述

流数据的特点

与一般的文件(即内容已经固定)型数据源相比,所谓的流数据拥有如下的特点

  1. 数据一直处在变化中
  2. 数据无法回退
  3. 数据一直源源不断的涌进

DStream

如果要用一句话来概括Spark Streaming的处理思路的话,那就是"将连续的数据持久化,离散化,然后进行批量处理"。

让我们来仔细分析一下这么作的原因。

  • 数据持久化 将从网络上接收到的数据先暂时存储下来,为事件处理出错时的事件重演提供可能,
  • 离散化 数据源源不断的涌进,永远没有一个尽头,就像周星驰的喜剧中所说“崇拜之情如黄河之水绵绵不绝,一发而不可收拾”。既然不能穷尽,那么就将其按时间分片。比如采用一分钟为时间间隔,那么在连续的一分钟内收集到的数据集中存储在一起。
  • 批量处理 将持久化下来的数据分批进行处理,处理机制套用之前的RDD模式

DStream可以说是对RDD的又一层封装。如果打开DStream.scala和RDD.scala,可以发现几乎RDD上的所有operation在DStream中都有相应的定义。

作用于DStream上的operation分成两类

  1. Transformation
  2. Output 表示将输出结果,目前支持的有print, saveAsObjectFiles, saveAsTextFiles, saveAsHadoopFiles

DStreamGraph

有输入就要有输出,如果没有输出,则前面所做的所有动作全部没有意义,那么如何将这些输入和输出绑定起来呢?这个问题的解决就依赖于DStreamGraph,DStreamGraph记录输入的Stream和输出的Stream。

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()private val outputStreams = new ArrayBuffer[DStream[_]]() var rememberDuration: Duration = null var checkpointInProgress = false 

outputStreams中的元素是在有Output类型的Operation作用于DStream上时自动添加到DStreamGraph中的。

outputStream区别于inputStream一个重要的地方就是会重载generateJob.

初始化流程

StreamingContext

StreamingContext是Spark Streaming初始化的入口点,主要的功能是根据入参来生成JobScheduler

设定InputStream

如果流数据源来自于socket,则使用socketStream。如果数据源来自于不断变化着的文件,则可使用fileStream

提交运行

StreamingContext.start()

 

数据处理

以socketStream为例,数据来自于socket。

SocketInputDstream启动一个线程,该线程使用receive函数来接收数据

 def receive() {                                                                                                          var socket: Socket = null                                                                                              try {                                                                                                                  logInfo("Connecting to " + host + ":" + port) socket = new Socket(host, port) logInfo("Connected to " + host + ":" + port) val iterator = bytesToObjects(socket.getInputStream()) while(!isStopped && iterator.hasNext) { store(iterator.next) } logInfo("Stopped receiving") restart("Retrying connecting to " + host + ":" + port) } catch { case e: java.net.ConnectException => restart("Error connecting to " + host + ":" + port, e) case t: Throwable => restart("Error receiving data", t) } finally { if (socket != null) { socket.close() logInfo("Closed socket to " + host + ":" + port) } } } } 

接收到的数据会被先存储起来,存储最终会调用到BlockManager.scala中的函数,那么BlockManager是如何被传递到StreamingContext的呢?利用SparkEnv传入的,注意StreamingContext构造函数的入参。

处理定时器

数据的存储有是被socket触发的。那么已经存储的数据被真正的处理又是被什么触发的呢?

记得在初始化StreamingContext的时候,我们指定了一个时间参数,那么用这个参数会构造相应的重复定时器,一旦定时器超时,调用generateJobs函数。

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")

事件处理函数

 /** Processes all events */                                                                                              private def processEvent(event: JobGeneratorEvent) {                                                                     logDebug("Got event " + event)                                                                                         event match { case GenerateJobs(time) => generateJobs(time) case ClearMetadata(time) => clearMetadata(time) case DoCheckpoint(time) => doCheckpoint(time) case ClearCheckpointData(time) => clearCheckpointData(time) } } 

generteJobs

 private def generateJobs(time: Time) {                                                                                   SparkEnv.set(ssc.env)                                                                                                  Try(graph.generateJobs(time)) match {                                                                                  case Success(jobs) =>                                                                                                val receivedBlockInfo = graph.getReceiverInputStreams.map { stream => val streamId = stream.id val receivedBlockInfo = stream.getReceivedBlockInfo(time) (streamId, receivedBlockInfo) }.toMap jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } eventActor ! DoCheckpoint(time) } 

 generateJobs->generateJob一路下去会调用到Job.run,在job.run中调用sc.runJob,在具体调用路径就不一一列出。

 private class JobHandler(job: Job) extends Runnable { def run() { eventActor ! JobStarted(job) job.run() eventActor ! JobCompleted(job) } } 

DStream.generateJob函数中定义了jobFunc,也就是在job.run()中使用到的jobFunc

  private[streaming] def generateJob(time: Time): Option[Job] = {getOrCompute(time) match {case Some(rdd) => {val jobFunc = () => { val emptyFunc = { (iterator: Iterator[T]) => {} } context.sparkContext.runJob(rdd, emptyFunc) } Some(new Job(time, jobFunc)) } case None => None } } 

在这个流程中,DStreamGraph起到非常关键的作用,非常类似于TridentStorm中的graph.

在generateJob过程中,DStream会通过调用compute函数生成相应的RDD,SparkContext则是将基于RDD的抽象转换成为多个stage,而执行。

StreamingContext中一个重要的转换就是DStream到RDD的转换,而SparkContext中一个重要的转换是RDD到Stage及Task的转换。在这两个不同的抽象类中,要注意其中getOrCompute和compute函数的实现。

小结

本篇内容有点仓促,内容不够丰富翔实,争取回头有空的时候再好好丰富一下具体的调用路径。

对于容错处理机制,本文没有涉及,待研究明白之后另起一篇进行阐述。

转载于:https://www.cnblogs.com/downtjs/p/3815291.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/264699.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[书目20071127]图书 时间陷阱 目录

[内容简介] 在这个快速多变和工作至上主义盛行的时代, 工作常常让我们迷失了自己的世界, 本书令人信服地告诉你,在销售领域不必玩命工作依然能取得巨大成功。 托德邓肯直击浪费我们时间、降低我们工作效能的“陷阱 ”要害, 同时&a…

antd-react-mobile(踩坑记录)

1.按照官网步骤进行, $ npm install -g create-react-app # 注意:工具会自动初始化一个脚手架并安装 React 项目的各种必要依赖,如果在过程中出现网络问题,请尝试配置代理或使用 其他 npm registry。 $ create-react-app my-app …

启动Eclipse 弹出“Failed to load the JNI shared library”错误的解决方法

原因1:eclipse的版本与jre或者jdk版本不一致 对策:要么两者都安装64位的,要么都安两个是32位一个是64位。 原因2:给定目录下jvm.dll不存在 对策:(1)重新安装jre或者jdk并配置好环境变量。&#…

路径的应用(3)

如果与MC2同级的还有一个影片剪辑MC3,则想修改MC3,的透明度,用如下方法:on(release){_parent.MC3._alpha0;}如果MC3下还有一个影片剪辑MC4,想修改MC4的透明度则用如下方法:on(release){_parent.MC3.MC4._alpha0;}转载于:https://w…

php.ini在哪里 微赞_虚拟主机php.ini在哪?怎么查看php.ini在哪

虚拟主机php.ini在哪?怎么查看php.ini在哪?相信很多人都会有这样的疑问,下面聚名网为你详解一下以上问题。虚拟主机php.ini在哪?虚拟主机php.ini文件一般放置在文件管理器的“others”文件夹中。php.ini文件控制了PHP很多方面的性…

微软私有云分享(R2)22 计算机配置文件与基础设置

计算机配置文件是完全为了裸金属安装准备的。所以如果不准备使用裸金属安装,硬件配置文件在SCVMM2012 R2中也可以不用配置。本章操作完全用图来表示(其实我准备文字了,但是貌似文字丢了…………),创建该文件没什么注意…

总结了下自己的几个典型行为

所有电器都不看说明书;尽量使用自助办理业务;使用最多的称呼是同学;喜欢玩小孩但不喜欢生小孩;拥有一种奇怪的固执;熟人面前是话癖,生人面前一言不发;认幽默感是做人的根本;认为如果…

14.PTD与的基址

0xC0300000就是页目录的基址。 随便找一个软件测试下 通过0xC0300000找到的物理页就是页目录表这个物理页即是页目录表本身也是页表页目录表是一张特殊的页表,每一项PTE指向的不是普通的物理页,而是指向其他的页表.如果我们要访问第N个PDE, 那么有如下公式:0xc03000…

多项式回归

在上一节所介绍的非线性回归分析,首先要求我们对回归方程的函数模型做出推断。尽管在一些特定的情况下我们能够比較easy地做到这一点,可是在很多实际问题上经常会令我们不知所措。依据高等数学知识我们知道,不论什么曲线能够近似地用多项式表示&#xff…

Spring - shortcuts

How to use CSS, JavaScript and Images in Spring MVC Web App https://www.udemy.com/spring-hibernate-tutorial/learn/v4/t/lecture/5608584?start0 Deploying To Tomcat using WAR files https://www.udemy.com/spring-hibernate-tutorial/learn/v4/t/lecture/5633776?s…

.net中前台javascript与后台c#函数相互调用

1.如何在JavaScript访问C#函数? 2.如何在JavaScript访问C#变量? 3.如何在C#中访问JavaScript的已有变量? 4.如何在C#中访问JavaScript函数? 问题1答案如下: javaScript函数中执行C#代码中的函数: 方法一:1、首先建立一个按钮,…

二级c语言无纸化三合一_学习攻略|计算机二级考试重点及注意事项

2020年全国计算机等级考试将于9月26日强势来袭,亲爱的你们准备好了吗?下面为大家准备了一些干货以及考试注意事项,快拿出小本本记下来吧。考试题型二级office考试题型1、选择题 20分(含公共基础知识部分10分)2、操作题 80分 (1)Word文档 30…

mongodb集群与分片的配置说明

mongodb集群与分片的配置说明 Shardingcluster介绍: 这是一种可以水平扩展的模式,在数据量很大时特给力,实际大规模应用一般会采用这种架构去构建monodb系统。 系统分为需要三种角色: Shard Server:mongod 实例,用于存…

使用VS2005进行负载测试

下面通过一个简单的例子来讲解VS2005是如何做负载测试的.1、 编写一个加法程序,其中编写一个加法方法,然后调用该方法。//程序源代码using System;using System.Data;using System.Configuration;using System.Web;using System.Web.Security;using Syst…

vue项目实现列表页-详情页返回不刷新,再点其他菜单项返回刷新的需求

问题背景:有时候一些列表会有一些跳转的需求,比如跳到详情页、或者是其他相关的页面(比如跳到用户列表去查看用户的相关信息)等,此时再返回列表页,列表页会刷新重置。目前需求就是需要改成如下情况&#xf…

轮廓检测_轮廓检测| Richer Convolutional Features | CVPR | 2017

0 概述论文名称:“Richer Convolutional Features for Edge Detection”论文链接:https://openaccess.thecvf.com/content_cvpr_2017/papers/Liu_Richer_Convolutional_Features_CVPR_2017_paper.pdf缩写:RCF这一篇文论在我看来,是…

NodeJS开发环境配置

为什么80%的码农都做不了架构师?>>> 上链接~ http://www.cnblogs.com/Irving/p/3634232.html 转载于:https://my.oschina.net/weiyi/blog/287177

python各种类型转换-int,str,char,float,ord,hex,oct等

int(x [,base ]) 将x转换为一个整数 long(x [,base ]) 将x转换为一个长整数 float(x ) 将x转换到一个浮点数 complex(real [,imag ]) 创建一个复数 str(x ) 将对象 x 转换为字符串 repr(x ) 将对象 x 转换为表达式字符串 eval(str ) 用来计算在字符串中的有效Python表达式,并返…

一个平行四边形可以分成四个_将平行四边形分割成两个三角形还易变形么?(人教四下五单元三角形例2)...

最近,我学了三角形一课,研究了三角形的特性。课上我们拿出准备好的拼搭的三角形和平行四边形,动手拉一拉,结果发现三角形拉不动、平行四边形一拉就变形了。原来动动手也是研究数学呀,太有意思了!这时&#…

Java中利用MessageFormat对象实现类似C# string.Format方法格式化

我们在写C#代码的时候常常会使用到string.Format("待格式化字符串{0},{1},....",参数1,参数2,...),来格式化字符串,特别是拼接字符的时候,这种方式使得代码更为直观清楚。 最近使用java时候却java的string.Format与c#重点string.Fo…