Spark Notes 16: BlockManager

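Start with this article on the underlying principles: http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/ . In addition, Spark's storage layer uses the concept of a segmented file (http://en.wikipedia.org/wiki/Segmented_file_transfer ). In short, a file is divided into multiple segments that are stored on different servers, and a reader fetches from those servers concurrently. (This is also the basis of BitTorrent.)
The earlier analysis of the shuffle call chain already covered much of the BlockManager flow, but it is still worth reading through its code systematically.
getLocalFromDisk was the end point of the earlier walk through shuffleManager, but it is the starting point for BlockManager. Even fetching a block from a remote node works by sending a message to the remote server, which executes getLocalFromDisk there and sends the result back.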
->diskStore.getValues(blockId, serializer)

============================BlockManager============================
-> BlockManager::getLocalFromDisk
->diskStore.getValues(blockId, serializer)
->getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
->val segment = diskManager.getBlockLocation(blockId) -- a DiskBlockManager method; returns the block's location (a segment) within a file
->if blockId.isShuffle and env.shuffleManager.isInstanceOf[SortShuffleManager] -- if this is a sort-based shuffle
->sortShuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId], this) --For sort-based shuffle, let it figure out its blocks
->else if blockId.isShuffle and shuffleBlockManager.consolidateShuffleFiles -- consolidated-files mode
->shuffleBlockManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId]) --For hash-based shuffle with consolidated files
->val shuffleState = shuffleStates(id.shuffleId) --
->for (fileGroup <- shuffleState.allFileGroups)
->val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId) -- this function is analyzed separately below
->if (segment.isDefined) { return segment.get }
->else
->val file = getFile(blockId.name)--getFile(filename: String): File
->val hash = Utils.nonNegativeHash(filename)
->val dirId = hash % localDirs.length
->val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
->var subDir = subDirs(dirId)(subDirId)
->new File(subDir, filename)
->new FileSegment(file, 0, file.length())
->val channel = new RandomAccessFile(segment.file, "r").getChannel
->if (segment.length < minMemoryMapBytes)
->channel.position(segment.offset)
->channel.read(buf)
->return buf
->else
->return Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length))
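
The last two steps of the trace (hashing a file name to a directory, then reading a segment either by copying or by memory-mapping) can be sketched in isolation. This is a minimal sketch, not Spark's code: DiskLookupSketch, locate and readSegment are hypothetical names, and nonNegativeHash only stands in for Utils.nonNegativeHash.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

object DiskLookupSketch {
  // Non-negative hash; a stand-in (assumption) for Utils.nonNegativeHash.
  def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h == Int.MinValue) 0 else math.abs(h)
  }

  // Map a block file name to (dirId, subDirId), following the getFile steps above.
  def locate(filename: String, numLocalDirs: Int, subDirsPerLocalDir: Int): (Int, Int) = {
    val hash = nonNegativeHash(filename)
    val dirId = hash % numLocalDirs
    val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir
    (dirId, subDirId)
  }

  // Read `length` bytes starting at `offset`:
  // small segments are copied into a heap buffer, large ones are memory-mapped.
  def readSegment(file: File, offset: Long, length: Long, minMemoryMapBytes: Long): ByteBuffer = {
    val channel = new RandomAccessFile(file, "r").getChannel
    try {
      if (length < minMemoryMapBytes) {
        val buf = ByteBuffer.allocate(length.toInt)
        channel.position(offset)
        // Keep reading until the buffer is full or the file ends.
        while (buf.remaining() > 0 && channel.read(buf) != -1) {}
        buf.flip()
        buf
      } else {
        // The mapping stays valid after the channel is closed.
        channel.map(MapMode.READ_ONLY, offset, length)
      }
    } finally {
      channel.close()
    }
  }
}
```

The threshold exists because memory-mapping has a fixed setup cost, so it only pays off for larger segments.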

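ShuffleFileGroup: how ShuffleBlockManager looks up data by mapId and reduceId (the getFileSegmentFor function)
->use reduceId to find the reducer's file handle fd in the ShuffleFileGroup field val files: Array[File]
    ->use mapId to find index in mapIdToIndex
        ->use the reducer to find the blockOffset and blockLen vectors
            ->use index to look up offset and len in those vectors
                ->finally, use offset and len to read the required data from fd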
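
The lookup steps above can be sketched with plain Scala collections. This is an illustration under assumptions, not the Spark class: FileGroupSketch, FileGroup and Segment are hypothetical names, files are represented by their names only, and ordinary mutable collections replace Spark's primitive-specialized ones.

```scala
import scala.collection.mutable

object FileGroupSketch {
  // Hypothetical stand-in for Spark's FileSegment: (file name, offset, length).
  final case class Segment(file: String, offset: Long, length: Long)

  // One file per reducer; each map task appends one block to every file.
  final class FileGroup(files: Array[String]) {
    private var numBlocks = 0
    // mapId -> position of that map task's block within every reducer file.
    private val mapIdToIndex = mutable.Map.empty[Int, Int]
    private val offsetsByReducer = Array.fill(files.length)(mutable.ArrayBuffer.empty[Long])
    private val lengthsByReducer = Array.fill(files.length)(mutable.ArrayBuffer.empty[Long])

    // Record where a finished map task wrote its block in each reducer file.
    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]): Unit = {
      require(offsets.length == files.length && lengths.length == files.length)
      mapIdToIndex(mapId) = numBlocks
      numBlocks += 1
      for (i <- files.indices) {
        offsetsByReducer(i) += offsets(i)
        lengthsByReducer(i) += lengths(i)
      }
    }

    // reduceId selects the file and its vectors; mapId selects the index into them.
    def getFileSegmentFor(mapId: Int, reduceId: Int): Option[Segment] =
      mapIdToIndex.get(mapId).map { index =>
        Segment(files(reduceId), offsetsByReducer(reduceId)(index), lengthsByReducer(reduceId)(index))
      }
  }
}
```

Storing one index per map task plus per-reducer offset vectors is cheaper than keeping a full map from every (mapId, reduceId) pair to a segment.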

Fetching data locally
->BlockManager::doGetLocal
->val info = blockInfo.get(blockId).orNull
->val level = info.level
->if (level.useMemory) --Look for the block in memory
->val result = if (asBlockResult)
->memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
->else
->memoryStore.getBytes(blockId)
->if (level.useOffHeap) -- Look for the block in Tachyon
->tachyonStore.getBytes(blockId)
->if (level.useDisk)
->val bytes: ByteBuffer = diskStore.getBytes(blockId)
->if (!level.useMemory) // If the block shouldn't be stored in memory, we can just return it
->if (asBlockResult)
->return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk, info.size))
->else
->return Some(bytes)
->else --memory// Otherwise, we also have to store something in the memory store
->if (!level.deserialized || !asBlockResult) -- store the bytes in memory if the storage level includes "memory serialized", or if it should be cached as objects in memory but only its serialized bytes were requested
->val copyForMemory = ByteBuffer.allocate(bytes.limit)
->copyForMemory.put(bytes)
->memoryStore.putBytes(blockId, copyForMemory, level)
->if (!asBlockResult)
->return Some(bytes)
->else -- deserialize first, then write to memory
->val values = dataDeserialize(blockId, bytes)
->if (level.deserialized) // Cache the values before returning them
->val putResult = memoryStore.putIterator(blockId, values, level, returnValues = true, allowPersistToDisk = false)
->putResult.data match case Left(it) return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
->else
->return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
->val values = dataDeserialize(blockId, bytes)
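
The overall shape of doGetLocal (try memory first, fall back to disk, and cache a copy in memory on the way out) can be reduced to a small sketch. TieredLookupSketch and Store are hypothetical names, both tiers are modelled as in-process maps, and the Tachyon tier and the serialized/deserialized distinction are left out.

```scala
import scala.collection.mutable

object TieredLookupSketch {
  // Hypothetical two-level store: a memory map in front of a "disk" map.
  // `useMemory` plays the role of level.useMemory in the trace.
  final class Store(useMemory: Boolean) {
    val memory = mutable.Map.empty[String, Array[Byte]]
    val disk = mutable.Map.empty[String, Array[Byte]]

    def getLocal(blockId: String): Option[Array[Byte]] = {
      // 1. Look in memory first when the storage level allows it.
      val cached = if (useMemory) memory.get(blockId) else None
      // 2. Otherwise fall back to disk and keep a copy in memory for next time.
      cached.orElse {
        disk.get(blockId).map { bytes =>
          if (useMemory) memory(blockId) = bytes.clone()
          bytes
        }
      }
    }
  }
}
```

The copy mirrors the copyForMemory step in the trace: the cached buffer must not alias the one handed back to the caller.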
Fetching data from a remote node
->BlockManager::doGetRemote
->val locations = Random.shuffle(master.getLocations(blockId)) -- shuffle the locations randomly
->for (loc <- locations) -- iterate over all locations
->val data = BlockManagerWorker.syncGetBlock(GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
->val blockMessage = BlockMessage.fromGetBlock(msg)
->val newBlockMessage = new BlockMessage()
->newBlockMessage.set(getBlock)
->typ = BlockMessage.TYPE_GET_BLOCK
->id = getBlock.id
->val blockMessageArray = new BlockMessageArray(blockMessage)
-> val responseMessage = Try(Await.result(connectionManager.sendMessageReliably(toConnManagerId, blockMessageArray.toBufferMessage), Duration.Inf))
->responseMessage match {case Success(message) =>  val bufferMessage = message.asInstanceOf[BufferMessage]
->logDebug("Response message received " + bufferMessage)
->BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage => 
->logDebug("Found " + blockMessage)
->return blockMessage.getData
->return Some(data)
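
The control flow of doGetRemote is a randomized try-each-replica loop that stops at the first hit. A minimal sketch, assuming a hypothetical fetch function in place of the synchronous network call (BlockManagerWorker.syncGetBlock in the trace); RemoteFetchSketch and getRemote are illustrative names only.

```scala
import scala.util.Random

object RemoteFetchSketch {
  // `fetch` is a hypothetical stand-in for the blocking network request;
  // it returns None when the location does not have the block.
  def getRemote[A](locations: Seq[String], fetch: String => Option[A],
                   rng: Random = new Random()): Option[A] = {
    // Shuffle so that load spreads across replicas, then stop at the first hit.
    // The iterator is lazy, so later locations are not contacted after a hit.
    rng.shuffle(locations).iterator.map(fetch).collectFirst { case Some(data) => data }
  }
}
```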

===========================end=================================
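To reference this figure again: multiple map tasks can share one file, with each map task owning certain segments of it. This reduces the number of files.
[Figure: Spark shuffle consolidation process]
(Image source: http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/ )
The data structure returned when fetching block data: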
/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
    val data: Iterator[Any],
    readMethod: DataReadMethod.Value,
    bytes: Long) {
  val inputMetrics = new InputMetrics(readMethod)
  inputMetrics.bytesRead = bytes
}

private[spark] class BlockManager(
    executorId: String,
    actorSystem: ActorSystem,
    val master: BlockManagerMaster,
    defaultSerializer: Serializer,
    maxMemory: Long,
    val conf: SparkConf,
    securityManager: SecurityManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager)
  extends BlockDataProvider with Logging {
Shuffle state: mainly holds two fields, unusedFileGroups and allFileGroups, which track the ShuffleFileGroups that are currently unused and all ShuffleFileGroups created so far.
/**
 * Contains all the state related to a particular shuffle. This includes a pool of unused
 * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
 */
private class ShuffleState(val numBuckets: Int) {
  val nextFileId = new AtomicInteger(0)
  val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
  val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()

  /**
   * The mapIds of all map tasks completed on this Executor for this shuffle.
   * NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise.
   */
  val completedMapTasks = new ConcurrentLinkedQueue[Int]()
}
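
How the two queues cooperate can be sketched as a small pool: a task takes an idle group if one exists, otherwise a new group is created and recorded, and the group is handed back when the task finishes. This is a sketch under assumptions; FileGroupPoolSketch, Pool, Group, acquire and release are hypothetical names, not Spark's API.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

object FileGroupPoolSketch {
  // Hypothetical stand-in for ShuffleFileGroup; only the id matters here.
  final case class Group(fileId: Int)

  final class Pool {
    private val nextFileId = new AtomicInteger(0)
    val unusedFileGroups = new ConcurrentLinkedQueue[Group]()
    val allFileGroups = new ConcurrentLinkedQueue[Group]()

    // Reuse an idle group if one exists, otherwise create and record a new one.
    def acquire(): Group = {
      val idle = unusedFileGroups.poll() // null when the queue is empty
      if (idle != null) idle
      else {
        val g = Group(nextFileId.getAndIncrement())
        allFileGroups.add(g)
        g
      }
    }

    // Called when a map task finishes writing, so another task can reuse the group.
    def release(g: Group): Unit = unusedFileGroups.add(g)
  }
}
```

With this scheme the number of groups is bounded by the number of concurrently running map tasks rather than by the total number of map tasks.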
shuffleStates is a timestamped hash table (each entry carries a timestamp so that stale entries can be cleaned up):
private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]

private val metadataCleaner =
  new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
Used by sort-based shuffle: when a map task finishes, its result is registered in shuffleStates.
/**
 * Register a completed map without getting a ShuffleWriterGroup. Used by sort-based shuffle
 * because it just writes a single file by itself.
 */
def addCompletedMap(shuffleId: Int, mapId: Int, numBuckets: Int): Unit = {
  shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
  val shuffleState = shuffleStates(shuffleId)
  shuffleState.completedMapTasks.add(mapId)
}
Register itself with the master:
/**
 * Initialize the BlockManager. Register to the BlockManagerMaster, and start the
 * BlockManagerWorker actor.
 */
private def initialize(): Unit = {
  master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
  BlockManagerWorker.startBlockManagerWorker(this)
}
Get a block's data from the local disk. This is a convenience method:
/**
 * A short-circuited method to get blocks directly from disk. This is used for getting
 * shuffle blocks. It is safe to do so without a lock on block info since disk store
 * never deletes (recent) items.
 */
def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
  diskStore.getValues(blockId, serializer).orElse {
    throw new BlockException(blockId, s"Block $blockId not found on disk, though it should be")
  }
}

ShuffleWriterGroup: each ShuffleMapTask has a group of shuffle writers, with one writer allocated per reducer. Currently only hash-based shuffle uses it; the only instantiation is the one returned by forMapTask, which is used by the shuffle field of HashShuffleWriter:
/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
  val writers: Array[BlockObjectWriter]

  /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
  def releaseWriters(success: Boolean)
}

/**
 * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
 * per reducer (this set of files is called a ShuffleFileGroup).
 *
 * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
 * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
 * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle
 * files, it releases them for another task.
 * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
 *   - shuffleId: The unique id given to the entire shuffle stage.
 *   - bucketId: The id of the output partition (i.e., reducer id)
 *   - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
 *       time owns a particular fileId, and this id is returned to a pool when the task finishes.
 * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
 * that specifies where in a given file the actual block data is located.
 *
 * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
 * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for
 * each block stored in each file. In order to find the location of a shuffle block, we search the
 * files within a ShuffleFileGroups associated with the block's reducer.
 */
// TODO: Factor this into a separate class for each ShuffleManager implementation
private[spark]
class ShuffleBlockManager(blockManager: BlockManager,
    shuffleManager: ShuffleManager) extends Logging {
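ShuffleFileGroup is a group of files, one per reducer. Each map task is assigned one such group (but several map tasks can share the same group). When several map tasks share a file, each writes its own segment; the block for a given (mapId, reduceId) is located through the getFileSegmentFor function.
private object ShuffleBlockManager {
  /**
   * A group of shuffle files, one per reducer.
   * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
   */
  private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
    private var numBlocks: Int = 0

    /**
     * Stores the absolute index of each mapId in the files of this group. For instance,
     * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
     */
    private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()

    /**
     * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
     * position in the file.
     * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
     * reducer.
     */
    private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
      new PrimitiveVector[Long]()
    }
    private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
      new PrimitiveVector[Long]()
    }

    def apply(bucketId: Int) = files(bucketId)

    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
      assert(offsets.length == lengths.length)
      mapIdToIndex(mapId) = numBlocks
      numBlocks += 1
      for (i <- 0 until offsets.length) {
        blockOffsetsByReducer(i) += offsets(i)
        blockLengthsByReducer(i) += lengths(i)
      }
    }

    /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
    def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
      val file = files(reducerId)
      val blockOffsets = blockOffsetsByReducer(reducerId)
      val blockLengths = blockLengthsByReducer(reducerId)
      val index = mapIdToIndex.getOrElse(mapId, -1)
      if (index >= 0) {
        val offset = blockOffsets(index)
        val length = blockLengths(index)
        Some(new FileSegment(file, offset, length))
      } else {
        None
      }
    }
  }
}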













Posted on 2015-01-27 16:20 by 过雁

Reposted from: https://www.cnblogs.com/zwCHAN/p/4253287.html
