Hadoop作业提交终极解决
最近几天一直在纠结hadoop作业提交的问题,对于命令行提交作业以及在集群中提交作业这里不再赘述,不会就去Google吧!
我们在客户机提交作业的时候总是发现出错,eclipse总是报jar file not found 的错误!我们知道客户端提交任务的时候,使用方法job.setWaitForCompletion(true)的时候,这个方法会调用job的submit()方法,submit()方法又会调用jobclient的submitJobInternal(conf)方法向master提交一个任务,这个方法会向hadoop文件系统提交三个文件job.jar,job.split,和job.xml那么它报那个错误就是因为job.jar文件没有提交上去,也就是说我们像在伪分布下那样提交任务的话我们的jar包并没有打好,所以就需要我们想办法将jar包提交上去,为此我们实践出的三种客户端提交作业的方法。
一、安装插件法
将hadoop中的contrib/eclipse-plugin中的eclipse插件拷贝到eclipse插件文件夹下然后重启eclipse就可以了,然后配置集群参数就可以实现。这里也不再赘述。
如果我们安装了插件的话那么就可以直接像在伪分布中运行的那样写如下类似代码,然后设置配置参数,最后提交运行。
public void runPageRank(String inputPath, String outputPath) Throws IOException,InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://master:9000"); conf.set("hadoop.job.user", "hadoop"); conf.set("mapred.job.tracker", "master:9001"); Job job = new Job(conf, "PageRank"); job.setJarByClass(PageRankV3.class); job.setMapOutputValueClass(Text.class); job.setMapperClass(PRMapper.class); job.setReducerClass(cn.luliang.pageRank.PRReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); job.waitForCompletion(true); }
任务提交成功,很明显插件为我们打好了包,所以程序运行成功!
二、不安装插件法
在纠结很多次jar file not found 的错误之后我们根据eclipse中报的那个错误后面提示的那段jobClient.setJar(str),我们找到了一种解决方案,就是手动将我们的程序打成jar包,然后在代码中做如下改变:
public void runPageRank(String inputPath, String outputPath) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://master:9000"); conf.set("hadoop.job.user", "hadoop"); conf.set("mapred.job.tracker", "master:9001"); Job job = new Job(conf, "PageRank"); ((JobConf) job.getConfiguration()).setJar("pr.jar"); job.setJarByClass(PageRankV3.class); job.setMapOutputValueClass(Text.class); job.setMapperClass(PRMapper.class); job.setReducerClass(cn.luliang.pageRank.PRReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); job.waitForCompletion(true); }
三、程序打包的方法
我们总觉得这个解决方案太过繁琐能不能找到一种更好的解决方案,不用手动打jar包!在查阅了大量资料之后我们终于找到了方法!
就是我们其实原来我们可以在程序中添加代码让程序为我们打jar包!这个世界真是太神奇啦!
看如下代码:
public static File createTempJar(String root) throws IOException { if (!new File(root).exists()) { return null; } //创建manifest文件 Manifest manifest = new Manifest(); //设置主属性 manifest.getMainAttributes().putValue("Manifest-Version", "1.0"); //创建临时文件 final File jarFile = File.createTempFile("EJar-", ".jar", new File(System .getProperty("java.io.tmpdir"))); Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { jarFile.delete(); } });
//创建Jar文件输出流 JarOutputStream out = new JarOutputStream(new FileOutputStream(jarFile), manifest); createTempJarInner(out, new File(root), ""); out.flush(); out.close(); return jarFile; }
//遍历目录下文件 private static void createTempJarInner(JarOutputStream out, File f, String base) throws IOException { if (f.isDirectory()) { File[] fl = f.listFiles(); if (base.length() > 0) { base = base + "/"; } for (int i = 0; i < fl.length; i++) { createTempJarInner(out, fl[i], base + fl[i].getName()); } } else { out.putNextEntry(new JarEntry(base)); FileInputStream in = new FileInputStream(f); byte[] buffer = new byte[1024]; int n = in.read(buffer); while (n != -1) { out.write(buffer, 0, n); n = in.read(buffer); } in.close(); } }
这里我们通过递归遍历目录的方法将一个目录下所有的文件进行打包。然后我们提供java打包路径就可以得到File对象!
最后在在主程序里添加代码:
File jarFile = EJar.createTempJar("bin"); EJar.addClasspath("/usr/lib/hadoop-0.20/conf"); ClassLoader classLoader = EJar.getClassLoader(); Thread.currentThread().setContextClassLoader(classLoader);
介绍一下addClasspath方法:
public static void addClasspath(String component) { if ((component != null) && (component.length() > 0)) { try { File f = new File(component); if (f.exists()) { URL key = f.getCanonicalFile().toURL(); if (!classPath.contains(key)) { classPath.add(key); } } } catch (IOException e) {} } }
在这里我们创建一个class路径,以上方法可以参考一下java的源码;
然后设置jar文件
((JobConf) job.getConfiguration()).setJar(jarFile.toString());这样就可以实现打包了,提交任务我们发现没有报jar File not found的错误了,并且提交任务成功!
<!--EndFragment-->