温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Yarn中如何实现ScheduleBackend

发布时间:2021-06-26 14:07:41 来源:亿速云 阅读:362 作者:Leah 栏目:云计算

这篇文章将为大家详细讲解有关Yarn中如何实现ScheduleBackend,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

Yarn方式下的ScheduleBackend是用的啥?

在SparkContext中创建ScheduleBackend时,会根据指定的”master“参数的前缀决定创建哪种ScheduleBackend,对于"yarn://host:port"这样的URL来说,如果是cluster模式,就是创建YarnClusterSchedulerBackend,如果是client模式,就是创建YarnClientSchedulerBackend。

我们还是先看看YarnClusterSchedulerBackend的代码结构把。

YarnClusterSchedulerBackend继承了YarnSchedulerBackend,没有太多的发挥代码,我们直接看YarnSchedulerBackend把。估计client模式下也差不多。

YarnSchedulerBackend又继承了CoarseGrainedSchedulerBackend,我们看看不同点在哪里。

覆写了doRequestTotalExecutors和doKillExecutors方法,一个申请Executor,一个杀死Executor。

override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {     yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))   }     override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {     yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds))   }

yarnSchedulerEndpointRef就是同一个文件里的endpoint端,看看具体的执行代码是什么:

      case r: RequestExecutors =>         amEndpoint match {           case Some(am) =>             am.ask[Boolean](r).andThen {               case Success(b) => context.reply(b)               case Failure(NonFatal(e)) =>                 logError(s"Sending $r to AM was unsuccessful", e)                 context.sendFailure(e)             }(ThreadUtils.sameThread)                  }       case k: KillExecutors =>         amEndpoint match {           case Some(am) =>             am.ask[Boolean](k).andThen {               case Success(b) => context.reply(b)               case Failure(NonFatal(e)) =>                 logError(s"Sending $k to AM was unsuccessful", e)                 context.sendFailure(e)             }(ThreadUtils.sameThread)                   }

我们看到它又将消息转给了amEndpoint,就是转给了yarn工程里的ApplicationManager。又要跳到ApplicationManager去看看里面的实现逻辑了,真是一波三折啊。

ApplicationManager里是怎么处理RequestExecutors和KillExecutors两个消息的呢?

      case r: RequestExecutors =>         Option(allocator) match {           case Some(a) =>             if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,               r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {               resetAllocatorInterval()             }             context.reply(true)         }       case KillExecutors(executorIds) =>         Option(allocator) match {           case Some(a) => executorIds.foreach(a.killExecutor)         }         context.reply(true)

调用allocator的killExecutor和requestTotalExecutorsWithPreferredLocalities方法。allocator又是啥?这里是不是类有的太多了啊。。

allocator = client.createAllocator(       yarnConf,       _sparkConf,       appAttemptId,       driverUrl,       driverRef,       securityMgr,       localResources)

是client的createAllocator方法创建出来的,client是啥?是YarnRMClient,我们就要先看看YarnRMClient了,看名字就大概能猜到,YarnRMClient就是来向Yarn机器申请Executor和杀死Executor的。

createAllocator方法返回下面的YarnAllocator:

 return new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, appAttemptId, securityMgr,
      localResources, SparkRackResolver.get(conf))

来到YarnAllocator。

YarnAllocator的killExecutor方法很好理解,就是释放Yarn中的Container:

 def killExecutor(executorId: String): Unit = synchronized {     executorIdToContainer.get(executorId) match {       case Some(container) if !releasedContainers.contains(container.getId) =>         internalReleaseContainer(container)         runningExecutors.remove(executorId)       case _ => logWarning(s"Attempted to kill unknown executor $executorId!")     }   }

申请Executor其实最终是在runAllocatedContainers方法中实现的。

核心代码看一下把,完整的可以看源码:

    if (runningExecutors.size() < targetNumExecutors) {         numExecutorsStarting.incrementAndGet()         if (launchContainers) {           launcherPool.execute(() => {             try {               new ExecutorRunnable(                 Some(container),                 conf,                 sparkConf,                 driverUrl,                 executorId,                 executorHostname,                 executorMemory,                 executorCores,                 appAttemptId.getApplicationId.toString,                 securityMgr,                 localResources               ).run()               updateInternalState()             } catch {                           }           })         }

申请targetNumExecutors个ExecutorRunner,这样就和Standalone的申请Executor对应起来了。好了,整个过程就是这样了。

最终就会在Yarn集群中申请了所需数目的Container,并且在Container中启动ExecutorRunner,来向Driver汇报成绩。

这里的ExecutorRunner就是YarnCoarseGrainedExecutorBackend线程,在ExecutorRunner类中可以看到。

关于Yarn中如何实现ScheduleBackend就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI