MapReduce数据清洗
标签：
说明：数据清洗的过程往往只需要运行Mapper程序，不需要运行Reduce程序。
已采集到的日志数据写入web.log文件中，其中一条日志格式如下：
101.206.68.147 - - [18/Sep/2018:20:05:16 +0000] "HEAD / HTTP/1.2" 200 20 "-" "DNSPod-Monitor/1.0"
清洗目标：清除日志中字段长度小于等于11的日志记录。
具体代码如下：
项目1 数据清洗一
新建包com.scitc.clean
1.编写LogMapper类：
package com.scitc.clean;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
Â
public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
  Text k = new Text();
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
     // 1 è·åÂÂ1è¡Âæ°æ®
     String line = value.toString();
     // 2 解æÂÂæÂ¥å¿Â
     boolean result = parseLog(line,context);
     // 3 æÂ¥å¿Âä¸ÂÃ¥ÂÂæ³ÂéÂÂåº
     if (!result) {
        return;
     }
     // 4 设置key
     k.set(line);
     // 5 Ã¥ÂÂåºæ°æ®
     context.write(k, NullWritable.get());
  }
  /**
   * Ã¥ÂÂè½ï¼Â解æÂÂæÂ¥å¿Â
   * @param line æÂ¥å¿ÂÃ¥ÂÂ容
   * @param context ä¸Âä¸ÂæÂÂ对象
   * @return
   */
  private boolean parseLog(String line, Context context) {
     // 1 æªåÂÂ
     String[] fields = line.split(" ");
     // 2 æÂ¥å¿Âé¿度大äºÂ11çÂÂ为åÂÂæ³Â
     if (fields.length > 11) {
        // ç³»ç»Â计æ°å¨
        context.getCounter("map", "true").increment(1);
        return true;
     }else {
        context.getCounter("map", "false").increment(1);
        return false;
     }
  }
}
2.编写LogDriver类
package com.scitc.clean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
Â
public class LogDriver {
  public static void main(String[] args) throws Exception {
     //设置è¾Âå¥è¾Âåº路å¾Â设置
       args = new String[] { "E:/hadoopå¼ÂÃ¥ÂÂæÂÂ件/input", "E:/hadoopå¼ÂÃ¥ÂÂæÂÂ件/output" };
     //1 è·åÂÂjobä¿¡æ¯
     Configuration conf = new Configuration();
     Job job = Job.getInstance(conf);
     //2 å 载jarÃ¥ÂÂ
     job.setJarByClass(LogDriver.class);
     //3 å³èÂÂmap
     job.setMapperClass(LogMapper.class);
     //4 设置æÂÂç»Âè¾Âåº类åÂÂ
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(NullWritable.class);
     //设置reducetask个æ°为0
     job.setNumReduceTasks(0);
     // 5 设置è¾Âå¥åÂÂè¾Âåº路å¾Â
     FileInputFormat.setInputPaths(job, new Path(args[0]));
     FileOutputFormat.setOutputPath(job, new Path(args[1]));
     //6 æÂÂ交job
     job.waitForCompletion(true);
  }  }
运行测试：
右键LogDriver类 → run as → java application
即可在输出目录查看到清洗后的数据。
也可以打包、上传到集群上运行，但注意修改输入、输出路径。
Â
项目2 数据清洗二
通过自定义的Bean对象封装清洗后的日志数据。
1.编写UpLogBean类
package com.scitc.clean;
public class UpLogBean {
   // One parsed access-log record; every field is kept as a raw string.
   private String remote_addr;      // client IP address
   private String remote_user;      // client user name; "-" when absent
   private String time_local;       // access time and time zone
   private String request;          // requested URL and HTTP protocol
   private String status;           // HTTP status code, e.g. "200"
   private String body_bytes_sent;  // size of the response body sent to the client
   private String http_referer;     // page the request was linked from
   private String http_user_agent;  // client browser information

   private boolean valid = true;    // whether this record passed validation

