Apache Spark源码走读之4 -- DStream实时流数据处理

欢迎转载,转载请注明出处,徽沪一郎。

Spark Streaming能够对流数据进行近乎实时的速度进行数据处理。采用了不同于一般的流式数据处理模型,该模型使得Spark Streaming有非常高的处理速度,与storm相比拥有更高的吞能力。

本篇简要分析Spark Streaming的处理模型,Spark Streaming系统的初始化过程,以及当接收到外部数据时后续的处理步骤。

系统概述

流数据的特点

与一般的文件(即内容已经固定)型数据源相比,所谓的流数据拥有如下的特点

  1. 数据一直处在变化中
  2. 数据无法回退
  3. 数据一直源源不断的涌进

DStream

如果要用一句话来概括Spark Streaming的处理思路的话,那就是"将连续的数据持久化,离散化,然后进行批量处理"。

让我们来仔细分析一下这么作的原因。

  • 数据持久化 将从网络上接收到的数据先暂时存储下来,为事件处理出错时的事件重演提供可能,
  • 离散化 数据源源不断的涌进,永远没有一个尽头,就像周星驰的喜剧中所说“崇拜之情如黄河之水绵绵不绝,一发而不可收拾”。既然不能穷尽,那么就将其按时间分片。比如采用一分钟为时间间隔,那么在连续的一分钟内收集到的数据集中存储在一起。
  • 批量处理 将持久化下来的数据分批进行处理,处理机制套用之前的RDD模式

DStream可以说是对RDD的又一层封装。如果打开DStream.scala和RDD.scala,可以发现几乎RDD上的所有operation在DStream中都有相应的定义。

作用于DStream上的operation分成两类

  1. Transformation
  2. Output 表示将输出结果,目前支持的有print, saveAsObjectFiles, saveAsTextFiles, saveAsHadoopFiles

DStreamGraph

有输入就要有输出,如果没有输出,则前面所做的所有动作全部没有意义,那么如何将这些输入和输出绑定起来呢?这个问题的解决就依赖于DStreamGraph,DStreamGraph记录输入的Stream和输出的Stream。

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()private val outputStreams = new ArrayBuffer[DStream[_]]() var rememberDuration: Duration = null var checkpointInProgress = false 

outputStreams中的元素是在有Output类型的Operation作用于DStream上时自动添加到DStreamGraph中的。

outputStream区别于inputStream一个重要的地方就是会重载generateJob.

初始化流程

StreamingContext

StreamingContext是Spark Streaming初始化的入口点,主要的功能是根据入参来生成JobScheduler

设定InputStream

如果流数据源来自于socket,则使用socketStream。如果数据源来自于不断变化着的文件,则可使用fileStream

提交运行

StreamingContext.start()

 

数据处理

以socketStream为例,数据来自于socket。

SocketInputDstream启动一个线程,该线程使用receive函数来接收数据

 def receive() {                                                                                                          var socket: Socket = null                                                                                              try {                                                                                                                  logInfo("Connecting to " + host + ":" + port) socket = new Socket(host, port) logInfo("Connected to " + host + ":" + port) val iterator = bytesToObjects(socket.getInputStream()) while(!isStopped && iterator.hasNext) { store(iterator.next) } logInfo("Stopped receiving") restart("Retrying connecting to " + host + ":" + port) } catch { case e: java.net.ConnectException => restart("Error connecting to " + host + ":" + port, e) case t: Throwable => restart("Error receiving data", t) } finally { if (socket != null) { socket.close() logInfo("Closed socket to " + host + ":" + port) } } } } 

接收到的数据会被先存储起来,存储最终会调用到BlockManager.scala中的函数,那么BlockManager是如何被传递到StreamingContext的呢?利用SparkEnv传入的,注意StreamingContext构造函数的入参。

处理定时器

数据的存储有是被socket触发的。那么已经存储的数据被真正的处理又是被什么触发的呢?

记得在初始化StreamingContext的时候,我们指定了一个时间参数,那么用这个参数会构造相应的重复定时器,一旦定时器超时,调用generateJobs函数。

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")

事件处理函数

 /** Processes all events */                                                                                              private def processEvent(event: JobGeneratorEvent) {                                                                     logDebug("Got event " + event)                                                                                         event match { case GenerateJobs(time) => generateJobs(time) case ClearMetadata(time) => clearMetadata(time) case DoCheckpoint(time) => doCheckpoint(time) case ClearCheckpointData(time) => clearCheckpointData(time) } } 

