def persist(newLevel: StorageLevel): this.type = { // StorageLevel不能随意更改 if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) { throw new UnsupportedOperationException("Cannot change storage level of an RDD after it was already assigned a level") } sc.persistRDD(this) // Register the RDD with the ContextCleaner for automatic GC-based cleanup // 注册清理方法 sc.cleaner.foreach(_.registerRDDForCleanup(this)) storageLevel = newLevel this }
private[spark] def persistRDD(rdd: RDD[_]) { persistentRdds( = rdd }
它居然是用一个HashMap来存的,具体看这个地图的类型是TimeStampedWeakValueHashMap [Int,RDD [_]]类型。把存进去的值都隐式转换成WeakReference,然后加到一个内部的一个ConcurrentHashMap里面。这里貌似也没干干啥,这是有个鸟蛋用。。大神莫喷,知道干啥用的人希望告诉我一下。
1、 CacheManager
final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) } else { computeOrReadCheckpoint(split, context) } }
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel): Iterator[T] = { val key = RDDBlockId(, split.index) blockManager.get(key) match { case Some(values) => // 已经有了,直接返回就可以了 new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])? case None => // loading包含这个key表示已经有人在加载了,等到loading被释放了,就可以去blockManager里面取到了 loading.synchronized { if (loading.contains(key)) { while (loading.contains(key)) { try { loading.wait() } catch { case e: Exception => logWarning(s"Got an exception while waiting for another thread to load $key", e) } } // 别人成功拿到了,我们直接取结果就是了,如果别人取失败了,我们再来取一次 blockManager.get(key) match { case Some(values) => return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) case None => loading.add(key) } } else { loading.add(key) } } try { // 通过rdd自身的compute方法去计算得到结果,回去看看RDD那文章,自己看看源码就清楚了 val computedValues = rdd.computeOrReadCheckpoint(split, context)? // 如果是本地运行的,就没必要缓存了,直接返回即可 if (context.runningLocally) { return computedValues }? // 跟踪blocks的更新状态 var updatedBlocks = Seq[(BlockId, BlockStatus)]() val returnValue: Iterator[T] = { if (storageLevel.useDisk && !storageLevel.useMemory) { /* 这是RDD采用DISK_ONLY的情况,直接扔给blockManager * 然后把结果直接返回,它不需要把结果一下子全部加载进内存 * 这同样适用于MEMORY_ONLY_SER,但是我们需要在启用它之前确认blocks没被block store给丢弃 */ updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true) blockManager.get(key) match { case Some(values) => values.asInstanceOf[Iterator[T]] case None => throw new Exception("Block manager failed to return persisted valued") } } else { // 先存到一个ArrayBuffer,然后一次返回,在blockManager里也存一份 val elements = new ArrayBuffer[Any] elements ++= computedValues updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true) elements.iterator.asInstanceOf[Iterator[T]] } }? // 更新task的监控参数 val metrics = context.taskMetrics metrics.updatedBlocks = Some(updatedBlocks)? new InterruptibleIterator(context, returnValue)? } finally { // 改完了,释放锁 loading.synchronized { loading.remove(key) loading.notifyAll() } } } }
class StorageLevel private( private var useDisk_ : Boolean, private var useMemory_ : Boolean, private var useOffHeap_ : Boolean, private var deserialized_ : Boolean, private var replication_ : Int = 1)? val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false)
2、 BlockManager
private def doPut( blockId: BlockId, data: Values, level: StorageLevel, tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {// Return value val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]? // 记录它的StorageLevel,以便我们可以在它加载进内存之后,可以按需写入硬盘。 // 此外,在我们把调用BlockInfo的markReay方法之前,都没法通过get方法获得该部分内容 val putBlockInfo = { val tinfo = new BlockInfo(level, tellMaster) // 如果不存在,就添加到blockInfo里面 val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) if (oldBlockOpt.isDefined) { // 如果已经存在了,就不需要重复添加了 if (oldBlockOpt.get.waitForReady()) {return updatedBlocks } // 存在于blockInfo当中->但是上一次保存失败了,拿出旧的信息,再试一遍 oldBlockOpt.get } else { tinfo } }? val startTimeMs = System.currentTimeMillis // 当我们需要存储数据,并且要复制数据到别的机器,我们需要访问它的值,但是因为我们的put操作会读取整个iterator, // 这就不会有任何的值留下。在我们保存序列化的数据的场景,我们可以记住这些bytes,但在其他场景,比如反序列化存储的 // 时候,我们就必须依赖返回一个Iterator var valuesAfterPut: Iterator[Any] = null // Ditto for the bytes after the put var bytesAfterPut: ByteBuffer = null // Size of the block in bytes var size = 0L? // 在保存数据之前,我们要实例化,在数据已经序列化并且准备好发送的情况下,这个过程是很快的 val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) { // duplicate并不是复制这些数据,只是做了一个包装 val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate() Future { // 把block复制到别的机器上去 replicate(blockId, bufferView, level) } } else { null }? putBlockInfo.synchronized {? var marked = false try { if (level.useMemory) { // 首先是保存到内存里面,尽管它也使用硬盘,等内存不够的时候,才会写入硬盘 // 下面分了三种情况,但是Task的结果是ByteBufferValues这种情况,具体看putBytes方法 val res = data match { case IteratorValues(iterator) => memoryStore.putValues(blockId, iterator, level, true) case ArrayBufferValues(array) => memoryStore.putValues(blockId, array, level, true) case ByteBufferValues(bytes) => bytes.rewind() memoryStore.putBytes(blockId, bytes, level) } size = res.size // 这里写得那么恶心,是跟data的类型有关系的,data: Either[Iterator[_], ByteBuffer],Left是Iterator,Right是ByteBuffer match { case Right(newBytes) => bytesAfterPut = newBytes