远程提交mapreduce到hadoop集群
从13年初,开始搞hadoop的时候,是采用hadoop-eclipse插件来进行开发mapreduce,发现使用这个插件,其实也就是把相关的jar,class文件提交到远程的hadoop集群。
而实际上要部署应用的时候,如果不在远程提交,就得把任务代码打包成JAR,ftp到集群机器上进行执行。当然也可以在一个client机器上部署一套hadoop环境,把任务JAR放在这里,再提交到JobTracker。
对于想在web应用中触发远程mapreduce任务(或者是本地java应用触发远程mapreduce任务),就比较麻烦,上述插件方式/打包JAR后ftp也并不可取。
我实现了一个Util类,主要功能就是根据class寻找到相应的jar包,并把相应的jar路径添加到Configration中。而这样的话就摆脱了使用hadoop-eclipse插件进行开发,也方便应用集成远程mapreduce服务,适用于MR代码打包成独立JAR的情况。
ConfigurationUtil工具类实现如下:
import java.io.IOException; import java.net.URL; import java.net.URLDecoder; import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.springframework.context.MessageSource; public class ConfigurationUtil { private static Map<MessageSource,Configuration> confs; private static final String DEFAULT_MAPRED_JOB_TRACKER="xxxxxx"; private static final String DEFAULT_FS_NAME = "xxxxxxx"; private static final String DEFAULT_REMOTE_USER = "xxxxxxx"; static{ confs = new HashMap<MessageSource,Configuration>(); } /** * 获取默认的hadoop用户默认配置 * @return */ public static Configuration getDefaultConfiguration(MessageSource messageSource){ if(!confs.containsKey(messageSource)){ synchronized(ConfigurationUtil.class){ if(!confs.containsKey(messageSource)){ confs.put(messageSource, getTempConfiguration(messageSource)); } } } return confs.get(messageSource); } /** * 获取默认的hadoop用户临时配置 * @return */ public static Configuration getTempConfiguration(MessageSource messageSource){ Configuration temp = new Configuration(); temp.set("mapred.job.tracker", messageSource.getMessage("mapred.job.tracker", null, DEFAULT_MAPRED_JOB_TRACKER, null)); temp.set("fs.default.name", messageSource.getMessage("fs.default.name", null, DEFAULT_FS_NAME, null)); temp.set("user", messageSource.getMessage("remote.hadoop.user", null, DEFAULT_REMOTE_USER, null)); return temp; } /** * 设置userClass所在的jar * @param userClass mr依赖的class * @return * @throws IOException */ public static Configuration getTempConfiguration(Class[] userClass,MessageSource messageSource) throws IOException{ Configuration temp = getTempConfiguration(messageSource); for(Class c : userClass){ addTmpJar(c,temp); } return temp; } /** * 添加jar包 * @param jarPath * @param conf * @throws IOException */ protected static void addTmpJar(String jarPath, Configuration conf) throws IOException { System.setProperty("path.separator", ":"); FileSystem fs = FileSystem.getLocal(conf); String newJarPath = new Path(jarPath).makeQualified(fs).toString(); String tmpjars = conf.get("tmpjars"); if (tmpjars == null || tmpjars.length() == 0) { conf.set("tmpjars", newJarPath); } else if(!(","+tmpjars).contains(","+newJarPath)){ conf.set("tmpjars", tmpjars + "," + newJarPath); } } /** * 根据class添加JAR * @param cls * @param conf * @throws IOException */ protected static void addTmpJar(Class cls, Configuration conf) throws IOException { addTmpJar(findContainingJar(cls),conf); } protected static String findContainingJar(Class cls) { ClassLoader loader = cls.getClassLoader(); String class_file = cls.getName().replaceAll("\\.", "/") + ".class"; try { for(Enumeration itr = loader.getResources(class_file); itr.hasMoreElements();) { URL url = (URL) itr.nextElement(); if ("jar".equals(url.getProtocol())) { String toReturn = url.getPath(); if (toReturn.startsWith("file:")) { toReturn = toReturn.substring("file:".length()); } toReturn = URLDecoder.decode(toReturn, "UTF-8"); return toReturn.replaceAll("!.*$", ""); } } } catch (IOException e) { throw new RuntimeException(e); } return null; } }
使用此工具类,如下所示:
Configuration conf = ConfigurationUtil.getTempConfiguration(new Class[]{ConfigurationUtil.class,Mapreduce.class},messageSource); Job job = new Job(conf, 'job-name'); . . . job.waitForCompletion(true);
如此,在此处需要依赖hadoop-core.jar包,我是用maven管理工程,所以把hadoop-core安装到了maven仓库进行使用。
这样集成后,就可以在web/java项目中,自定义mapreduce任务,并远程提交到hadoop集群进行操作。
时隔一年,整理一下文档,把这个分享出来。另外一种方式就是可以使用api把jar添加到distributcache。不过觉得没有这种方式方便。
相关推荐
tomli 2020-07-26
changjiang 2020-11-16
minerd 2020-10-28
WeiHHH 2020-09-23
Aleks 2020-08-19
WeiHHH 2020-08-17
飞鸿踏雪0 2020-07-26
deyu 2020-07-21
strongyoung 2020-07-19
eternityzzy 2020-07-19
Elmo 2020-07-19
飞鸿踏雪0 2020-07-09
飞鸿踏雪0 2020-07-04
xieting 2020-07-04
WeiHHH 2020-06-28
genshengxiao 2020-06-26
Hhanwen 2020-06-25