generteJobs

 private def generateJobs(time: Time) {                                                                                   SparkEnv.set(ssc.env)                                                                                                  Try(graph.generateJobs(time)) match {                                                                                  case Success(jobs) =>                                                                                                val receivedBlockInfo = graph.getReceiverInputStreams.map { stream => val streamId = stream.id val receivedBlockInfo = stream.getReceivedBlockInfo(time) (streamId, receivedBlockInfo) }.toMap jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } eventActor ! DoCheckpoint(time) } 

 generateJobs->generateJob一路下去会调用到Job.run,在job.run中调用sc.runJob,在具体调用路径就不一一列出。

 private class JobHandler(job: Job) extends Runnable { def run() { eventActor ! JobStarted(job) job.run() eventActor ! JobCompleted(job) } } 

DStream.generateJob函数中定义了jobFunc,也就是在job.run()中使用到的jobFunc

  private[streaming] def generateJob(time: Time): Option[Job] = {getOrCompute(time) match {case Some(rdd) => {val jobFunc = () => { val emptyFunc = { (iterator: Iterator[T]) => {} } context.sparkContext.runJob(rdd, emptyFunc) } Some(new Job(time, jobFunc)) } case None => None } } 

在这个流程中,DStreamGraph起到非常关键的作用,非常类似于TridentStorm中的graph.

在generateJob过程中,DStream会通过调用compute函数生成相应的RDD,SparkContext则是将基于RDD的抽象转换成为多个stage,而执行。

StreamingContext中一个重要的转换就是DStream到RDD的转换,而SparkContext中一个重要的转换是RDD到Stage及Task的转换。在这两个不同的抽象类中,要注意其中getOrCompute和compute函数的实现。

小结

本篇内容有点仓促,内容不够丰富翔实,争取回头有空的时候再好好丰富一下具体的调用路径。

对于容错处理机制,本文没有涉及,待研究明白之后另起一篇进行阐述。

转载于:https://www.cnblogs.com/downtjs/p/3815291.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/264699.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

antd-react-mobile(踩坑记录)

1.按照官网步骤进行, $ npm install -g create-react-app # 注意:工具会自动初始化一个脚手架并安装 React 项目的各种必要依赖,如果在过程中出现网络问题,请尝试配置代理或使用 其他 npm registry。 $ create-react-app my-app …

微软私有云分享(R2)22 计算机配置文件与基础设置

计算机配置文件是完全为了裸金属安装准备的。所以如果不准备使用裸金属安装,硬件配置文件在SCVMM2012 R2中也可以不用配置。本章操作完全用图来表示(其实我准备文字了,但是貌似文字丢了…………),创建该文件没什么注意…

14.PTD与的基址

0xC0300000就是页目录的基址。 随便找一个软件测试下 通过0xC0300000找到的物理页就是页目录表这个物理页即是页目录表本身也是页表页目录表是一张特殊的页表,每一项PTE指向的不是普通的物理页,而是指向其他的页表.如果我们要访问第N个PDE, 那么有如下公式:0xc03000…

多项式回归

在上一节所介绍的非线性回归分析,首先要求我们对回归方程的函数模型做出推断。尽管在一些特定的情况下我们能够比較easy地做到这一点,可是在很多实际问题上经常会令我们不知所措。依据高等数学知识我们知道,不论什么曲线能够近似地用多项式表示&#xff…

二级c语言无纸化三合一_学习攻略|计算机二级考试重点及注意事项

2020年全国计算机等级考试将于9月26日强势来袭,亲爱的你们准备好了吗?下面为大家准备了一些干货以及考试注意事项,快拿出小本本记下来吧。考试题型二级office考试题型1、选择题 20分(含公共基础知识部分10分)2、操作题 80分 (1)Word文档 30…

mongodb集群与分片的配置说明

mongodb集群与分片的配置说明 Shardingcluster介绍: 这是一种可以水平扩展的模式,在数据量很大时特给力,实际大规模应用一般会采用这种架构去构建monodb系统。 系统分为需要三种角色: Shard Server:mongod 实例,用于存…

使用VS2005进行负载测试

