Hadoop1.2.1源码解析系列:JT与TT之间的心跳通信机制——TT篇
在Hadoop中JT(JobTracker)与TT(TaskTracker)之间的通信是通过心跳机制完成的。JT实现InterTrackerProtocol协议,该协议定义了JT与TT之间的通信机制——心跳。心跳机制实际上就是一个RPC请求,JT作为Server,而TT作为Client,TT通过RPC调用JT的heartbeat方法,将TT自身的一些状态信息发送给JT,同时JT通过返回值返回对TT的指令。
目录:
心跳有三个作用:
1)判断TT是否活着
2)报告TT的资源情况以及任务运行情况
3)为TT发送指令(如运行task,kill task等)
下面详细阅读下涉及到心跳调用的源码。
首先我们需要清楚,心跳机制是TT调用JT的方法,而非JT主动调用TT的方法。TT通过transmitHeartBeat方法调用JT的heartbeat方法。
1.TaskTracker.transmitHeartBeat:
// Send Counters in the status once every COUNTER_UPDATE_INTERVAL
boolean sendCounters;
if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
sendCounters = true;
previousUpdate = now;
}
else {
sendCounters = false;
}
根据sendCounters的间隔判断此次心跳是否发送计算器信息。
2.TaskTracker.transmitHeartBeat:
1.TaskTracker.transmitHeartBeat:
// Check if the last heartbeat got through...
// if so then build the heartbeat information for the JobTracker;
// else resend the previous status information.
//
if (status == null) {
synchronized (this) {
status = new TaskTrackerStatus(taskTrackerName, localHostname,
httpPort,
cloneAndResetRunningTaskStatuses(
sendCounters),
taskFailures,
localStorage.numFailures(),
maxMapSlots,
maxReduceSlots);
}
} else {
LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
"' with reponseId '" + heartbeatResponseId);
}
此处根据status变量是否为null,判断上次的心跳是否成功发送。tatus!=null,则表示上次的心跳尚未发送,所以直接将上次收集到的TT状态信息(封装在status中)发送给JT;相反,status==null,则表示上次心跳已完成,重新收集TT的状态信息,同样封装到status中。下面详细看下new TaskTrackerStatus()方法。注意此处有个cloneAndResetRunningTaskStatuses(sendCounters)方法:
private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses(
boolean sendCounters) {
List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
for(TaskInProgress tip: runningTasks.values()) {
TaskStatus status = tip.getStatus();
status.setIncludeCounters(sendCounters);
// send counters for finished or failed tasks and commit pending tasks
if (status.getRunState() != TaskStatus.State.RUNNING) {
status.setIncludeCounters(true);
}
result.add((TaskStatus)status.clone());
status.clearStatus();
}
return result;
}
该方法中涉及到runningTasks队列,该队列保存了该TT上接收的所有未完成的Task任务,通过runningTasks.values()可以获取TT当前所有未完成的Task,然后获取每个TaskInProgress的status信息,同时根据第一步判断出的sendCounters(true/false)决定是否发送counters信息(includeCounters),即是否将counters对象序列化到TaskStatus对象中,这里需要注意如果TaskInProgress不处于Running状态,则includeCounters设为true,即发送counters信息。
--------------------------------------分割线 --------------------------------------
--------------------------------------分割线 --------------------------------------
3.TaskTrackerStatus():
public TaskTrackerStatus(String trackerName, String host,
int httpPort, List<TaskStatus> taskReports,
int taskFailures, int dirFailures,
int maxMapTasks, int maxReduceTasks) {
this.trackerName = trackerName;
this.host = host;
this.httpPort = httpPort;
this.taskReports = new ArrayList<TaskStatus>(taskReports);
this.taskFailures = taskFailures;
this.dirFailures = dirFailures;
this.maxMapTasks = maxMapTasks;
this.maxReduceTasks = maxReduceTasks;
this.resStatus = new ResourceStatus();
this.healthStatus = new TaskTrackerHealthStatus();
}
这里只是进行简单的变量复制操作,分析下其中一些参数的含义:
1)taskReports:包含该TT上目前所有的Task状态信息,其中的counters信息会根据之前判断sendCounters值进行决定是否发送,上一步有提到。
2)taskFailures:该TT上失败的Task总数(重启会清空),该参数帮助JT决定是否向该TT提交Task,因为失败数越多表明该TT可能出现Task失败的概率越大。
3)dirFailures:这个值是mapred.local.dir参数设置的目录中有多少是不可用的(以后会详细提到)
4)maxMapSlots/maxReduceSlots:这个值是TT可使用的最大map和reduce slot数量
初始化完成,继续回到TaskTracker.transmitHeartBeat方法。
4.TaskTracker.transmitHeartBeat:
// Check if we should ask for a new Task
//
boolean askForNewTask;
long localMinSpaceStart;
synchronized (this) {
askForNewTask =
((status.countOccupiedMapSlots() < maxMapSlots ||
status.countOccupiedReduceSlots() < maxReduceSlots) &&
acceptNewTasks);
localMinSpaceStart = minSpaceStart;
}
if (askForNewTask) {
askForNewTask = enoughFreeSpace(localMinSpaceStart);
long freeDiskSpace = getFreeSpace();
long totVmem = getTotalVirtualMemoryOnTT();
long totPmem = getTotalPhysicalMemoryOnTT();
long availableVmem = getAvailableVirtualMemoryOnTT();
long availablePmem = getAvailablePhysicalMemoryOnTT();
long cumuCpuTime = getCumulativeCpuTimeOnTT();
long cpuFreq = getCpuFrequencyOnTT();
int numCpu = getNumProcessorsOnTT();
float cpuUsage = getCpuUsageOnTT();
status.getResourceStatus().setAvailableSpace(freeDiskSpace);
status.getResourceStatus().setTotalVirtualMemory(totVmem);
status.getResourceStatus().setTotalPhysicalMemory(totPmem);
status.getResourceStatus().setMapSlotMemorySizeOnTT(
mapSlotMemorySizeOnTT);
status.getResourceStatus().setReduceSlotMemorySizeOnTT(
reduceSlotSizeMemoryOnTT);
status.getResourceStatus().setAvailableVirtualMemory(availableVmem);
status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
status.getResourceStatus().setCpuFrequency(cpuFreq);
status.getResourceStatus().setNumProcessors(numCpu);
status.getResourceStatus().setCpuUsage(cpuUsage);
}
从源码中的注释可以知道,此处是TT根据自身资源使用情况判断是否接收new task。