   public String getRemote_addr() {
      return remote_addr;
   }
   public void setRemote_addr(String value) {
      this.remote_addr = value;
   }
   public String getRemote_user() {
      return remote_user;
   }
   public void setRemote_user(String value) {
      this.remote_user = value;
   }
   public String getTime_local() {
      return time_local;
   }
   public void setTime_local(String value) {
      this.time_local = value;
   }
   public String getRequest() {
      return request;
   }
   public void setRequest(String value) {
      this.request = value;
   }
   public String getStatus() {
      return status;
   }
   public void setStatus(String value) {
      this.status = value;
   }
   public String getBody_bytes_sent() {
      return body_bytes_sent;
   }
   public void setBody_bytes_sent(String value) {
      this.body_bytes_sent = value;
   }
   public String getHttp_referer() {
      return http_referer;
   }
   public void setHttp_referer(String value) {
      this.http_referer = value;
   }
   public String getHttp_user_agent() {
      return http_user_agent;
   }
   public void setHttp_user_agent(String value) {
      this.http_user_agent = value;
   }
   public boolean isValid() {
      return valid;
   }
   public void setValid(boolean valid) {
      this.valid = valid;
   }

   /**
    * Serializes the record as the valid flag followed by every field, joined
    * with the '\001' control character (a delimiter convention commonly used
    * by downstream tools such as Hive). Null fields render as "null".
    */
   @Override
   public String toString() {
      return this.valid + "\001" + String.join("\001",
            this.remote_addr,
            this.remote_user,
            this.time_local,
            this.request,
            this.status,
            this.body_bytes_sent,
            this.http_referer,
            this.http_user_agent);
   }
}
2.编写UpLogMapper类
package com.scitc.clean;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
Â
public class UpLogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
  Text k = new Text();
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
     // 1 è·åÂÂ1è¡Â
     String line = value.toString();
     // 2 解æÂÂæÂ¥å¿Âæ¯å¦åÂÂæ³Â
     UpLogBean bean = parseLog(line);
     if (!bean.isValid()) {
        return;
     }   Â
     k.set(bean.toString());   Â
     // 3 è¾Âåº
     context.write(k, NullWritable.get());
  }
  // 解æÂÂæÂ¥å¿Â
  private UpLogBean parseLog(String line) {
     UpLogBean logBean = new UpLogBean();
     // 1 æªåÂÂ
     String[] fields = line.split(" ");
     if (fields.length > 11) {
        // 2å°Âè£Âæ°æ®
        logBean.setRemote_addr(fields[0]);
        logBean.setRemote_user(fields[1]);
        logBean.setTime_local(fields[3].substring(1));
        logBean.setRequest(fields[6]);
        logBean.setStatus(fields[8]);
        logBean.setBody_bytes_sent(fields[9]);
        logBean.setHttp_referer(fields[10]);
       Â
        if (fields.length > 12) {
           logBean.setHttp_user_agent(fields[11] + " "+ fields[12]);
        }else {
           logBean.setHttp_user_agent(fields[11]);
        }
        //大äºÂ400ï¼ÂHTTPéÂÂ误
        if (Integer.parseInt(logBean.getStatus()) >= 400) {
           logBean.setValid(false);
        }
     }else {
        logBean.setValid(false);
     }
     return logBean;
  }  }
3.编写UpLogDriver类
package com.scitc.clean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
Â
public class UpLogDriver {
  public static void main(String[] args) throws Exception {
     args = new String[] { "E:/hadoopå¼ÂÃ¥ÂÂæÂÂ件/input", "E:/hadoopå¼ÂÃ¥ÂÂæÂÂ件/upoutput" };
            // 1 è·åÂÂjobä¿¡æ¯
           Configuration conf = new Configuration();
           Job job = Job.getInstance(conf);
           // 2 å 载jarÃ¥ÂÂ
           job.setJarByClass(UpLogDriver.class);
           // 3 å³èÂÂmap
           job.setMapperClass(UpLogMapper.class);
           // 4 设置æÂÂç»Âè¾Âåº类åÂÂ
           job.setOutputKeyClass(Text.class);
           job.setOutputValueClass(NullWritable.class);
           // 5 设置è¾Âå¥åÂÂè¾Âåº路å¾Â
           FileInputFormat.setInputPaths(job, new Path(args[0]));
           FileOutputFormat.setOutputPath(job, new Path(args[1]));
           // 6 æÂÂ交
           job.waitForCompletion(true);
        }  }Â
运行测试：
右键UpLogDriver类 → run as → java application
即可在输出目录查看到清洗后的数据。
也可以打包、上传到集群上运行，但注意修改输入、输出路径。
标签：
原文地址：https://www.cnblogs.com/hemomo/p/12955961.html