Hadoop中如何正确编写继承自Writable接口的子类

Hadoop中可以编写自己的类,用作hadoop job的key或者value类型,自己编写的类要实现接口Writable。

我编写了一个HttpContent类,主要用于保存爬取网页的源码,返回状态和编码格式信息,他在mapper中别实例化保存网页内容,然后传输到reducer中被使用,在编写中遇到了一些问题:

(1)首先是没有编写默认的构造函数类,因为java中的反馈机制需要一个参数为空的默认构造函数,如果没有这个类就不能利用反馈机制实例化这个类。

(2)然后是类型在序列化的时候写入后读取值不正确,一定要统一类型中write(DataOutput out)和readFields(DataInput in)中写入和读取参数的方法,例如一个int类型如果你在write()中使用writeInt写出,在readFields()中就应该使用readInt()读入,否则读取的值是不正确的。多个值写出读入的时候,写出读入的顺序要保持一致的,否则读取也是不正确的。

(3)Writable中用于写出的DataOutput类型没有针对String类型的序列化方法,需要先将String类型转换成为Byte数组类型,然后在进行序列化。

下面是HttpContent的类型的源码,重点是write(DataOutput out)和readFields(DataInput in)方法:

package bbs.http;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.Writable;

/**
 *
 */
public class HttpContent implements Writable{

 
 private byte[] content;
 
 private int status;
 
 private String encoding;

 public HttpContent(){
 }
 
 public HttpContent(byte[] content, int status, String encoding) {
  this.content = content;
  this.status = status;
  if (encoding == null)
   this.encoding = "GBK";
  else
   this.encoding = encoding;

 }

 public byte[] getByte() {
  return this.content;
 }

 public String getData() {
  String data =null;
  try {
   if(content==null){
    //System.out.println("content涓簄ull:"+getStatus());
   
    return null;
   }
   data = new String(content, encoding);
   
  } catch (UnsupportedEncodingException e) {
   e.printStackTrace();
  }
  return data;
 }

 public int getStatus() {
  return this.status;
 }

 public String getEncoding() {
  return encoding;
 }

 @Override
 public String toString() {
  return "Content: " + getData() + "\n" + "status: " + status + "\n"
    + "encoding: " + encoding + "\n";
 }

 @Override
 public void readFields(DataInput in) throws IOException {
  int size=in.readInt();
//  System.out.println("HttpContent readFields: size="+size);
  if(size>0){
   content=new byte[size];
   in.readFully(this.content);
  }else{
   content=null;
  }
 
  this.status=in.readInt();
//  System.out.println("status="+status);
  int encodeSize=in.readInt();
  byte[] encodeBytes=new byte[encodeSize];
  in.readFully(encodeBytes);
  this.encoding=new String(encodeBytes,"GBK");
//  System.out.println("encoding="+this.encoding);
 
 }

 @Override
 public void write(DataOutput out) throws IOException {
  if(content==null){
   out.writeInt(0);
  }else{
//   System.out.println("HttpContent write(): content.length="+content.length);
   out.writeInt(content.length);
   out.write(content);
  }
//  System.out.println("Status="+status);
  out.writeInt(status);
//  System.out.println("encoding="+encoding);
  byte[] temp=encoding.getBytes("GBK");
  out.writeInt(temp.length);
  out.write(temp);
//  out.writeBytes(encoding);
 
 }
}

相关推荐