YARN SLS: Protocol Analysis of How the AM Requests Resources
In SLSRunner.java, the call chain is:
startAM->startAMFromSLSTrace->createAMForJob->runNewAM
When createAMForJob calls runNewAM, the last two arguments matter most:
- private void createAMForJob(Map jsonJob) throws YarnException {
- ......
- for (int i = 0; i < jobCount; i++) {
- runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
- getTaskContainers(jsonJob), getAMContainerResource(jsonJob));
- }
- }
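The last two arguments are getTaskContainers(jsonJob) and getAMContainerResource(jsonJob). The former builds the container requirements of the job's tasks; the latter builds the requirement of the container the AM itself runs in.
Start with the latter.
getAMContainerResource returns a Resource object: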
- private Resource getAMContainerResource(Map jsonJob) {
- Resource amContainerResource =
- SLSConfiguration.getAMContainerResource(getConf());
- if (jsonJob == null) {
- return amContainerResource;
- }
- ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
- for (ResourceInformation info : infors) {
- String key = SLSConfiguration.JOB_AM_PREFIX + info.getName();
- if (jsonJob.containsKey(key)) {
- long value = Long.parseLong(jsonJob.get(key).toString());
- amContainerResource.setResourceValue(info.getName(), value);
- }
- }
- return amContainerResource;
- }
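The Resource class is implemented in org.apache.hadoop.yarn.api.records.Resource.
It has the form <mem, vcores> (plus any additional resource types registered with ResourceUtils), i.e. the resource amount of a single container.
At the start of the function, SLSConfiguration.getAMContainerResource(getConf()) supplies the AM's initial resource from the SLS configuration:
SLSConfiguration.getAMContainerResource: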
- public static Resource getAMContainerResource(Configuration conf) {
- return Resource.newInstance(
- conf.getLong(AM_CONTAINER_MEMORY, AM_CONTAINER_MEMORY_DEFAULT),
- conf.getInt(AM_CONTAINER_VCORES, AM_CONTAINER_VCORES_DEFAULT));
- }
The AM defaults are 1024 MB of memory and 1 vcore:
- public static final int AM_CONTAINER_MEMORY_DEFAULT = 1024;
- public static final String AM_CONTAINER_VCORES = AM_PREFIX +
- "container.vcores";
- public static final int AM_CONTAINER_VCORES_DEFAULT = 1;
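Then, for each resource type returned by ResourceUtils.getResourceTypesArray(), a key JOB_AM_PREFIX + <resource name> in the job's JSON entry, if present, overrides the default value. This yields the AM container's resource requirement.
Next, getTaskContainers: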
- private List<ContainerSimulator> getTaskContainers(Map jsonJob)
- throws YarnException {
- List<ContainerSimulator> containers = new ArrayList<>();
- List tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
- if (tasks == null || tasks.size() == 0) {
- throw new YarnException("No task for the job!");
- }
- for (Object o : tasks) {
- Map jsonTask = (Map) o;
- String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST);
- long duration = 0;
- if (jsonTask.containsKey(SLSConfiguration.TASK_DURATION_MS)) {
- duration = Integer.parseInt(
- jsonTask.get(SLSConfiguration.TASK_DURATION_MS).toString());
- } else if (jsonTask.containsKey(SLSConfiguration.DURATION_MS)) {
- // Also support "duration.ms" for backward compatibility
- duration = Integer.parseInt(
- jsonTask.get(SLSConfiguration.DURATION_MS).toString());
- } else if (jsonTask.containsKey(SLSConfiguration.TASK_START_MS) &&
- jsonTask.containsKey(SLSConfiguration.TASK_END_MS)) {
- long taskStart = Long.parseLong(
- jsonTask.get(SLSConfiguration.TASK_START_MS).toString());
- long taskFinish = Long.parseLong(
- jsonTask.get(SLSConfiguration.TASK_END_MS).toString());
- duration = taskFinish - taskStart;
- }
- if (duration <= 0) {
- throw new YarnException("Duration of a task shouldn't be less or equal"
- + " to 0!");
- }
- Resource res = getResourceForContainer(jsonTask);
- int priority = DEFAULT_MAPPER_PRIORITY;
- if (jsonTask.containsKey(SLSConfiguration.TASK_PRIORITY)) {
- priority = Integer.parseInt(
- jsonTask.get(SLSConfiguration.TASK_PRIORITY).toString());
- }
- String type = "map";
- if (jsonTask.containsKey(SLSConfiguration.TASK_TYPE)) {
- type = jsonTask.get(SLSConfiguration.TASK_TYPE).toString();
- }
- int count = 1;
- if (jsonTask.containsKey(SLSConfiguration.COUNT)) {
- count = Integer.parseInt(
- jsonTask.get(SLSConfiguration.COUNT).toString());
- }
- count = Math.max(count, 1);
- ExecutionType executionType = ExecutionType.GUARANTEED;
- if (jsonTask.containsKey(SLSConfiguration.TASK_EXECUTION_TYPE)) {
- executionType = ExecutionType.valueOf(
- jsonTask.get(SLSConfiguration.TASK_EXECUTION_TYPE).toString());
- }
- for (int i = 0; i < count; i++) {
- containers.add(
- new ContainerSimulator(res, duration, hostname, priority, type,
- executionType));
- }
- }
- return containers;
- }
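To make the per-task rules easier to see, here is a minimal stand-alone sketch of how getTaskContainers resolves a task's duration and count. It does not use Hadoop: tasks are plain maps, and the key strings (`container.duration.ms`, `duration.ms`, `container.start.ms`, `container.end.ms`, `count`) are assumed values of the SLSConfiguration constants, not checked against a specific release. Unlike the original, it parses every number with Long.parseLong.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch of the duration/count rules in
// getTaskContainers. The key strings below are ASSUMED values of the
// SLSConfiguration constants; check them against your Hadoop version.
class TaskSpecSketch {
  static final String TASK_DURATION_MS = "container.duration.ms";
  static final String DURATION_MS = "duration.ms"; // legacy key
  static final String TASK_START_MS = "container.start.ms";
  static final String TASK_END_MS = "container.end.ms";
  static final String COUNT = "count";

  private static long num(Map<String, Object> task, String key) {
    return Long.parseLong(task.get(key).toString());
  }

  /** Same precedence as getTaskContainers: explicit, legacy, then end - start. */
  static long resolveDuration(Map<String, Object> task) {
    long duration = 0;
    if (task.containsKey(TASK_DURATION_MS)) {
      duration = num(task, TASK_DURATION_MS);
    } else if (task.containsKey(DURATION_MS)) {
      duration = num(task, DURATION_MS);
    } else if (task.containsKey(TASK_START_MS) && task.containsKey(TASK_END_MS)) {
      duration = num(task, TASK_END_MS) - num(task, TASK_START_MS);
    }
    if (duration <= 0) {
      throw new IllegalArgumentException(
          "Duration of a task shouldn't be less or equal to 0!");
    }
    return duration;
  }

  /** A task entry expands into max(count, 1) identical containers. */
  static int resolveCount(Map<String, Object> task) {
    int count = task.containsKey(COUNT)
        ? Integer.parseInt(task.get(COUNT).toString()) : 1;
    return Math.max(count, 1);
  }

  /** Returns one duration per simulated container, in trace order. */
  static List<Long> expand(List<Map<String, Object>> tasks) {
    if (tasks == null || tasks.isEmpty()) {
      throw new IllegalArgumentException("No task for the job!");
    }
    List<Long> durations = new ArrayList<>();
    for (Map<String, Object> task : tasks) {
      long d = resolveDuration(task);
      int n = resolveCount(task);
      for (int i = 0; i < n; i++) {
        durations.add(d);
      }
    }
    return durations;
  }

  public static void main(String[] args) {
    List<Map<String, Object>> tasks = List.of(
        Map.<String, Object>of(TASK_START_MS, 6664, TASK_END_MS, 23707),
        Map.<String, Object>of(TASK_DURATION_MS, 5000, COUNT, 2));
    System.out.println(expand(tasks)); // [17043, 5000, 5000]
  }
}
```

A task entry with `count` n yields n identical ContainerSimulator objects, which is why `numTasks += containerList.size()` in runNewAM counts containers rather than JSON task entries.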
The function returns a List<ContainerSimulator>.
The class is implemented in org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator:
- public class ContainerSimulator implements Delayed {
- // id
- private ContainerId id;
- // resource allocated
- private Resource resource;
- // end time
- private long endTime;
- // life time (ms)
- private long lifeTime;
- // host name
- private String hostname;
- // priority
- private int priority;
- // type
- private String type;
- // execution type
- private ExecutionType executionType = ExecutionType.GUARANTEED;
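Besides a Resource, it carries other attributes such as id, endTime, hostname, and priority.
In other words, a task's container carries more attributes than the AM's container.
Back in getTaskContainers: it loops over every task of the job in sls-jobs.json and reads the attributes the task declares there (duration, priority, type, count, execution type). Pay particular attention to how the resource is obtained: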
Resource res = getResourceForContainer(jsonTask);
- private Resource getResourceForContainer(Map jsonTask) {
- Resource res = getDefaultContainerResource();
- ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
- for (ResourceInformation info : infors) {
- if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) {
- long value = Long.parseLong(
- jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName())
- .toString());
- res.setResourceValue(info.getName(), value);
- }
- }
- return res;
- }
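The starting value comes from getDefaultContainerResource; afterwards any key TASK_PREFIX + <resource name> in the task's JSON overrides the matching resource type. Its implementation: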
getDefaultContainerResource:
- private Resource getDefaultContainerResource() {
- int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
- SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
- int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES,
- SLSConfiguration.CONTAINER_VCORES_DEFAULT);
- return Resources.createResource(containerMemory, containerVCores);
- }
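As expected, the defaults again come from SLSConfiguration: 1024 MB of memory and 1 vcore.
To summarize: the AM's container and the task containers are built separately. The AM's container requirement is a plain Resource <mem, vcores>, whereas each task container is a ContainerSimulator, in which the Resource is just one member alongside others such as hostname, priority, and id.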
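Both getAMContainerResource and getResourceForContainer follow the same pattern: start from a configured default, then let per-resource keys in the JSON override it. The sketch below models a Resource as a name-to-value map. The prefixes `am.` and `container.`, the resource names `memory-mb` and `vcores`, and the single shared default set are assumptions for illustration (in SLS the AM and task defaults come from different configuration keys).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the pattern shared by getAMContainerResource and
// getResourceForContainer: start from configured defaults, then let
// "<prefix><resource-name>" keys in the job/task JSON override single
// resource types. Prefixes and defaults are ASSUMED, not read from Hadoop.
class ResourceOverrideSketch {
  static final String AM_PREFIX = "am.";          // assumed JOB_AM_PREFIX
  static final String TASK_PREFIX = "container."; // assumed TASK_PREFIX

  /** Stand-in for the configured default: 1024 MB and 1 vcore. */
  static Map<String, Long> defaults() {
    Map<String, Long> res = new LinkedHashMap<>();
    res.put("memory-mb", 1024L); // name of the memory resource type
    res.put("vcores", 1L);       // name of the CPU resource type
    return res;
  }

  /** Mirrors the loop over ResourceUtils.getResourceTypesArray(). */
  static Map<String, Long> resolve(Map<String, Object> json, String prefix) {
    Map<String, Long> res = defaults();
    if (json == null) {
      return res; // no JSON entry: keep the defaults
    }
    // Only known resource types are looked up; other keys are ignored.
    res.replaceAll((name, current) -> json.containsKey(prefix + name)
        ? Long.parseLong(json.get(prefix + name).toString())
        : current);
    return res;
  }

  public static void main(String[] args) {
    Map<String, Object> job = Map.of("am.memory-mb", 2048);
    System.out.println(resolve(job, AM_PREFIX));   // {memory-mb=2048, vcores=1}
    System.out.println(resolve(job, TASK_PREFIX)); // {memory-mb=1024, vcores=1}
  }
}
```

Because the loop runs over the registered resource types, a JSON key for an unknown type is simply ignored, while a key for a configured custom resource type would be picked up the same way.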
Back to runNewAM, which receives the AM container resource and the task container list as arguments:
- private void runNewAM(String jobType, String user,
- String jobQueue, String oldJobId, long jobStartTimeMS,
- long jobFinishTimeMS, List<ContainerSimulator> containerList,
- Resource amContainerResource) {
- runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
- jobFinishTimeMS, containerList, null, -1,
- amContainerResource, null);
- }
- private void runNewAM(String jobType, String user,
- String jobQueue, String oldJobId, long jobStartTimeMS,
- long jobFinishTimeMS, List<ContainerSimulator> containerList,
- ReservationId reservationId, long deadline, Resource amContainerResource,
- Map<String, String> params) {
- AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
- amClassMap.get(jobType), new Configuration());
- if (amSim != null) {
- int heartbeatInterval = getConf().getInt(
- SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
- SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
- boolean isTracked = trackedApps.contains(oldJobId);
- if (oldJobId == null) {
- oldJobId = Integer.toString(AM_ID);
- }
- AM_ID++;
- amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
- jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
- runner.getStartTimeMS(), amContainerResource, params);
- if(reservationId != null) {
- // if we have a ReservationId, delegate reservation creation to
- // AMSim (reservation shape is impl specific)
- UTCClock clock = new UTCClock();
- amSim.initReservation(reservationId, deadline, clock.getTime());
- }
- runner.schedule(amSim);
- maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
- numTasks += containerList.size();
- amMap.put(oldJobId, amSim);
- }
- }
Both are passed on to amSim.init, which initializes the AM simulator.
amSim.init:
- public void init(int heartbeatInterval,
- List<ContainerSimulator> containerList, ResourceManager resourceManager,
- SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
- String simQueue, boolean tracked, String oldApp, long baseTimeMS,
- Resource amResource, Map<String, String> params) {
- super.init(startTime, startTime + 1000000L * heartbeatInterval,
- heartbeatInterval);
- this.user = simUser;
- this.rm = resourceManager;
- this.se = slsRunnner;
- this.queue = simQueue;
- this.oldAppId = oldApp;
- this.isTracked = tracked;
- this.baselineTimeMS = baseTimeMS;
- this.traceStartTimeMS = startTime;
- this.traceFinishTimeMS = finishTime;
- this.amContainerResource = amResource;
- }
amResource is assigned to this.amContainerResource, so it becomes a member of the AM simulator.
Next, see where this.amContainerResource is used, in submitApp:
- private void submitApp(ReservationId reservationId)
- throws YarnException, InterruptedException, IOException {
- // ask for new application
- GetNewApplicationRequest newAppRequest =
- Records.newRecord(GetNewApplicationRequest.class);
- GetNewApplicationResponse newAppResponse =
- rm.getClientRMService().getNewApplication(newAppRequest);
- appId = newAppResponse.getApplicationId();
- // submit the application
- final SubmitApplicationRequest subAppRequest =
- Records.newRecord(SubmitApplicationRequest.class);
- ApplicationSubmissionContext appSubContext =
- Records.newRecord(ApplicationSubmissionContext.class);
- appSubContext.setApplicationId(appId);
- appSubContext.setMaxAppAttempts(1);
- appSubContext.setQueue(queue);
- appSubContext.setPriority(Priority.newInstance(0));
- ContainerLaunchContext conLauContext =
- Records.newRecord(ContainerLaunchContext.class);
- conLauContext.setApplicationACLs(new HashMap<>());
- conLauContext.setCommands(new ArrayList<>());
- conLauContext.setEnvironment(new HashMap<>());
- conLauContext.setLocalResources(new HashMap<>());
- conLauContext.setServiceData(new HashMap<>());
- appSubContext.setAMContainerSpec(conLauContext);
- appSubContext.setResource(amContainerResource);
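In submitApp, appSubContext.setResource(amContainerResource) makes amContainerResource part of appSubContext.
For an analysis of the appSubContext structure, see my other post, "YARN-SLS AM,RM协议解析" (YARN-SLS AM/RM protocol analysis).
The ApplicationSubmissionContext protocol structure is: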
- message ApplicationSubmissionContextProto {
- optional ApplicationIdProto application_id = 1;
- optional string application_name = 2 [default = "N/A"];
- optional string queue = 3 [default = "default"];
- optional PriorityProto priority = 4;
- optional ContainerLaunchContextProto am_container_spec = 5;
- optional bool cancel_tokens_when_complete = 6 [default = true];
- optional bool unmanaged_am = 7 [default = false];
- optional int32 maxAppAttempts = 8 [default = 0];
- optional ResourceProto resource = 9;
- optional string applicationType = 10 [default = "YARN"];
- optional bool keep_containers_across_application_attempts = 11 [default = false];
- repeated string applicationTags = 12;
- optional int64 attempt_failures_validity_interval = 13 [default = -1];
- optional LogAggregationContextProto log_aggregation_context = 14;
- optional ReservationIdProto reservation_id = 15;
- optional string node_label_expression = 16;
- repeated ResourceRequestProto am_container_resource_request = 17;
- repeated ApplicationTimeoutMapProto application_timeouts = 18;
- repeated StringStringMapProto application_scheduling_properties = 19;
- }
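Of these fields, optional ResourceProto resource is the one set by appSubContext.setResource(amContainerResource).
appSubContext then becomes part of subAppRequest, which is finally passed to the RM through:
rm.getClientRMService().submitApplication(subAppRequest);
That is how the AM's own container request reaches the RM.
What about the container requests for the AM's tasks?
In runNewAM, the task container requests take the form List<ContainerSimulator> containerList, which is passed to amSim.init just like the AM resource. Oddly, however, amSim.init does not hand containerList to any object: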
- public void init(int heartbeatInterval,
- List<ContainerSimulator> containerList, ResourceManager resourceManager,
- SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
- String simQueue, boolean tracked, String oldApp, long baseTimeMS,
- Resource amResource, Map<String, String> params) {
- super.init(startTime, startTime + 1000000L * heartbeatInterval,
- heartbeatInterval);
- this.user = simUser;
- this.rm = resourceManager;
- this.se = slsRunnner;
- this.queue = simQueue;
- this.oldAppId = oldApp;
- this.isTracked = tracked;
- this.baselineTimeMS = baseTimeMS;
- this.traceStartTimeMS = startTime;
- this.traceFinishTimeMS = finishTime;
- this.amContainerResource = amResource;
- }
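As shown, the base AMSimulator.init never stores the containerList parameter.
In middleStep, sendContainerRequest calls packageRequests to package the task container requests, so the list must be handed over somewhere. (For MapReduce jobs the simulator is MRAMSimulator, whose sendContainerRequest is shown below; it overrides init and sorts containerList by task type into map and reduce lists.)
Here is sendContainerRequest: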
- @Override
- protected void sendContainerRequest()
- throws YarnException, IOException, InterruptedException {
- if (isFinished) {
- return;
- }
- // send out request
- List<ResourceRequest> ask = null;
- if (mapFinished != mapTotal) {
- // map phase
- if (!pendingMaps.isEmpty()) {
- ask = packageRequests(mergeLists(pendingMaps, scheduledMaps),
- PRIORITY_MAP);
- LOG.debug("Application {} sends out request for {} mappers.",
packageRequests is called with mergeLists(pendingMaps, scheduledMaps) as its argument. Its signature is:
- protected List<ResourceRequest> packageRequests(
- List<ContainerSimulator> csList, int priority) {
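Comparing the two, mergeLists(pendingMaps, scheduledMaps) is also a List<ContainerSimulator>, presumably derived from List<ContainerSimulator> containerList.
For how the structures change and are passed along inside packageRequests, see the post "YARN-SLS AM,RM协议解析" (YARN-SLS AM/RM protocol analysis). The full method: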
- protected List<ResourceRequest> packageRequests(
- List<ContainerSimulator> csList, int priority) {
- // create requests
- Map<String, ResourceRequest> rackLocalRequestMap = new HashMap<String, ResourceRequest>();
- Map<String, ResourceRequest> nodeLocalRequestMap = new HashMap<String, ResourceRequest>();
- ResourceRequest anyRequest = null;
- for (ContainerSimulator cs : csList) {
- if (cs.getHostname() != null) {
- String[] rackHostNames = SLSUtils.getRackHostName(cs.getHostname());
- // check rack local
- String rackname = "/" + rackHostNames[0];
- if (rackLocalRequestMap.containsKey(rackname)) {
- rackLocalRequestMap.get(rackname).setNumContainers(
- rackLocalRequestMap.get(rackname).getNumContainers() + 1);
- } else {
- ResourceRequest request = createResourceRequest(cs.getResource(),
- cs.getExecutionType(), rackname, priority, 1);
- rackLocalRequestMap.put(rackname, request);
- }
- // check node local
- String hostname = rackHostNames[1];
- if (nodeLocalRequestMap.containsKey(hostname)) {
- nodeLocalRequestMap.get(hostname).setNumContainers(
- nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
- } else {
- ResourceRequest request = createResourceRequest(cs.getResource(),
- cs.getExecutionType(), hostname, priority, 1);
- nodeLocalRequestMap.put(hostname, request);
- }
- }
- // any
- if (anyRequest == null) {
- anyRequest = createResourceRequest(cs.getResource(),
- cs.getExecutionType(), ResourceRequest.ANY, priority, 1);
- } else {
- anyRequest.setNumContainers(anyRequest.getNumContainers() + 1);
- }
- }
- List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
- ask.addAll(nodeLocalRequestMap.values());
- ask.addAll(rackLocalRequestMap.values());
- if (anyRequest != null) {
- ask.add(anyRequest);
- }
- return ask;
- }
Inside packageRequests, the call
- ResourceRequest request = createResourceRequest(cs.getResource(),
- cs.getExecutionType(), hostname, priority, 1);
wraps the resource of each container in List<ContainerSimulator> containerList, together with its other properties, into a ResourceRequest.
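The aggregation performed by packageRequests can be reproduced without Hadoop. This sketch counts containers per node, per rack, and for ANY, returning resource names in the same order as the ask list (node-local, rack-local, ANY). It assumes host strings look like `/rack/host` and that SLSUtils.getRackHostName splits them into rack and host; capability, priority, and execution type are omitted, and a map replaces the list of ResourceRequest objects.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of how packageRequests aggregates containers: one
// entry per node, one per rack ("/" + rack), and a single ANY ("*") entry,
// each carrying a container count. Resource, priority and execution type
// are left out.
class PackageRequestsSketch {
  static final String ANY = "*"; // the value of ResourceRequest.ANY

  /** Assumed behaviour of SLSUtils.getRackHostName for "/rack/host". */
  static String[] rackHost(String hostname) {
    return hostname.substring(1).split("/");
  }

  /** Returns resourceName -> numContainers in ask order: nodes, racks, ANY. */
  static Map<String, Integer> packageRequests(List<String> hosts) {
    Map<String, Integer> nodeLocal = new LinkedHashMap<>();
    Map<String, Integer> rackLocal = new LinkedHashMap<>();
    int any = 0;
    for (String host : hosts) {
      if (host != null) {
        String[] rh = rackHost(host);
        rackLocal.merge("/" + rh[0], 1, Integer::sum); // rack-local
        nodeLocal.merge(rh[1], 1, Integer::sum);       // node-local
      }
      any++; // every container also counts toward ANY
    }
    Map<String, Integer> ask = new LinkedHashMap<>(nodeLocal);
    ask.putAll(rackLocal);
    if (any > 0) {
      ask.put(ANY, any);
    }
    return ask;
  }

  public static void main(String[] args) {
    List<String> hosts = Arrays.asList(
        "/rack1/node1", "/rack1/node2", "/rack2/node3", null);
    System.out.println(packageRequests(hosts));
    // {node1=1, node2=1, node3=1, /rack1=2, /rack2=1, *=4}
  }
}
```

As in the original, a repeated node or rack only increments numContainers on the existing request, so the first container's resource is used for every container aggregated into it; containers without a hostname contribute only to ANY.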
Here is that function, createResourceRequest:
- protected ResourceRequest createResourceRequest(Resource resource,
- ExecutionType executionType, String host, int priority, int
- numContainers) {
- ResourceRequest request = recordFactory
- .newRecordInstance(ResourceRequest.class);
- request.setCapability(resource);
- request.setResourceName(host);
- request.setNumContainers(numContainers);
- request.setExecutionTypeRequest(
- ExecutionTypeRequest.newInstance(executionType));
- Priority prio = recordFactory.newRecordInstance(Priority.class);
- prio.setPriority(priority);
- request.setPriority(prio);
- return request;
- }
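The function packages the separate pieces (container resource, host, priority, number of containers, execution type) into a ResourceRequest.
In particular, request.setCapability(resource) stores the container's <mem, vcores> resource from List<ContainerSimulator> containerList.
The ResourceRequest format is: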
- message ResourceRequestProto {
- optional PriorityProto priority = 1;
- optional string resource_name = 2;
- optional ResourceProto capability = 3;
- optional int32 num_containers = 4;
- optional bool relax_locality = 5 [default = true];
- optional string node_label_expression = 6;
- optional ExecutionTypeRequestProto execution_type_request = 7;
- optional int64 allocation_request_id = 8 [default = -1];
- }
Here ResourceProto capability holds the resource taken from List<ContainerSimulator> containerList.
Back in sendContainerRequest:
ask = packageRequests(mergeLists(pendingMaps, scheduledMaps),
PRIORITY_MAP);
ask is a List<ResourceRequest>. sendContainerRequest continues:
- if (ask == null) {
- ask = new ArrayList<>();
- }
- final AllocateRequest request = createAllocateRequest(ask);
- if (totalContainers == 0) {
- request.setProgress(1.0f);
- } else {
- request.setProgress((float) finishedContainers / totalContainers);
- }
- UserGroupInformation ugi =
- UserGroupInformation.createRemoteUser(appAttemptId.toString());
- Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
- .get(appAttemptId.getApplicationId())
- .getRMAppAttempt(appAttemptId).getAMRMToken();
- ugi.addTokenIdentifier(token.decodeIdentifier());
- AllocateResponse response = ugi.doAs(
- new PrivilegedExceptionAction<AllocateResponse>() {
- @Override
- public AllocateResponse run() throws Exception {
- return rm.getApplicationMasterService().allocate(request);
- }
- });
- if (response != null) {
- responseQueue.put(response);
- }
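createAllocateRequest(ask) converts ask (a List<ResourceRequest>) into request (an AllocateRequest). The one-argument call goes through an overload that passes an empty release list to the two-argument version: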
- protected AllocateRequest createAllocateRequest(List<ResourceRequest> ask,
- List<ContainerId> toRelease) {
- AllocateRequest allocateRequest =
- recordFactory.newRecordInstance(AllocateRequest.class);
- allocateRequest.setResponseId(responseId++);
- allocateRequest.setAskList(ask);
- allocateRequest.setReleaseList(toRelease);
- return allocateRequest;
- }
ask ends up wrapped in an AllocateRequest, the final format in which the AM asks the RM for resources. In real YARN, the AM also communicates with the RM using AllocateRequest.
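The bookkeeping that surrounds each AllocateRequest in sendContainerRequest (an incrementing responseId, a null ask replaced by an empty list, and progress computed as finishedContainers / totalContainers, or 1.0 when there are no containers) can be sketched in plain Java. `Allocation` is a made-up stand-in for AllocateRequest, the initial responseId of 0 is an assumption, and the empty release list assumes the one-argument createAllocateRequest releases nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the per-heartbeat bookkeeping around
// createAllocateRequest. Allocation is a made-up stand-in for AllocateRequest.
class AllocateSketch {
  static final class Allocation {
    final int responseId;
    final List<String> ask;      // stands in for List<ResourceRequest>
    final List<String> release;  // stands in for List<ContainerId>
    final float progress;

    Allocation(int responseId, List<String> ask, List<String> release,
        float progress) {
      this.responseId = responseId;
      this.ask = ask;
      this.release = release;
      this.progress = progress;
    }
  }

  private int responseId = 0; // assumed initial value

  /** progress = finished / total, or 1.0 when there are no containers. */
  static float progress(int finishedContainers, int totalContainers) {
    return totalContainers == 0
        ? 1.0f : (float) finishedContainers / totalContainers;
  }

  /** One heartbeat: a null ask becomes an empty list, as in sendContainerRequest. */
  Allocation next(List<String> ask, int finished, int total) {
    List<String> safeAsk = (ask == null) ? new ArrayList<String>() : ask;
    return new Allocation(responseId++, safeAsk, new ArrayList<String>(),
        progress(finished, total));
  }

  public static void main(String[] args) {
    AllocateSketch am = new AllocateSketch();
    Allocation first = am.next(null, 0, 0);
    Allocation second = am.next(List.of("map-request"), 1, 4);
    System.out.println(first.responseId + " " + first.progress);   // 0 1.0
    System.out.println(second.responseId + " " + second.progress); // 1 0.25
  }
}
```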
That concludes the protocol analysis of how the AM requests resources from the RM, covering both the AM's own container and the containers for its tasks.