Hadoop JobTracker提交job源码浅析

上一篇说到jobSubmitClient.submitJob( jobId, submitJobDir.toString(), jobCopy.getCredentials())这里,这里就是jobTracker进行job的提交过程,还有一个JobSubmissionProtocol的实现是LocalJobRunner,这是本地执行的时候使用的,真正集群运行Job还是使用的jobTracker,所以只看jobTracker类的submitJob。

1.jobTracker.submitJob():第一句就是checkJobTrackerState()这个是检查jobTracker状态,是否运行中,这里说一句,jobTracker是在Hadoop集群启动的时候启动的,也就是在执行start-all或者start-mapred的时候启动,启动的时候会调用JobTracker的main方法,然后在jps的时候就可以看见一个jobTracker的进程了。下面来看一下JobTracker.main()方法。

2.JobTracker.main():第一句是JobTracker tracker = startTracker(new JobConf()),这是实例化一个jobTracke实例。

3.JobTracker.startTracker():result = new JobTracker(conf, identifier),实例化一个jobTracker对象,在实例化的时候会做很多事,所以还是进去瞅瞅。

4.JobTracker.JobTracker():实例化的时候会初始化很多参数,记也记不住,主要看下实例化taskScheduler的内容:Class<? extends TaskScheduler> schedulerClass

      = conf.getClass("mapred.jobtracker.taskScheduler",JobQueueTaskScheduler.class, TaskScheduler.class);  taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf),这两句就是根据配置文件设置的taskScheduler类名,通过反射获得对应的taskScheduler对象,在实例化的时候虽然不同的TaskScheduler具体操作不一样,但是统一的都会初始化一个JobListener对象,这个对象就是后面将要监听job的listener。剩下的内容就不说了。回到JobTracker.startTracker()方法。

5.JobTracker.JobTracker():在实例化jobTracker之后,会执行result.taskScheduler.setTaskTrackerManager(result),这个就是将jobTracker对象设置给taskScheduler。后面就什么了,现在可以回到main方法了

public static JobTracker startTracker(JobConf conf, String identifier, boolean initialize)
  throws IOException, InterruptedException {
    DefaultMetricsSystem.initialize("JobTracker");
    JobTracker result = null;
    while (true) {
      try {
        result = new JobTracker(conf, identifier);
        result.taskScheduler.setTaskTrackerManager(result);
        break;
      } catch (VersionMismatch e) {
        throw e;
      } catch (BindException e) {
        throw e;
      } catch (UnknownHostException e) {
        throw e;
      } catch (AccessControlException ace) {
        // in case of jobtracker not having right access
        // bail out
        throw ace;
      } catch (IOException e) {
        LOG.warn("Error starting tracker: " +
                StringUtils.stringifyException(e));
      }
      Thread.sleep(1000);
    }
    if (result != null) {
      JobEndNotifier.startNotifier();
      MBeans.register("JobTracker", "JobTrackerInfo", result);
      if(initialize == true) {
        result.setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER);
        result.initializeFilesystem();
        result.setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE);
        result.initialize();
      }
    }
    return result;
  }

相关阅读

相关推荐