hadoop 分布式应用(个人学习笔记)
一,hadoop介绍
hadoop是apache的开源软件,用于分布式任务计算,包括mapreduce(首先由谷歌提出,并应用)分布式计算框架和hdfs文件系统两部分。hadoop让开发人员在不了解底层细节的情况下,轻松开发分布式应用。
二,hadoopjob提交流程
1)JobClient运行Job任务
JobClient.runJob(Job.clss);
2)JobClient向JobTracker申请一个JobId;
3)配置Job运行环境(copy相关配置,和Jar文件到本地);
JobClient.copyRemoteFile();
4)计算Job输入数据(根据配置的InputSplit大小,计算Map数)
5)提交Job,调用JobTracker.submit();
6)JobTracker分配Job给TaskTracker;
7)TaskTracker启动Jvm计算任务(默认为每个任务启动一个Jvm)。
三,mapreduce框架原理(分久闭合)
map端拆分数据,reduce端合并数据
3.1)map端用于数据解析,mapreduce框架会尽量使用数据本地化计算(减少数据传输带来的网络流量),提高效率。map计算过程中,首先会将计算结果<key,value>存在内存中,在达到一定阀值(可以配置)后写入临时文件(减少磁盘IO次数)。map端的计算结果做为中间数据用于reduce端的数据输入。
3.2)reduce端负责最终结果统计,reduce计算结束将结果<key,value>存入hdfs,然后删除map端中间数据。
四,hadoop任务分配相关源码学习
1)List<Task>tasks=getSetupAndCleanupTasks(taskTrackerStatus);
if(tasks==null){
tasks=taskScheduler.assignTasks(taskTrackers.get(trackerName));
}
if(tasks!=null){
for(Tasktask:tasks){
expireLaunchingTasks.addNewTask(task.getTaskID());
if(LOG.isDebugEnabled()){
LOG.debug(trackerName+"->LaunchTask:"+task.getTaskID());
}
actions.add(newLaunchTaskAction(task));
}
}
1.1)getSetupAndCleanupTasks(taskTrackerStatus);//先分配辅助任务
1,2)tasks=taskScheduler.assignTasks(taskTrackers.get(trackerName));
//如果不存在辅助任务就由hadoop任务调度器分配计算任务
2)publicvoidofferService();//启动jobtracker服务(启动各种内部线程)
3)publicsynchronizedHeartbeatResponseheartbeat(TaskTrackerStatusstatus,booleanrestarted,booleaninitialContact,
booleanacceptNewTasks,shortresponseId)throwsIOException{}//jobtracker处理tasktracker发送心跳方法,首先检查该tasktracker是否属于节点黑名单,再检查tasktracker是否健康...最后通过心跳给tasktracker下达分配的任务。
五,部分相关属性
1)Map<JobID,JobInProgress>jobs=
Collections.synchronizedMap(newTreeMap<JobID,JobInProgress>());
//存储客户端提交的job信息,jobtracker为客户端提交的job生成一个JobInProgress对象进行跟踪;jobtracker采用三层多叉树(一个jobInProgress对应多个TaskInProgress,一个TaskInProgress对应多个TaskAttempt)的结构进行任务的追踪。
2)TreeMap<String,ArrayList<JobInProgress>>userToJobsMap=
newTreeMap<String,ArrayList<JobInProgress>>();//映射用户的job信息,hadoop集群可能创建多个账户
六,jobtracker任务分配
jobtracker从正在运行的job任务中选择取job,分配task前jobtracker会判断申请任务的tasktracker是否还有能力去执行新任务。如果可以执行,jobtracker首先会尝试为每个job分配一个本地任务或者机架本地任务,如果不存在本地任务就分配非本地任务(先分配map任务,然后分配reduce任务)。
Collection<JobInProgress>jobQueue=
jobQueueJobInProgressListener.getJobQueue();//获取job队列
synchronized(jobQueue){
for(JobInProgressjob:jobQueue){
if(job.getStatus().getRunState()==JobStatus.RUNNING){
remainingMapLoad+=(job.desiredMaps()-job.finishedMaps());
if(job.scheduleReduces()){
remainingReduceLoad+=
(job.desiredReduces()-job.finishedReduces());
}
}
}
}//计算所有运行job剩下的map数和reduce数
for(inti=0;i<availableMapSlots;++i){
synchronized(jobQueue){
for(JobInProgressjob:jobQueue){
if(job.getStatus().getRunState()!=JobStatus.RUNNING){
continue;
}
Taskt=null;
//Trytoscheduleanode-localorrack-localMaptask
t=
job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus,
numTaskTrackers,taskTrackerManager.getNumberOfUniqueHosts());
if(t!=null){
assignedTasks.add(t);
++numLocalMaps;
//Don'tassignmaptaskstothehilt!
//Leavesomefreeslotsintheclusterforfuturetask-failures,
//speculativetasksetc.beyondthehighestpriorityjob
if(exceededMapPadding){
breakscheduleMaps;
}
//TryalljobsagainforthenextMaptask
break;
}
//Trytoscheduleanode-localorrack-localMaptask
t=
job.obtainNewNonLocalMapTask(taskTrackerStatus,numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if(t!=null){
assignedTasks.add(t);
++numNonLocalMaps;
//Weassignatmost1off-switchorspeculativetask
//ThisistopreventTaskTrackersfromstealinglocal-tasks
//fromotherTaskTrackers.
breakscheduleMaps;
}
}
}
}//map任务分配
booleanexceededReducePadding=false;
if(availableReduceSlots>0){
exceededReducePadding=exceededPadding(false,clusterStatus,
trackerReduceCapacity);
synchronized(jobQueue){
for(JobInProgressjob:jobQueue){
if(job.getStatus().getRunState()!=JobStatus.RUNNING||
job.numReduceTasks==0){
continue;
}
Taskt=
job.obtainNewReduceTask(taskTrackerStatus,numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts()
);
if(t!=null){
assignedTasks.add(t);
break;
}
//Don'tassignreducetaskstothehilt!
//Leavesomefreeslotsintheclusterforfuturetask-failures,
//speculativetasksetc.beyondthehighestpriorityjob
if(exceededReducePadding){
break;
}
}
}
}//reduce任务分配,reduce任务不用考虑数据本地计算特性