hadoop 分布式应用(个人学习笔记)

一,hadoop介绍

hadoop是apache的开源软件,用于分布式任务计算,包括mapreduce(首先由谷歌提出,并应用)分布式计算框架和hdfs文件系统两部分。hadoop让开发人员在不了解底层细节的情况下,轻松开发分布式应用。

二,hadoopjob提交流程

1)JobClient运行Job任务

JobClient.runJob(Job.clss);

2)JobClient向JobTracker申请一个JobId;

3)配置Job运行环境(copy相关配置,和Jar文件到本地);

JobClient.copyRemoteFile();

4)计算Job输入数据(根据配置的InputSplit大小,计算Map数)

5)提交Job,调用JobTracker.submit();

6)JobTracker分配Job给TaskTracker;

7)TaskTracker启动Jvm计算任务(默认为每个任务启动一个Jvm)。

三,mapreduce框架原理(分久闭合)

map端拆分数据,reduce端合并数据

3.1)map端用于数据解析,mapreduce框架会尽量使用数据本地化计算(减少数据传输带来的网络流量),提高效率。map计算过程中,首先会将计算结果<key,value>存在内存中,在达到一定阀值(可以配置)后写入临时文件(减少磁盘IO次数)。map端的计算结果做为中间数据用于reduce端的数据输入。

3.2)reduce端负责最终结果统计,reduce计算结束将结果<key,value>存入hdfs,然后删除map端中间数据。

四,hadoop任务分配相关源码学习

1)List<Task>tasks=getSetupAndCleanupTasks(taskTrackerStatus);

if(tasks==null){

tasks=taskScheduler.assignTasks(taskTrackers.get(trackerName));

}

if(tasks!=null){

for(Tasktask:tasks){

expireLaunchingTasks.addNewTask(task.getTaskID());

if(LOG.isDebugEnabled()){

LOG.debug(trackerName+"->LaunchTask:"+task.getTaskID());

}

actions.add(newLaunchTaskAction(task));

}

}

1.1)getSetupAndCleanupTasks(taskTrackerStatus);//先分配辅助任务

1,2)tasks=taskScheduler.assignTasks(taskTrackers.get(trackerName));

//如果不存在辅助任务就由hadoop任务调度器分配计算任务

2)publicvoidofferService();//启动jobtracker服务(启动各种内部线程)

3)publicsynchronizedHeartbeatResponseheartbeat(TaskTrackerStatusstatus,booleanrestarted,booleaninitialContact,

booleanacceptNewTasks,shortresponseId)throwsIOException{}//jobtracker处理tasktracker发送心跳方法,首先检查该tasktracker是否属于节点黑名单,再检查tasktracker是否健康...最后通过心跳给tasktracker下达分配的任务。

五,部分相关属性

1)Map<JobID,JobInProgress>jobs=

Collections.synchronizedMap(newTreeMap<JobID,JobInProgress>());

//存储客户端提交的job信息,jobtracker为客户端提交的job生成一个JobInProgress对象进行跟踪;jobtracker采用三层多叉树(一个jobInProgress对应多个TaskInProgress,一个TaskInProgress对应多个TaskAttempt)的结构进行任务的追踪。

2)TreeMap<String,ArrayList<JobInProgress>>userToJobsMap=

newTreeMap<String,ArrayList<JobInProgress>>();//映射用户的job信息,hadoop集群可能创建多个账户

六,jobtracker任务分配

jobtracker从正在运行的job任务中选择取job,分配task前jobtracker会判断申请任务的tasktracker是否还有能力去执行新任务。如果可以执行,jobtracker首先会尝试为每个job分配一个本地任务或者机架本地任务,如果不存在本地任务就分配非本地任务(先分配map任务,然后分配reduce任务)。

Collection<JobInProgress>jobQueue=

jobQueueJobInProgressListener.getJobQueue();//获取job队列

synchronized(jobQueue){

for(JobInProgressjob:jobQueue){

if(job.getStatus().getRunState()==JobStatus.RUNNING){

remainingMapLoad+=(job.desiredMaps()-job.finishedMaps());

if(job.scheduleReduces()){

remainingReduceLoad+=

(job.desiredReduces()-job.finishedReduces());

}

}

}

}//计算所有运行job剩下的map数和reduce数

for(inti=0;i<availableMapSlots;++i){

synchronized(jobQueue){

for(JobInProgressjob:jobQueue){

if(job.getStatus().getRunState()!=JobStatus.RUNNING){

continue;

}

Taskt=null;

//Trytoscheduleanode-localorrack-localMaptask

t=

job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus,

numTaskTrackers,taskTrackerManager.getNumberOfUniqueHosts());

if(t!=null){

assignedTasks.add(t);

++numLocalMaps;

//Don'tassignmaptaskstothehilt!

//Leavesomefreeslotsintheclusterforfuturetask-failures,

//speculativetasksetc.beyondthehighestpriorityjob

if(exceededMapPadding){

breakscheduleMaps;

}

//TryalljobsagainforthenextMaptask

break;

}

//Trytoscheduleanode-localorrack-localMaptask

t=

job.obtainNewNonLocalMapTask(taskTrackerStatus,numTaskTrackers,

taskTrackerManager.getNumberOfUniqueHosts());

if(t!=null){

assignedTasks.add(t);

++numNonLocalMaps;

//Weassignatmost1off-switchorspeculativetask

//ThisistopreventTaskTrackersfromstealinglocal-tasks

//fromotherTaskTrackers.

breakscheduleMaps;

}

}

}

}//map任务分配

booleanexceededReducePadding=false;

if(availableReduceSlots>0){

exceededReducePadding=exceededPadding(false,clusterStatus,

trackerReduceCapacity);

synchronized(jobQueue){

for(JobInProgressjob:jobQueue){

if(job.getStatus().getRunState()!=JobStatus.RUNNING||

job.numReduceTasks==0){

continue;

}

Taskt=

job.obtainNewReduceTask(taskTrackerStatus,numTaskTrackers,

taskTrackerManager.getNumberOfUniqueHosts()

);

if(t!=null){

assignedTasks.add(t);

break;

}

//Don'tassignreducetaskstothehilt!

//Leavesomefreeslotsintheclusterforfuturetask-failures,

//speculativetasksetc.beyondthehighestpriorityjob

if(exceededReducePadding){

break;

}

}

}

}//reduce任务分配,reduce任务不用考虑数据本地计算特性

相关推荐