温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

(版本定制)第13课:Spark Streaming源码解读之Driver容错安全性

发布时间:2020-06-17 22:03:04 来源:网络 阅读:1605 作者:Spark_2016 栏目:大数据

本期内容:
1. ReceiverBlockTracker容错安全性 
2. DStream和JobGenerator容错安全性


一:容错安全性 
1. ReceivedBlockTracker负责管理Spark Streaming运行程序的元数据。数据层面 
2. DStream和JobGenerator是作业调度的核心层面,也就是具体调度到什么程度了,从运行的考虑的。DStream是逻辑层面。 
3. 作业生存层面,JobGenerator是Job调度层面,具体调度到什么程度了。从运行的角度的。

谈Driver容错你要考虑Driver中有那些需要维持状态的运行。 
1. ReceivedBlockTracker跟踪了数据,因此需要容错。通过WAL方式容错。 
2. DStreamGraph表达了依赖关系,恢复状态的时候需要根据DStream恢复计算逻辑级别的依赖关系。通过checkpoint方式容错。 
3. JobGenerator表面你是怎么基于ReceiverBlockTracker中的数据,以及DStream构成的依赖关系不断的产生Job的过程。你消费了那些数据,进行到什么程度了。

总结如下:

(版本定制)第13课:Spark Streaming源码解读之Driver容错安全性

ReceivedBlockTracker: 
1. ReceivedBlockTracker会管理Spark Streaming运行过程中所有的数据。并且把数据分配给需要的batches,所有的动作都会被WAL写入到Log中,Driver失败的话,就可以根据历史恢复tracker状态,在ReceivedBlockTracker创建的时候,使用checkpoint保存历史目录。

下面就从Receiver收到数据之后,怎么处理的开始。 
2. ReceiverBlockTracker.addBlock源码如下: 
Receiver接收到数据,把元数据信息汇报上来,然后通过ReceiverSupervisorImpl就将数据汇报上来,就直接通过WAL进行容错. 
当Receiver的管理者,ReceiverSupervisorImpl把元数据信息汇报给Driver的时候,正在处理是交给ReceiverBlockTracker. ReceiverBlockTracker将数据写进WAL文件中,然后才会写进内存中,被当前的Spark Streaming程序的调度器使用的,也就是JobGenerator使用的。JobGenerator不可能直接使用WAL。WAL的数据在磁盘中,这里JobGenerator使用的内存中缓存的数据结构


/** Add received block. This event will get written to the write ahead log (if enabled). */ def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { try { val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) //接收数据后,先进行WAL if (writeResult) {       synchronized { getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo //当WAL成功后,将Block Info元数据信息加入到Block Queue中       }       logDebug(s"Stream ${receivedBlockInfo.streamId} received " + s"block ${receivedBlockInfo.blockStoreResult.blockId}")     } else {       logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " + s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")     }     writeResult   } catch { case NonFatal(e) =>       logError(s"Error adding block $receivedBlockInfo", e) false } }

Driver端接收到的数据保存在streamIdToUnallocatedBlockQueues中,具体结构如下:


private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo] private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
allocateBlocksToBatch把接收到的数据分配给batch,根据streamId取出Block,由此就知道Spark Streaming处理数据的时候可以有不同数据来源
那到底什么是batchTime? 
batchTime是上一个Job分配完数据之后,开始再接收到的数据的时间。
/**  * Allocate all unallocated blocks to the given batch.  * This event will get written to the write ahead log (if enabled).  */ def allocateBlocksToBatch(batchTime: Time): Unit = synchronized { if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) { val streamIdToBlocks = streamIds.map { streamId =>         (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true)) //根据StreamId获取Block信息     }.toMap val allocatedBlocks = AllocatedBlocks(streamIdToBlocks) if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) { timeToAllocatedBlocks.put(batchTime, allocatedBlocks) lastAllocatedBatchTime = batchTime //这里有对batchTime进行赋值,就是上一个job分配完数据后,开始在接收到数据的时间     } else {       logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")     }   } else { logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")   } }

随着时间的推移,会不断产生RDD,这时就需要清理掉一些历史数据,可以通过cleanupOldBatches方法来清理历史数据


/**  * Clean up block information of old batches. If waitForCompletion is true, this method  * returns only after the files are cleaned up.  */ def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized { require(cleanupThreshTime.milliseconds < clock.getTimeMillis()) val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq   logInfo("Deleting batches " + timesToCleanup) if (writeToLog(BatchCleanupEvent(timesToCleanup))) { timeToAllocatedBlocks --= timesToCleanup writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))   } else {     logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")   } }

以上几个方法都进行了WAL动作

(record: ReceivedBlockTrackerLogEvent): = { (isWriteAheadLogEnabled) {     logTrace(record) { .get.write(ByteBuffer.(Utils.(record))clock.getTimeMillis()) } { (e) =>         logWarning(recorde) }   } { } }

总结: 
WAL对数据的管理包括数据的生成,数据的销毁和消费。上述在操作之后都要先写入到WAL的文件中. 


(版本定制)第13课:Spark Streaming源码解读之Driver容错安全性

JobGenerator: 
Checkpoint会有时间间隔Batch Duractions,Batch执行前和执行后都会进行checkpoint。 
doCheckpoint被调用的前后流程: 
(版本定制)第13课:Spark Streaming源码解读之Driver容错安全性(版本定制)第13课:Spark Streaming源码解读之Driver容错安全性

1、简单看下generateJobs

/** Generate jobs and perform checkpoint for the given `time`.  */ private def generateJobs(time: Time) { // Set the SparkEnv in this thread, so that job generation code can access the environment   // Example: BlockRDDs are created in this thread, and it needs to access BlockManager   // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed. SparkEnv.set(ssc.env) Try {     jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch graph.generateJobs(time) // generate jobs using allocated block } match { case Success(jobs) => val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)       jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) case Failure(e) =>       jobScheduler.reportError("Error generating jobs for time " + time, e)   } eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) //job完成后就需要进行checkpoint动作 }

2、processEvent接收到消息事件


/** Processes all events */ private def processEvent(event: JobGeneratorEvent) {   logDebug("Got event " + event)   event match { case GenerateJobs(time) => generateJobs(time) case ClearMetadata(time) => clearMetadata(time) case DoCheckpoint(time, clearCheckpointDataLater) => doCheckpoint(time, clearCheckpointDataLater) // 调用doCheckpoint方法 case ClearCheckpointData(time) => clearCheckpointData(time)   } }

3、doCheckpoint源码如下:


/** Perform checkpoint for the give `time`. */ private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) { if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {     logInfo("Checkpointing graph for time " + time) ssc.graph.updateCheckpointData(time) //最终是进行RDD的Checkpoint checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)   } }

4、DStream中的updateCheckpointData源码如下:最终导致RDD的Checkpoint


/**  * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of  * this stream. This is an internal method that should not be called directly. This is  * a default implementation that saves only the file names of the checkpointed RDDs to  * checkpointData. Subclasses of DStream (especially those of InputDStream) may override  * this method to save custom checkpoint data.  */ private[streaming] def updateCheckpointData(currentTime: Time) {   logDebug("Updating checkpoint data for time " + currentTime) checkpointData.update(currentTime)   dependencies.foreach(_.updateCheckpointData(currentTime))   logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData) }

JobGenerator容错安全性如下图: 
(版本定制)第13课:Spark Streaming源码解读之Driver容错安全性(版本定制)第13课:Spark Streaming源码解读之Driver容错安全性



参考博客:http://blog.csdn.net/snail_gesture/article/details/51492873#comments

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI