第11课:Spark Streaming源码解读之Driver中的ReceiverTracker架构设计以及具体实现彻底

上节课将到了Receiver是如何不断的接收数据的,并且接收到的数据的元数据会汇报给ReceiverTracker,下面我们看看ReceiverTracker具体的功能及实现。

一、 ReceiverTracker主要的功能:

在Executor上启动Receivers。

停止Receivers 。

更新Receiver接收数据的速率(也就是限流)

不断的等待Receivers的运行状态,只要Receivers停止运行,就重新启动Receiver。也就是Receiver的容错功能。

接受Receiver的注册。

借助ReceivedBlockTracker来管理Receiver接收数据的元数据。

汇报Receiver发送过来的错误信息


ReceiverTracker 管理了一个消息通讯体ReceiverTrackerEndpoint,用来与Receiver或者ReceiverTracker 进行消息通信。

在ReceiverTracker的start方法中,实例化了ReceiverTrackerEndpoint,并且在Executor上启动Receivers:

/** Start the endpoint and receiver execution thread. */ def start(): Unit = synchronized {   if (isTrackerStarted) {     throw new SparkException("ReceiverTracker already started")   }   if (!receiverInputStreams.isEmpty) {     endpoint = ssc.env.rpcEnv.setupEndpoint(       "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))     if (!skipReceiverLaunch) launchReceivers()     logInfo("ReceiverTracker started")     trackerState = Started   } }

启动Receivr,其实是ReceiverTracker给ReceiverTrackerEndpoint发送了一个本地消息,ReceiverTrackerEndpoint将Receiver封装成RDD以job的方式提交给集群运行。

endpoint.send(StartAllReceivers(receivers))

这里的endpoint就是ReceiverTrackerEndpoint的引用。


Receiver启动后,会向ReceiverTracker注册,注册成功才算正式启动了。

override protected def onReceiverStart(): Boolean = {   val msg = RegisterReceiver(     streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)   trackerEndpoint.askWithRetry[Boolean](msg) }

当Receiver端接收到数据,达到一定的条件需要将数据写入BlockManager,并且将数据的元数据汇报给ReceiverTracker:

/** Store block and report it to driver */ def pushAndReportBlock(     receivedBlock: ReceivedBlock,     metadataOption: Option[Any],     blockIdOption: Option[StreamBlockId]   ) {   val blockId = blockIdOption.getOrElse(nextBlockId)   val time = System.currentTimeMillis   val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)   logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")   val numRecords = blockStoreResult.numRecords   val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)   trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))   logDebug(s"Reported block $blockId") }


当ReceiverTracker收到元数据后,会在线程池中启动一个线程来写数据:

case AddBlock(receivedBlockInfo) =>   if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {     walBatchingThreadPool.execute(new Runnable {       override def run(): Unit = Utils.tryLogNonFatalError {         if (active) {           context.reply(addBlock(receivedBlockInfo))         } else {           throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")         }       }     })   } else {     context.reply(addBlock(receivedBlockInfo))   }

数据的元数据是交由ReceivedBlockTracker管理的。

数据最终被写入到streamIdToUnallocatedBlockQueues中:一个流对应一个数据块信息的队列。

private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo] private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]


每当Streaming 触发job时,会将队列中的数据分配成一个batch,并将数据写入timeToAllocatedBlocks数据结构。

private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks] .... def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {   if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {     val streamIdToBlocks = streamIds.map { streamId =>         (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))     }.toMap     val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)     if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {       timeToAllocatedBlocks.put(batchTime, allocatedBlocks)       lastAllocatedBatchTime = batchTime     } else {       logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")     }   } else {     // This situation occurs when:     // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,     // possibly processed batch job or half-processed batch job need to be processed again,     // so the batchTime will be equal to lastAllocatedBatchTime.     // 2. Slow checkpointing makes recovered batch time older than WAL recovered     // lastAllocatedBatchTime.     // This situation will only occurs in recovery time.     logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")   } }

可见一个batch会包含多个流的数据。


郑重声明:本文版权归原作者所有,转载文章仅为传播更多信息之目的,如作者信息标记有误,请第一时间联系我们修改或删除,多谢。