下面通过一个简单的例子来讲解VS2005是如何做负载测试的.1、 编写一个加法程序,其中编写一个加法方法,然后调用该方法。//程序源代码using System;using System.Data;using System.Configuration;using System.Web;using System.Web.Security;using Syst…

NodeJS开发环境配置

为什么80%的码农都做不了架构师?>>> 上链接~ http://www.cnblogs.com/Irving/p/3634232.html 转载于:https://my.oschina.net/weiyi/blog/287177

一个平行四边形可以分成四个_将平行四边形分割成两个三角形还易变形么?(人教四下五单元三角形例2)...

最近,我学了三角形一课,研究了三角形的特性。课上我们拿出准备好的拼搭的三角形和平行四边形,动手拉一拉,结果发现三角形拉不动、平行四边形一拉就变形了。原来动动手也是研究数学呀,太有意思了!这时&#…

配置redis三主三从

主从环境 centos7.6 redis4.0.1 主从192.168.181.139:6379192.168.181.136:6379192.168.181.136:6380192.168.181.137:6380192.168.181.137:6381192.168.181.139:6381集群实例配置 这里展示192.168.181.139:6379节点的,其他配置修改ip、端口号和文件名 bind 192.168…

tensorflow 模型小型化_模型小型化

实习终于结束了,现把实习期间做的基于人体姿态估计的模型小型化的工作做个总结。现在深度学习模型开始走向应用,因此我们需要把深度学习网络和模型部署到一些硬件上,而现有一些模型的参数量由于过大,会导致在一些硬件上的运行速度…

NoSQL系列:选择合适的数据库

NoSQL系列:选择合适的数据库 为什么使用NoSQL数据库? 阻抗失衡 关系模型和内存中的数据结构不匹配 采用更为方便的数据交互方式提升开发效率 待处理的数据量很大 数据量超过关系型数据库的承载能力 大集群的出现 在成本方面,集群中应用关系数…

史上最全搞怪WC标志(组图)--设计者太有才了。

转载于:https://www.cnblogs.com/onlyzq/archive/2007/12/22/1010318.html

下载安装webstrom及激活

太久没在新电脑上安装websrtom,又有点忘了咋激活。 一、安装 1.直接在浏览器搜索webstrom,打开官网,直接点击download。如下图 2.打开安装包,开始安装,直接点击 next 3.选择安装路径(安装在你想安装的位置&…

CentOS命令行安装配置KVM详细教程

系统为centos 6.4 64位 最小化安装 服务器为dell R720,所以网卡为em0,在vmware作为练习安装学习,同样可以,命令行不是为了显的牛B,而是不用考虑桌面兼容性和其他原因的影响一、查看是否支持虚拟化vmware开启方式虚拟化…

C#中如何得到Graphics对象

2019独角兽企业重金招聘Python工程师标准>>> 利用Graphics对象,我们可以绘制理想的UI。这里首先介绍C#中如何得到Graphics对象。 /如何得到Graphics对象 1. Control.CreateGraphics();直接通过Control类的公开方法获取。可以是Form,基础控件&…

mysql 5.5 创建用户_MySQL5.5以上版本添加用户

MySQL数据库在5.5以后的版本对添加用户的操作进行了改版,已经不能使用原有的添加语法操作。MySQL数据库5.6版本变更:在数据库用户这一块,为了数据安全,5.6版本不在允许root用户通过insert语句对user表进行添加用户如图报错语法貌似…

linux和windows双系统互拷文件乱码问题

2019独角兽企业重金招聘Python工程师标准>>> 如果你需要在linux下面用到windows下的文件,拷贝上去后经常发现中文显示乱码。。原因是Windows中默认的文件格式是 GBK(gb2312),而Linux一般都是UTF-8。比较繁琐的方法是在windows下用程序把内容转…

LOJ2195 旅行

LOJ2195 旅行 题目描述S 国有 N 个城市,编号从 1 到 N。城市间用 N-1 条双向道路连接,满足从一个城市出发可以到达其它所有城市。每个城市信仰不同的宗教,如飞天面条神教、隐形独角兽教、绝地教都是常见的信仰。为了方便,我们用不…

System variables, logging and the Execute SQL Task...(zz)

原文地址http://sqljunkies.com/WebLog/knight_reign/archive/2005/02/27/8187.aspxHeres something useful you can do with system variables and the SQL Task. Logging in SSIS is more flexible and there are more options for logging destinations and formats then wi…