MapReduce Data Cleaning
Note: data cleaning usually only requires running a Mapper program; no Reducer is needed.
The collected log data has been written to the file web.log. One record has the following format:
101.206.68.147 - - [18/Sep/2018:20:05:16 +0000] "HEAD / HTTP/1.2" 200 20 "-" "DNSPod-Monitor/1.0"
Cleaning goal: remove log records with 11 or fewer fields.
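As a quick standalone illustration (a sketch for this article, not part of the project code), splitting the sample record above on single spaces yields 12 tokens, so it passes the more-than-11-fields validity check used by the Mapper below:

public class SplitDemo {
  public static void main(String[] args) {
     String line = "101.206.68.147 - - [18/Sep/2018:20:05:16 +0000] "
           + "\"HEAD / HTTP/1.2\" 200 20 \"-\" \"DNSPod-Monitor/1.0\"";
     String[] fields = line.split(" ");
     // Prints 12 for this record, so fields.length > 11 holds and it is kept
     System.out.println(fields.length);
  }
}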
The code is as follows:
Project 1: Data Cleaning I
Create a new package com.scitc.clean.
1. Write the LogMapper class:
package com.scitc.clean;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
  Text k = new Text();
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
     // 1 Read one line of data
     String line = value.toString();
     // 2 Parse the log line
     boolean result = parseLog(line,context);
     // 3 Skip invalid log lines
     if (!result) {
        return;
     }
     // 4 Set the key
     k.set(line);
     // 5 Write out the record
     context.write(k, NullWritable.get());
  }
  /**
   * Parse a log line and decide whether it is valid.
   * @param line the log record
   * @param context the task context, used to update counters
   * @return true if the record is valid
   */
  private boolean parseLog(String line, Context context) {
     // 1 Split into fields
     String[] fields = line.split(" ");
     // 2 A record with more than 11 fields is treated as valid
     if (fields.length > 11) {
        // Custom counters, reported with the job's counter output
        context.getCounter("map", "true").increment(1);
        return true;
     }else {
        context.getCounter("map", "false").increment(1);
        return false;
     }
  }
}
2. Write the LogDriver class
package com.scitc.clean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LogDriver {
  public static void main(String[] args) throws Exception {
     // Set the input and output paths (hard-coded for local testing)
     args = new String[] { "E:/hadoop开发文件/input", "E:/hadoop开发文件/output" };
     //1 Get the job instance
     Configuration conf = new Configuration();
     Job job = Job.getInstance(conf);
     //2 Set the jar by class
     job.setJarByClass(LogDriver.class);
     //3 Attach the mapper
     job.setMapperClass(LogMapper.class);
     //4 Set the final output key/value types
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(NullWritable.class);
     // Set the number of reduce tasks to 0 (map-only job)
     job.setNumReduceTasks(0);
     // 5 Set the input and output paths
     FileInputFormat.setInputPaths(job, new Path(args[0]));
     FileOutputFormat.setOutputPath(job, new Path(args[1]));
     //6 Submit the job
     job.waitForCompletion(true);
  }
}
Test:
Right-click the LogDriver class → Run As → Java Application.
The cleaned data can then be viewed in the output directory.
The job can also be packaged, uploaded, and run on the cluster, but remember to change the input and output paths; see the example below.
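For a cluster run, a submission along the following lines should work (the jar name and HDFS paths here are placeholders, not from the original project). Note that the hard-coded args assignment in main must be removed or commented out first, otherwise it overrides whatever arguments are passed on the command line:

hadoop jar clean.jar com.scitc.clean.LogDriver /input /output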
Project 2: Data Cleaning II
Encapsulate the cleaned log data with a custom Bean object.
1. Write the UpLogBean class
package com.scitc.clean;
public class UpLogBean {
  private String remote_addr;// client IP address
  private String remote_user;// client user name; "-" means not recorded
  private String time_local;// access time and time zone
  private String request;// requested URL and HTTP protocol
  private String status;// request status; 200 means success
  private String body_bytes_sent;// size of the response body sent to the client
  private String http_referer;// the page the request was linked from
  private String http_user_agent;// client browser (user agent) information

  private boolean valid = true;// whether the record is valid
  public String getRemote_addr() {
     return remote_addr;
  }
  public void setRemote_addr(String remote_addr) {
     this.remote_addr = remote_addr;
  }
  public String getRemote_user() {
     return remote_user;
  }
  public void setRemote_user(String remote_user) {
     this.remote_user = remote_user;
  }
  public String getTime_local() {
     return time_local;
  }
  public void setTime_local(String time_local) {
     this.time_local = time_local;
  }
  public String getRequest() {
     return request;
  }
  public void setRequest(String request) {
     this.request = request;
  }
  public String getStatus() {
     return status;
  }
  public void setStatus(String status) {
     this.status = status;
  }
  public String getBody_bytes_sent() {
     return body_bytes_sent;
  }
  public void setBody_bytes_sent(String body_bytes_sent) {
     this.body_bytes_sent = body_bytes_sent;
  }
  public String getHttp_referer() {
     return http_referer;
  }
  public void setHttp_referer(String http_referer) {
     this.http_referer = http_referer;
  }
  public String getHttp_user_agent() {
     return http_user_agent;
  }
  public void setHttp_user_agent(String http_user_agent) {
     this.http_user_agent = http_user_agent;
  }
  public boolean isValid() {
     return valid;
  }
  public void setValid(boolean valid) {
     this.valid = valid;
  }
  @Override
  public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append(this.valid);
     sb.append("\001").append(this.remote_addr);
     sb.append("\001").append(this.remote_user);
     sb.append("\001").append(this.time_local);
     sb.append("\001").append(this.request);
     sb.append("\001").append(this.status);
     sb.append("\001").append(this.body_bytes_sent);
     sb.append("\001").append(this.http_referer);
     sb.append("\001").append(this.http_user_agent);
     return sb.toString();
  }
}
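A note on the output format: toString() joins the fields with the \001 control character, which is also Hive's default field delimiter, so the cleaned records can later be loaded into a Hive table as-is. A minimal sketch of one output record (assuming the UpLogBean class above is in the same package; values are illustrative, and fields that are never set print as null):

public class BeanDemo {
  public static void main(String[] args) {
     UpLogBean bean = new UpLogBean();
     bean.setRemote_addr("101.206.68.147");
     bean.setStatus("200");
     // Prints: true\001101.206.68.147\001null\001null\001null\001200\001null\001null\001null
     // (each \001 is an invisible control character in the actual output file)
     System.out.println(bean.toString());
  }
}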
2. Write the UpLogMapper class
package com.scitc.clean;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class UpLogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
  Text k = new Text();
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
     // 1 Read one line
     String line = value.toString();
     // 2 Parse the log line and check whether it is valid
     UpLogBean bean = parseLog(line);
     if (!bean.isValid()) {
        return;
      }
      k.set(bean.toString());
      // 3 Write out
     context.write(k, NullWritable.get());
  }
  // Parse a log line into an UpLogBean
  private UpLogBean parseLog(String line) {
     UpLogBean logBean = new UpLogBean();
     // 1 Split into fields
     String[] fields = line.split(" ");
     if (fields.length > 11) {
        // 2 Populate the bean
        logBean.setRemote_addr(fields[0]);
        logBean.setRemote_user(fields[1]);
        logBean.setTime_local(fields[3].substring(1));
        logBean.setRequest(fields[6]);
        logBean.setStatus(fields[8]);
        logBean.setBody_bytes_sent(fields[9]);
        logBean.setHttp_referer(fields[10]);
        if (fields.length > 12) {
           logBean.setHttp_user_agent(fields[11] + " "+ fields[12]);
        }else {
           logBean.setHttp_user_agent(fields[11]);
        }
        // A status of 400 or above indicates an HTTP error
        if (Integer.parseInt(logBean.getStatus()) >= 400) {
           logBean.setValid(false);
        }
     }else {
        logBean.setValid(false);
     }
     return logBean;
  }
}
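As a sanity check (an illustrative standalone sketch, not part of the project), the field indexes used in parseLog line up with the sample record from the beginning of this article as follows:

public class ParseDemo {
  public static void main(String[] args) {
     String line = "101.206.68.147 - - [18/Sep/2018:20:05:16 +0000] "
           + "\"HEAD / HTTP/1.2\" 200 20 \"-\" \"DNSPod-Monitor/1.0\"";
     String[] f = line.split(" ");           // 12 fields for this record
     System.out.println(f[0]);               // remote_addr:     101.206.68.147
     System.out.println(f[3].substring(1));  // time_local:      18/Sep/2018:20:05:16 (leading '[' stripped)
     System.out.println(f[6]);               // request:         /
     System.out.println(f[8]);               // status:          200
     System.out.println(f[9]);               // body_bytes_sent: 20
     System.out.println(f[10]);              // http_referer:    "-"
     System.out.println(f[11]);              // http_user_agent: "DNSPod-Monitor/1.0" (length is 12, so no extra token to append)
  }
}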
3. Write the UpLogDriver class
package com.scitc.clean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class UpLogDriver {
  public static void main(String[] args) throws Exception {
     // Set the input and output paths (hard-coded for local testing)
     args = new String[] { "E:/hadoop开发文件/input", "E:/hadoop开发文件/upoutput" };
     // 1 Get the job instance
     Configuration conf = new Configuration();
     Job job = Job.getInstance(conf);
     // 2 Set the jar by class
     job.setJarByClass(UpLogDriver.class);
     // 3 Attach the mapper
     job.setMapperClass(UpLogMapper.class);
     // 4 Set the final output key/value types
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(NullWritable.class);
     // Map-only job, as in Project 1: set the number of reduce tasks to 0
     job.setNumReduceTasks(0);
     // 5 Set the input and output paths
     FileInputFormat.setInputPaths(job, new Path(args[0]));
     FileOutputFormat.setOutputPath(job, new Path(args[1]));
     // 6 Submit the job
     job.waitForCompletion(true);
  }
}
Test:
Right-click the UpLogDriver class → Run As → Java Application.
The cleaned data can then be viewed in the output directory.
As with Project 1, the job can also be packaged, uploaded, and run on the cluster, but remember to change the input and output paths.
Original article: https://www.cnblogs.com/hemomo/p/12955961.html