YARN SLS: Analysis of the AM Resource Request Protocol

In SLSRunner.java, the call chain is:

startAM -> startAMFromSLSTrace -> createAMForJob -> runNewAM

In the call to runNewAM, the last two arguments are the important ones:


    private void createAMForJob(Map jsonJob) throws YarnException {
      ......
      for (int i = 0; i < jobCount; i++) {
        runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
            getTaskContainers(jsonJob), getAMContainerResource(jsonJob));
      }
    }

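The last two arguments are getTaskContainers and getAMContainerResource. The former returns the container requirements of the job's tasks; the latter returns the requirements of the container that the AM itself runs in.

Start with the latter.

getAMContainerResource returns a Resource object: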


    private Resource getAMContainerResource(Map jsonJob) {
      Resource amContainerResource =
          SLSConfiguration.getAMContainerResource(getConf());
      if (jsonJob == null) {
        return amContainerResource;
      }
      ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
      for (ResourceInformation info : infors) {
        String key = SLSConfiguration.JOB_AM_PREFIX + info.getName();
        if (jsonJob.containsKey(key)) {
          long value = Long.parseLong(jsonJob.get(key).toString());
          amContainerResource.setResourceValue(info.getName(), value);
        }
      }
      return amContainerResource;
    }

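The Resource class is defined in org.apache.hadoop.yarn.api.records.Resource.java.

It has the form <mem, vcores>, that is, the amount of resources for a single container.

At the start of the function, SLSConfiguration.getAMContainerResource(getConf()) supplies the initial AM resource values from the SLS configuration. Any resource type that the job's JSON specifies under the JOB_AM_PREFIX key then overrides that initial value.

SLSConfiguration.getAMContainerResource: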


    public static Resource getAMContainerResource(Configuration conf) {
      return Resource.newInstance(
          conf.getLong(AM_CONTAINER_MEMORY, AM_CONTAINER_MEMORY_DEFAULT),
          conf.getInt(AM_CONTAINER_VCORES, AM_CONTAINER_VCORES_DEFAULT));
    }

The AM's default memory is 1024 and its default vcores is 1:


    public static final int AM_CONTAINER_MEMORY_DEFAULT = 1024;
    public static final String AM_CONTAINER_VCORES = AM_PREFIX +
        "container.vcores";
    public static final int AM_CONTAINER_VCORES_DEFAULT = 1;

That gives the resource requirement of the AM's container.
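The default-then-override logic of getAMContainerResource can be sketched with plain Java collections. This is a simplified stand-in rather than Hadoop code: a Map<String, Long> plays the role of Resource, the class and method names are hypothetical, and the "am." key prefix is an assumed value for SLSConfiguration.JOB_AM_PREFIX that should be checked against your Hadoop version. The resource names memory-mb and vcores are YARN's standard names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AmResourceSketch {
    // Assumed value of SLSConfiguration.JOB_AM_PREFIX (not shown in the excerpts above).
    static final String JOB_AM_PREFIX = "am.";
    // Stand-in for ResourceUtils.getResourceTypesArray(): YARN's two standard resource names.
    static final List<String> RESOURCE_TYPES = List.of("memory-mb", "vcores");

    // Start from the configured defaults, then let the job JSON override each resource type.
    static Map<String, Long> amContainerResource(Map<String, Object> jsonJob) {
        Map<String, Long> resource = new LinkedHashMap<>();
        resource.put("memory-mb", 1024L); // AM_CONTAINER_MEMORY_DEFAULT
        resource.put("vcores", 1L);       // AM_CONTAINER_VCORES_DEFAULT
        if (jsonJob == null) {
            return resource;
        }
        for (String name : RESOURCE_TYPES) {
            String key = JOB_AM_PREFIX + name;
            if (jsonJob.containsKey(key)) {
                resource.put(name, Long.parseLong(jsonJob.get(key).toString()));
            }
        }
        return resource;
    }

    public static void main(String[] args) {
        // No job description: defaults only.
        System.out.println(amContainerResource(null));
        // The job overrides memory but keeps the default vcores.
        System.out.println(amContainerResource(Map.of("am.memory-mb", 2048)));
    }
}
```

Values are read through toString() and Long.parseLong, as in the original, so a JSON number and a JSON string are both accepted.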

Next, getTaskContainers:


    private List<ContainerSimulator> getTaskContainers(Map jsonJob)
        throws YarnException {
      List<ContainerSimulator> containers = new ArrayList<>();
      List tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
      if (tasks == null || tasks.size() == 0) {
        throw new YarnException("No task for the job!");
      }
      for (Object o : tasks) {
        Map jsonTask = (Map) o;
        String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST);
        long duration = 0;
        if (jsonTask.containsKey(SLSConfiguration.TASK_DURATION_MS)) {
          duration = Integer.parseInt(
              jsonTask.get(SLSConfiguration.TASK_DURATION_MS).toString());
        } else if (jsonTask.containsKey(SLSConfiguration.DURATION_MS)) {
          // Also support "duration.ms" for backward compatibility
          duration = Integer.parseInt(
              jsonTask.get(SLSConfiguration.DURATION_MS).toString());
        } else if (jsonTask.containsKey(SLSConfiguration.TASK_START_MS) &&
            jsonTask.containsKey(SLSConfiguration.TASK_END_MS)) {
          long taskStart = Long.parseLong(
              jsonTask.get(SLSConfiguration.TASK_START_MS).toString());
          long taskFinish = Long.parseLong(
              jsonTask.get(SLSConfiguration.TASK_END_MS).toString());
          duration = taskFinish - taskStart;
        }
        if (duration <= 0) {
          throw new YarnException("Duration of a task shouldn't be less or equal"
              + " to 0!");
        }
        Resource res = getResourceForContainer(jsonTask);
        int priority = DEFAULT_MAPPER_PRIORITY;
        if (jsonTask.containsKey(SLSConfiguration.TASK_PRIORITY)) {
          priority = Integer.parseInt(
              jsonTask.get(SLSConfiguration.TASK_PRIORITY).toString());
        }
        String type = "map";
        if (jsonTask.containsKey(SLSConfiguration.TASK_TYPE)) {
          type = jsonTask.get(SLSConfiguration.TASK_TYPE).toString();
        }
        int count = 1;
        if (jsonTask.containsKey(SLSConfiguration.COUNT)) {
          count = Integer.parseInt(
              jsonTask.get(SLSConfiguration.COUNT).toString());
        }
        count = Math.max(count, 1);
        ExecutionType executionType = ExecutionType.GUARANTEED;
        if (jsonTask.containsKey(SLSConfiguration.TASK_EXECUTION_TYPE)) {
          executionType = ExecutionType.valueOf(
              jsonTask.get(SLSConfiguration.TASK_EXECUTION_TYPE).toString());
        }
        for (int i = 0; i < count; i++) {
          containers.add(
              new ContainerSimulator(res, duration, hostname, priority, type,
                  executionType));
        }
      }
      return containers;
    }
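The duration and count handling in getTaskContainers can be isolated into a small, dependency-free sketch. The class and method names here are hypothetical, and the JSON key strings are assumptions standing in for the SLSConfiguration constants, whose values the excerpt does not show (only "duration.ms" is confirmed by the comment in the source). The sketch parses every duration with Long.parseLong, whereas the original uses Integer.parseInt for the first two branches.

```java
import java.util.Map;

public class TaskDurationSketch {
    // Assumed key names; the real values live in SLSConfiguration.
    static final String TASK_DURATION_MS = "container.duration.ms";
    static final String DURATION_MS = "duration.ms";
    static final String TASK_START_MS = "container.start.ms";
    static final String TASK_END_MS = "container.end.ms";
    static final String COUNT = "count";

    // Same precedence as getTaskContainers: explicit duration, legacy key, then end - start.
    static long resolveDuration(Map<String, Object> task) {
        long duration = 0;
        if (task.containsKey(TASK_DURATION_MS)) {
            duration = Long.parseLong(task.get(TASK_DURATION_MS).toString());
        } else if (task.containsKey(DURATION_MS)) {
            duration = Long.parseLong(task.get(DURATION_MS).toString());
        } else if (task.containsKey(TASK_START_MS) && task.containsKey(TASK_END_MS)) {
            duration = Long.parseLong(task.get(TASK_END_MS).toString())
                - Long.parseLong(task.get(TASK_START_MS).toString());
        }
        if (duration <= 0) {
            throw new IllegalArgumentException("Task duration must be greater than 0");
        }
        return duration;
    }

    // A missing or non-positive count still yields one container.
    static int resolveCount(Map<String, Object> task) {
        int count = 1;
        if (task.containsKey(COUNT)) {
            count = Integer.parseInt(task.get(COUNT).toString());
        }
        return Math.max(count, 1);
    }

    public static void main(String[] args) {
        Map<String, Object> task = Map.of(
            "container.start.ms", 1000, "container.end.ms", 4000, "count", 2);
        System.out.println(resolveDuration(task) + " ms x " + resolveCount(task));
    }
}
```

A task with neither a duration key nor a start/end pair falls through with duration 0 and is rejected, which matches the exception thrown in the original.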

The function returns a List<ContainerSimulator>.

The class is implemented in org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator:


    public class ContainerSimulator implements Delayed {
      // id
      private ContainerId id;
      // resource allocated
      private Resource resource;
      // end time
      private long endTime;
      // life time (ms)
      private long lifeTime;
      // host name
      private String hostname;
      // priority
      private int priority;
      // type
      private String type;
      // execution type
      private ExecutionType executionType = ExecutionType.GUARANTEED;

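Besides a Resource, the class carries other attributes such as id, endTime, hostname, and priority.

In other words, a container requested for a task is described by more attributes than the container requested for the AM.

Back in getTaskContainers: the function loops over every task of the job in sls-jobs.json and reads the attributes that the file specifies for it. Pay particular attention to how the resource is obtained: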

Resource res = getResourceForContainer(jsonTask);


    private Resource getResourceForContainer(Map jsonTask) {
      Resource res = getDefaultContainerResource();
      ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
      for (ResourceInformation info : infors) {
        if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) {
          long value = Long.parseLong(
              jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName())
                  .toString());
          res.setResourceValue(info.getName(), value);
        }
      }
      return res;
    }

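The starting value comes from getDefaultContainerResource, and any resource type present in the task's JSON then overrides it. Here is that function:

getDefaultContainerResource: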


    private Resource getDefaultContainerResource() {
      int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
          SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
      int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES,
          SLSConfiguration.CONTAINER_VCORES_DEFAULT);
      return Resources.createResource(containerMemory, containerVCores);
    }

As expected, it again reads the defaults from SLSConfiguration: 1024 for memory and 1 for vcores.

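To summarize: the AM's container and the task containers are requested separately. The AM's container is described by a Resource, <mem, vcores>, while each task container is described by a ContainerSimulator, of which the Resource is just one member alongside others such as hostname and id.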
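The difference between the two request shapes can be illustrated with two small stand-in types. SimResource and SimContainer are hypothetical simplifications of Resource and ContainerSimulator (most fields and all Hadoop dependencies are omitted), and the numbers in main are arbitrary example values.

```java
import java.util.ArrayList;
import java.util.List;

public class ContainerShapesSketch {
    // Stand-in for Resource: the <mem, vcores> pair requested for one container.
    static final class SimResource {
        final long memoryMb;
        final int vcores;

        SimResource(long memoryMb, int vcores) {
            this.memoryMb = memoryMb;
            this.vcores = vcores;
        }
    }

    // Stand-in for ContainerSimulator: a resource plus per-task attributes.
    static final class SimContainer {
        final SimResource resource;
        final long lifeTimeMs;
        final String hostname;
        final int priority;
        final String type;

        SimContainer(SimResource resource, long lifeTimeMs, String hostname,
                     int priority, String type) {
            this.resource = resource;
            this.lifeTimeMs = lifeTimeMs;
            this.hostname = hostname;
            this.priority = priority;
            this.type = type;
        }
    }

    // Expand one task description into `count` containers that share one resource object,
    // mirroring the final loop of getTaskContainers.
    static List<SimContainer> expand(SimResource res, long lifeTimeMs, String hostname,
                                     int priority, String type, int count) {
        List<SimContainer> containers = new ArrayList<>();
        for (int i = 0; i < Math.max(count, 1); i++) {
            containers.add(new SimContainer(res, lifeTimeMs, hostname, priority, type));
        }
        return containers;
    }

    public static void main(String[] args) {
        SimResource amResource = new SimResource(1024, 1); // the AM asks only for a resource
        List<SimContainer> tasks =
            expand(new SimResource(1024, 1), 5000, "/rack1/node1", 20, "map", 3);
        System.out.println("AM: " + amResource.memoryMb + " MB, " + amResource.vcores + " vcores");
        System.out.println("Task containers: " + tasks.size());
    }
}
```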

Now back to runNewAM, which receives the AM's container resource and the task containers as arguments:


    private void runNewAM(String jobType, String user,
        String jobQueue, String oldJobId, long jobStartTimeMS,
        long jobFinishTimeMS, List<ContainerSimulator> containerList,
        Resource amContainerResource) {
      runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
          jobFinishTimeMS, containerList, null, -1,
          amContainerResource, null);
    }

    private void runNewAM(String jobType, String user,
        String jobQueue, String oldJobId, long jobStartTimeMS,
        long jobFinishTimeMS, List<ContainerSimulator> containerList,
        ReservationId reservationId, long deadline, Resource amContainerResource,
        Map<String, String> params) {
      AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
          amClassMap.get(jobType), new Configuration());
      if (amSim != null) {
        int heartbeatInterval = getConf().getInt(
            SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
            SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
        boolean isTracked = trackedApps.contains(oldJobId);
        if (oldJobId == null) {
          oldJobId = Integer.toString(AM_ID);
        }
        AM_ID++;
        amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
            jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
            runner.getStartTimeMS(), amContainerResource, params);
        if (reservationId != null) {
          // if we have a ReservationId, delegate reservation creation to
          // AMSim (reservation shape is impl specific)
          UTCClock clock = new UTCClock();
          amSim.initReservation(reservationId, deadline, clock.getTime());
        }
        runner.schedule(amSim);
        maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
        numTasks += containerList.size();
        amMap.put(oldJobId, amSim);
      }
    }

Both are handed to amSim.init for initialization.

amSim.init:


    public void init(int heartbeatInterval,
        List<ContainerSimulator> containerList, ResourceManager resourceManager,
        SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
        String simQueue, boolean tracked, String oldApp, long baseTimeMS,
        Resource amResource, Map<String, String> params) {
      super.init(startTime, startTime + 1000000L * heartbeatInterval,
          heartbeatInterval);
      this.user = simUser;
      this.rm = resourceManager;
      this.se = slsRunnner;
      this.queue = simQueue;
      this.oldAppId = oldApp;
      this.isTracked = tracked;
      this.baselineTimeMS = baseTimeMS;
      this.traceStartTimeMS = startTime;
      this.traceFinishTimeMS = finishTime;
      this.amContainerResource = amResource;
    }

Here amResource is assigned to this.amContainerResource, so it becomes a member of the AM simulator.

Next, see where this.amContainerResource is used:


    private void submitApp(ReservationId reservationId)
        throws YarnException, InterruptedException, IOException {
      // ask for new application
      GetNewApplicationRequest newAppRequest =
          Records.newRecord(GetNewApplicationRequest.class);
      GetNewApplicationResponse newAppResponse =
          rm.getClientRMService().getNewApplication(newAppRequest);
      appId = newAppResponse.getApplicationId();
      // submit the application
      final SubmitApplicationRequest subAppRequest =
          Records.newRecord(SubmitApplicationRequest.class);
      ApplicationSubmissionContext appSubContext =
          Records.newRecord(ApplicationSubmissionContext.class);
      appSubContext.setApplicationId(appId);
      appSubContext.setMaxAppAttempts(1);
      appSubContext.setQueue(queue);
      appSubContext.setPriority(Priority.newInstance(0));
      ContainerLaunchContext conLauContext =
          Records.newRecord(ContainerLaunchContext.class);
      conLauContext.setApplicationACLs(new HashMap<>());
      conLauContext.setCommands(new ArrayList<>());
      conLauContext.setEnvironment(new HashMap<>());
      conLauContext.setLocalResources(new HashMap<>());
      conLauContext.setServiceData(new HashMap<>());
      appSubContext.setAMContainerSpec(conLauContext);
      appSubContext.setResource(amContainerResource);

In submitApp, the call appSubContext.setResource(amContainerResource) makes amContainerResource part of appSubContext.

For an analysis of the appSubContext structure, see the companion post "YARN-SLS AM, RM Protocol Analysis".

The protocol structure of ApplicationSubmissionContext is:


    message ApplicationSubmissionContextProto {
      optional ApplicationIdProto application_id = 1;
      optional string application_name = 2 [default = "N/A"];
      optional string queue = 3 [default = "default"];
      optional PriorityProto priority = 4;
      optional ContainerLaunchContextProto am_container_spec = 5;
      optional bool cancel_tokens_when_complete = 6 [default = true];
      optional bool unmanaged_am = 7 [default = false];
      optional int32 maxAppAttempts = 8 [default = 0];
      optional ResourceProto resource = 9;
      optional string applicationType = 10 [default = "YARN"];
      optional bool keep_containers_across_application_attempts = 11 [default = false];
      repeated string applicationTags = 12;
      optional int64 attempt_failures_validity_interval = 13 [default = -1];
      optional LogAggregationContextProto log_aggregation_context = 14;
      optional ReservationIdProto reservation_id = 15;
      optional string node_label_expression = 16;
      repeated ResourceRequestProto am_container_resource_request = 17;
      repeated ApplicationTimeoutMapProto application_timeouts = 18;
      repeated StringStringMapProto application_scheduling_properties = 19;
    }

The field optional ResourceProto resource is the one populated by appSubContext.setResource(amContainerResource).

appSubContext then becomes part of subAppRequest, which is finally passed to the RM with:

rm.getClientRMService().submitApplication(subAppRequest);

That is how the request for the AM's own container reaches the RM.

What about the container requests for the AM's tasks?

In runNewAM, the task container requests take the form List<ContainerSimulator> containerList and are passed to amSim.init, just like the AM resource. Oddly, though, amSim.init does not store the containerList parameter anywhere:


    public void init(int heartbeatInterval,
        List<ContainerSimulator> containerList, ResourceManager resourceManager,
        SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
        String simQueue, boolean tracked, String oldApp, long baseTimeMS,
        Resource amResource, Map<String, String> params) {
      super.init(startTime, startTime + 1000000L * heartbeatInterval,
          heartbeatInterval);
      this.user = simUser;
      this.rm = resourceManager;
      this.se = slsRunnner;
      this.queue = simQueue;
      this.oldAppId = oldApp;
      this.isTracked = tracked;
      this.baselineTimeMS = baseTimeMS;
      this.traceStartTimeMS = startTime;
      this.traceFinishTimeMS = finishTime;
      this.amContainerResource = amResource;
    }

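As the code shows, the parameter List<ContainerSimulator> containerList is not assigned to any field.

However, sendContainerRequest, which runs during middleStep, calls packageRequests to package the task container requests, so the list must be handed over somewhere.

Turning to sendContainerRequest: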


    @Override
    protected void sendContainerRequest()
        throws YarnException, IOException, InterruptedException {
      if (isFinished) {
        return;
      }
      // send out request
      List<ResourceRequest> ask = null;
      if (mapFinished != mapTotal) {
        // map phase
        if (!pendingMaps.isEmpty()) {
          ask = packageRequests(mergeLists(pendingMaps, scheduledMaps),
              PRIORITY_MAP);
          LOG.debug("Application {} sends out request for {} mappers.",

The argument passed to packageRequests is mergeLists(pendingMaps, scheduledMaps).


    protected List<ResourceRequest> packageRequests(
        List<ContainerSimulator> csList, int priority) {

Comparing the two, mergeLists(pendingMaps, scheduledMaps) has type List<ContainerSimulator>, so presumably its contents originate from the List<ContainerSimulator> containerList.
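The source does not show mergeLists itself. Assuming it simply concatenates its two arguments into a new list (an assumption, not something confirmed by the excerpts here), a generic equivalent would look like this; the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class MergeListsSketch {
    // Assumed behaviour of mergeLists: concatenate into a new list, leaving the inputs unchanged.
    static <T> List<T> mergeLists(List<T> left, List<T> right) {
        List<T> merged = new ArrayList<>(left);
        merged.addAll(right);
        return merged;
    }

    public static void main(String[] args) {
        List<String> pendingMaps = List.of("map-1");
        List<String> scheduledMaps = List.of("map-2", "map-3");
        // The merged list is what packageRequests would receive as csList.
        System.out.println(mergeLists(pendingMaps, scheduledMaps));
    }
}
```

Under that assumption, each request covers both the containers not yet asked for and the ones already asked for but not yet allocated.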

For the structural conversions and hand-offs inside packageRequests, see the post "YARN-SLS AM, RM Protocol Analysis".


    protected List<ResourceRequest> packageRequests(
        List<ContainerSimulator> csList, int priority) {
      // create requests
      Map<String, ResourceRequest> rackLocalRequestMap = new HashMap<String, ResourceRequest>();
      Map<String, ResourceRequest> nodeLocalRequestMap = new HashMap<String, ResourceRequest>();
      ResourceRequest anyRequest = null;
      for (ContainerSimulator cs : csList) {
        if (cs.getHostname() != null) {
          String[] rackHostNames = SLSUtils.getRackHostName(cs.getHostname());
          // check rack local
          String rackname = "/" + rackHostNames[0];
          if (rackLocalRequestMap.containsKey(rackname)) {
            rackLocalRequestMap.get(rackname).setNumContainers(
                rackLocalRequestMap.get(rackname).getNumContainers() + 1);
          } else {
            ResourceRequest request = createResourceRequest(cs.getResource(),
                cs.getExecutionType(), rackname, priority, 1);
            rackLocalRequestMap.put(rackname, request);
          }
          // check node local
          String hostname = rackHostNames[1];
          if (nodeLocalRequestMap.containsKey(hostname)) {
            nodeLocalRequestMap.get(hostname).setNumContainers(
                nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
          } else {
            ResourceRequest request = createResourceRequest(cs.getResource(),
                cs.getExecutionType(), hostname, priority, 1);
            nodeLocalRequestMap.put(hostname, request);
          }
        }
        // any
        if (anyRequest == null) {
          anyRequest = createResourceRequest(cs.getResource(),
              cs.getExecutionType(), ResourceRequest.ANY, priority, 1);
        } else {
          anyRequest.setNumContainers(anyRequest.getNumContainers() + 1);
        }
      }
      List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
      ask.addAll(nodeLocalRequestMap.values());
      ask.addAll(rackLocalRequestMap.values());
      if (anyRequest != null) {
        ask.add(anyRequest);
      }
      return ask;
    }
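The aggregation that packageRequests performs (one request per node, one per rack, and one for "any" location, each with a container count) can be sketched without Hadoop types. This simplified version tracks only the counts per resource name and ignores capability, priority, and execution type. The class name, the rackAndHost helper, and its "/rack/host" parsing with a "default-rack" fallback are assumptions about what SLSUtils.getRackHostName does; "*" is the value of ResourceRequest.ANY.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PackageRequestsSketch {
    static final String ANY = "*"; // value of ResourceRequest.ANY

    // Assumed behaviour of SLSUtils.getRackHostName: "/rack1/node1" -> {"rack1", "node1"}.
    static String[] rackAndHost(String hostname) {
        String trimmed = hostname.startsWith("/") ? hostname.substring(1) : hostname;
        String[] parts = trimmed.split("/");
        if (parts.length == 1) {
            return new String[] {"default-rack", parts[0]}; // assumed fallback rack name
        }
        return new String[] {parts[0], parts[1]};
    }

    // Returns resource name -> number of containers, ordered node-local, rack-local, then ANY.
    static Map<String, Integer> countRequests(List<String> taskHosts) {
        Map<String, Integer> nodeLocal = new LinkedHashMap<>();
        Map<String, Integer> rackLocal = new LinkedHashMap<>();
        int any = 0;
        for (String host : taskHosts) {
            if (host != null) {
                String[] rackHost = rackAndHost(host);
                rackLocal.merge("/" + rackHost[0], 1, Integer::sum);
                nodeLocal.merge(rackHost[1], 1, Integer::sum);
            }
            any++; // every container counts toward the ANY request, with or without a host
        }
        Map<String, Integer> ask = new LinkedHashMap<>(nodeLocal);
        ask.putAll(rackLocal);
        if (any > 0) {
            ask.put(ANY, any);
        }
        return ask;
    }

    public static void main(String[] args) {
        // Three tasks with a preferred host and one without.
        List<String> hosts = Arrays.asList("/rack1/node1", "/rack1/node2", "/rack2/node3", null);
        System.out.println(countRequests(hosts));
    }
}
```

Note that a container with no hostname contributes only to the ANY request, while a container with a hostname is counted three times: once per node, once per rack, and once under ANY.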

In packageRequests, the call

    ResourceRequest request = createResourceRequest(cs.getResource(),
        cs.getExecutionType(), hostname, priority, 1);

wraps the resource of a ContainerSimulator from containerList, together with a few other attributes, into a ResourceRequest.

Here is that function:

createResourceRequest:


    protected ResourceRequest createResourceRequest(Resource resource,
        ExecutionType executionType, String host, int priority, int
        numContainers) {
      ResourceRequest request = recordFactory
          .newRecordInstance(ResourceRequest.class);
      request.setCapability(resource);
      request.setResourceName(host);
      request.setNumContainers(numContainers);
      request.setExecutionTypeRequest(
          ExecutionTypeRequest.newInstance(executionType));
      Priority prio = recordFactory.newRecordInstance(Priority.class);
      prio.setPriority(priority);
      request.setPriority(prio);
      return request;
    }

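The function packs these separate pieces (container resource, host, priority, number of containers, execution type) into a single ResourceRequest.

In particular, request.setCapability(resource) stores the resource (<mem, vcores>) of a ContainerSimulator from containerList.

The format of ResourceRequest is: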


    message ResourceRequestProto {
      optional PriorityProto priority = 1;
      optional string resource_name = 2;
      optional ResourceProto capability = 3;
      optional int32 num_containers = 4;
      optional bool relax_locality = 5 [default = true];
      optional string node_label_expression = 6;
      optional ExecutionTypeRequestProto execution_type_request = 7;
      optional int64 allocation_request_id = 8 [default = -1];
    }

Here ResourceProto capability carries the resource of a ContainerSimulator from containerList.

Now back to sendContainerRequest:

    ask = packageRequests(mergeLists(pendingMaps, scheduledMaps),
        PRIORITY_MAP);

ask is therefore a List<ResourceRequest>. sendContainerRequest continues:


    if (ask == null) {
      ask = new ArrayList<>();
    }
    final AllocateRequest request = createAllocateRequest(ask);
    if (totalContainers == 0) {
      request.setProgress(1.0f);
    } else {
      request.setProgress((float) finishedContainers / totalContainers);
    }
    UserGroupInformation ugi =
        UserGroupInformation.createRemoteUser(appAttemptId.toString());
    Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
        .get(appAttemptId.getApplicationId())
        .getRMAppAttempt(appAttemptId).getAMRMToken();
    ugi.addTokenIdentifier(token.decodeIdentifier());
    AllocateResponse response = ugi.doAs(
        new PrivilegedExceptionAction<AllocateResponse>() {
          @Override
          public AllocateResponse run() throws Exception {
            return rm.getApplicationMasterService().allocate(request);
          }
        });
    if (response != null) {
      responseQueue.put(response);
    }

createAllocateRequest(ask) converts ask (a List<ResourceRequest>) into request (an AllocateRequest). The overload that also takes the list of containers to release is:


    protected AllocateRequest createAllocateRequest(List<ResourceRequest> ask,
        List<ContainerId> toRelease) {
      AllocateRequest allocateRequest =
          recordFactory.newRecordInstance(AllocateRequest.class);
      allocateRequest.setResponseId(responseId++);
      allocateRequest.setAskList(ask);
      allocateRequest.setReleaseList(toRelease);
      return allocateRequest;
    }

ask is ultimately wrapped in an AllocateRequest, which is the final format in which the AM asks the RM for resources. In real YARN, the AM also communicates with the RM using AllocateRequest.
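The last wrapping step can be sketched as follows. SimAllocateRequest is a hypothetical stand-in that keeps only the fields set in the excerpts above (response id, ask list, release list, progress); the asks are plain strings rather than ResourceRequest objects, and the progress calculation from sendContainerRequest is folded into the same method for brevity.

```java
import java.util.ArrayList;
import java.util.List;

public class AllocateRequestSketch {
    // Simplified stand-in for AllocateRequest.
    static final class SimAllocateRequest {
        final int responseId;
        final List<String> askList;
        final List<String> releaseList;
        final float progress;

        SimAllocateRequest(int responseId, List<String> askList,
                           List<String> releaseList, float progress) {
            this.responseId = responseId;
            this.askList = askList;
            this.releaseList = releaseList;
            this.progress = progress;
        }
    }

    // Incremented once per request, as in createAllocateRequest.
    private int responseId = 0;

    SimAllocateRequest createAllocateRequest(List<String> ask, List<String> toRelease,
                                             int finishedContainers, int totalContainers) {
        // A null ask becomes an empty list, so a heartbeat with nothing to request is still valid.
        List<String> safeAsk = (ask == null) ? new ArrayList<>() : ask;
        // With no containers at all the job counts as complete; otherwise report the finished share.
        float progress = (totalContainers == 0)
            ? 1.0f
            : (float) finishedContainers / totalContainers;
        return new SimAllocateRequest(responseId++, safeAsk, toRelease, progress);
    }

    public static void main(String[] args) {
        AllocateRequestSketch am = new AllocateRequestSketch();
        SimAllocateRequest first = am.createAllocateRequest(
            List.of("node1 x1", "/rack1 x1", "* x1"), new ArrayList<>(), 0, 4);
        SimAllocateRequest second = am.createAllocateRequest(null, new ArrayList<>(), 1, 4);
        System.out.println(first.responseId + " asks=" + first.askList.size()
            + " progress=" + first.progress);
        System.out.println(second.responseId + " asks=" + second.askList.size()
            + " progress=" + second.progress);
    }
}
```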

That completes the analysis of how the AM requests resources from the RM, covering both the AM's own container and the containers for the AM's tasks.
