远程提交mapreduce到hadoop集群

从13年初,开始搞hadoop的时候,是采用hadoop-eclipse插件来进行开发mapreduce,发现使用这个插件,其实也就是把相关的jar,class文件提交到远程的hadoop集群。

而实际上要部署应用的时候,如果不在远程提交,就得把任务代码打包成JAR,ftp到集群机器上进行执行。当然也可以在一个client机器上部署一套hadoop环境,把任务JAR放在这里,再提交到JobTracker。

对于想在web应用中触发远程mapreduce任务(或者是本地java应用触发远程mapreduce任务),就比较麻烦,上述插件方式/打包JAR后ftp也并不可取。

我实现了一个Util类,主要功能就是根据class寻找到相应的jar包,并把相应的jar路径添加到Configration中。而这样的话就摆脱了使用hadoop-eclipse插件进行开发,也方便应用集成远程mapreduce服务,适用于MR代码打包成独立JAR的情况。

ConfigurationUtil工具类实现如下:

   
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.context.MessageSource;

public class ConfigurationUtil {
	private static Map<MessageSource,Configuration> confs;
	private static final String DEFAULT_MAPRED_JOB_TRACKER="xxxxxx";
	private static final String DEFAULT_FS_NAME = "xxxxxxx";
	private static final String DEFAULT_REMOTE_USER = "xxxxxxx";
	
	static{
		confs = new HashMap<MessageSource,Configuration>();
	}
	/**
	 * 获取默认的hadoop用户默认配置
	 * @return
	 */
	public static Configuration getDefaultConfiguration(MessageSource messageSource){
		if(!confs.containsKey(messageSource)){
			synchronized(ConfigurationUtil.class){
				if(!confs.containsKey(messageSource)){
					confs.put(messageSource, getTempConfiguration(messageSource));
				}
			}
		}
	    return confs.get(messageSource);
	}
	/**
	 * 获取默认的hadoop用户临时配置
	 * @return
	 */
	public static Configuration getTempConfiguration(MessageSource messageSource){
		Configuration temp = new Configuration();
		temp.set("mapred.job.tracker", messageSource.getMessage("mapred.job.tracker", null, DEFAULT_MAPRED_JOB_TRACKER, null));
		temp.set("fs.default.name", messageSource.getMessage("fs.default.name", null, DEFAULT_FS_NAME, null));
		temp.set("user", messageSource.getMessage("remote.hadoop.user", null, DEFAULT_REMOTE_USER, null));
		return temp;
	}
	
	/**
	 * 设置userClass所在的jar
	 * @param userClass mr依赖的class
	 * @return
	 * @throws IOException
	 */
	public static Configuration getTempConfiguration(Class[] userClass,MessageSource messageSource) throws IOException{
		Configuration temp = getTempConfiguration(messageSource);
		for(Class c : userClass){
			addTmpJar(c,temp);
		}
		return temp;
	}
	/**
	 * 添加jar包
	 * @param jarPath
	 * @param conf
	 * @throws IOException
	 */
	protected static void addTmpJar(String jarPath, Configuration conf) throws IOException {  
	    System.setProperty("path.separator", ":");  
	    FileSystem fs = FileSystem.getLocal(conf);  
	    String newJarPath = new Path(jarPath).makeQualified(fs).toString();  
	    String tmpjars = conf.get("tmpjars");  
	    if (tmpjars == null || tmpjars.length() == 0) {  
	        conf.set("tmpjars", newJarPath);  
	    } else if(!(","+tmpjars).contains(","+newJarPath)){
	        conf.set("tmpjars", tmpjars + "," + newJarPath);  
	    }  
	}
	/**
	 * 根据class添加JAR
	 * @param cls
	 * @param conf
	 * @throws IOException
	 */
	protected static void addTmpJar(Class cls, Configuration conf) throws IOException {  
		addTmpJar(findContainingJar(cls),conf);
	}
	
	protected static String findContainingJar(Class cls) {
	    ClassLoader loader = cls.getClassLoader();
	    String class_file = cls.getName().replaceAll("\\.", "/") + ".class";
	    try {
	      for(Enumeration itr = loader.getResources(class_file);
	          itr.hasMoreElements();) {
	        URL url = (URL) itr.nextElement();
	        if ("jar".equals(url.getProtocol())) {
	          String toReturn = url.getPath();
	          if (toReturn.startsWith("file:")) {
	            toReturn = toReturn.substring("file:".length());
	          }
	          toReturn = URLDecoder.decode(toReturn, "UTF-8");
	          return toReturn.replaceAll("!.*$", "");
	        }
	      }
	    } catch (IOException e) {
	      throw new RuntimeException(e);
	    }
	    return null;
	 }
}

使用此工具类,如下所示:

    
Configuration conf = ConfigurationUtil.getTempConfiguration(new Class[]{ConfigurationUtil.class,Mapreduce.class},messageSource);
      Job job = new Job(conf, 'job-name');
      .
      .
      .
      job.waitForCompletion(true);

如此,在此处需要依赖hadoop-core.jar包,我是用maven管理工程,所以把hadoop-core安装到了maven仓库进行使用。

这样集成后,就可以在web/java项目中,自定义mapreduce任务,并远程提交到hadoop集群进行操作。

时隔一年,整理一下文档,把这个分享出来。另外一种方式就是可以使用api把jar添加到distributcache。不过觉得没有这种方式方便。

相关推荐