Hadoop源码解读-JobTracker处理HeartBeat

JobTracker会接受TaskTracker的心跳,并处理。不多说,直接上源码

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, 
                                                  boolean restarted,
                                                  boolean initialContact,
                                                  boolean acceptNewTasks, 
                                                  short responseId)

1首先检查heartbeat是否来自自己的host列表,否则抛出异常。

如果不再Host列表或者在排除Host列表中,退出心跳处理。

 
return (inHostsList(status) && !inExcludedHostsList(status));

2判断是否在黑名单、灰名单、默认名单,并从这些名单中删除

黑名单、灰名单主要是Hadoop的容错机制,在此不做过多解释,可以单写一篇文章。

 
faultyTrackers.markTrackerHealthy(status.getHost());

3根据trackerName获取上一个heartbeatresponse

 
HeartbeatResponse prevHeartbeatResponse =
      trackerToHeartbeatResponseMap.get(trackerName);

4如果上一个heartbeat为null,让Tasktracker重新初始化如果是第一个response从recoveryMap中移除

   
if (prevHeartbeatResponse == null) {
        // This is the first heartbeat from the old tracker to the newly 
        // started JobTracker
        if (hasRestarted()) {
          addRestartInfo = true;
          // inform the recovery manager about this tracker joining back
          recoveryManager.unMarkTracker(trackerName);
        } else {
          // Jobtracker might have restarted but no recovery is needed
          // otherwise this code should not be reached
          LOG.warn("Serious problem, cannot find record of 'previous' " +
                   "heartbeat for '" + trackerName + 
                   "'; reinitializing the tasktracker");
          return new HeartbeatResponse(responseId, 
              new TaskTrackerAction[] {new ReinitTrackerAction()});
        }

      }

如果重发的responseId,丢弃掉。

  
if (prevHeartbeatResponse.getResponseId() != responseId) {
          LOG.info("Ignoring 'duplicate' heartbeat from '" + 
              trackerName + "'; resending the previous 'lost' response");
          return prevHeartbeatResponse;
        }

5处理heartbeat首先updateTaskTrackerStatus如果是被遗忘的tasktracker加入队列中;更新任务状态;更新健康节点状态;

  
private synchronized boolean processHeartbeat(
                                 TaskTrackerStatus trackerStatus, 
                                 boolean initialContact,
                                 long timeStamp) throws UnknownHostException {
   //主要集中在此不分析那么详细了
}

6检查新Task是否执行,如果没有执行,加入执行队列

 
if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
      TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
      if (taskTrackerStatus == null) {
        LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
      } else {
        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
        if (tasks == null ) {
          tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
        }
//添加Task
        if (tasks != null) {
          for (Task task : tasks) {
            expireLaunchingTasks.addNewTask(task.getTaskID());
            if(LOG.isDebugEnabled()) {
              LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
            }
            actions.add(new LaunchTaskAction(task));
          }
        }
      }
    }

7检查Task是否杀死

List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
    if (killTasksList != null) {
      actions.addAll(killTasksList);
    }

8检查task是否cleanup

 
List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
    if (killJobsList != null) {
      actions.addAll(killJobsList);
    }

9检查task的output是否可以save

  
List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
    if (commitTasksList != null) {
      actions.addAll(commitTasksList);
    }

10计算下一次heartbeat的时间间隔

  
int nextInterval = getNextHeartbeatInterval();
    response.setHeartbeatInterval(nextInterval);
    response.setActions(
                        actions.toArray(new TaskTrackerAction[actions.size()]));

11更新heartbeatMap,并remove掉Marked已经处理掉的heartbeat

  
// 更新Map
  trackerToHeartbeatResponseMap.put(trackerName, response);

    //清除处理完成的心跳
    removeMarkedTasks(trackerName);

不对之处欢迎讨论。

=================参考====

hadoop源码。

相关推荐