Using DBInputFormat and DBOutputFormat in Hadoop

I. Background

To let MapReduce programs access relational databases (MySQL, Oracle) directly, Hadoop provides two classes: DBInputFormat and DBOutputFormat. With DBInputFormat, rows from a database table are read into a MapReduce job (whose results typically land in HDFS); with DBOutputFormat, the result set produced by a job is written back into a database table.


II. Technical Details

1、DBInputFormat (using MySQL as the example). First create the table:

CREATE TABLE studentinfo (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR(32) NOT NULL);

2、Hadoop 0.20 does not support DBInputFormat and DBOutputFormat well, so this example uses the 0.19 API to demonstrate the two classes.

3、DBInputFormat is used as follows:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class DBInput {
  // DROP TABLE IF EXISTS `hadoop`.`studentinfo`;
  // CREATE TABLE studentinfo (
  // id INTEGER NOT NULL PRIMARY KEY,
  // name VARCHAR(32) NOT NULL);

  public static class StudentinfoRecord implements Writable, DBWritable {
    int id;
    String name;
    public StudentinfoRecord() {

    }
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
    }
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.name);
    }
    public void readFields(ResultSet result) throws SQLException {
        this.id = result.getInt(1);
        this.name = result.getString(2);
    }
    public void write(PreparedStatement stmt) throws SQLException {
        stmt.setInt(1, this.id);
        stmt.setString(2, this.name);
    }
    public String toString() {
        return this.id + " " + this.name;
    }
  }
  // The Mapper must be a static nested class so Hadoop can instantiate it.
  public static class DBInputMapper extends MapReduceBase implements
        Mapper<LongWritable, StudentinfoRecord, LongWritable, Text> {
    public void map(LongWritable key, StudentinfoRecord value,
          OutputCollector<LongWritable, Text> collector, Reporter reporter)
          throws IOException {
        collector.collect(new LongWritable(value.id), new Text(value
            .toString()));
    }
  }
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(DBInput.class);
    // Ship the MySQL JDBC driver jar (already uploaded to HDFS) to each task's classpath.
    DistributedCache.addFileToClassPath(new Path(
          "/lib/mysql-connector-java-5.1.0-bin.jar"), conf);
   
    conf.setMapperClass(DBInputMapper.class);
    conf.setReducerClass(IdentityReducer.class);

    conf.setMapOutputKeyClass(LongWritable.class);
    conf.setMapOutputValueClass(Text.class);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);
   
    conf.setInputFormat(DBInputFormat.class);
    FileOutputFormat.setOutputPath(conf, new Path("/hua01"));
    DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
          "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
    String[] fields = { "id", "name" };
    // Read columns {id, name} from table studentinfo; no WHERE conditions, order by id.
    DBInputFormat.setInput(conf, StudentinfoRecord.class, "studentinfo",
        null, "id", fields);

    JobClient.runJob(conf);
  }
}
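Before running the job, the studentinfo table needs some rows to read. Below is a minimal, hypothetical seeding helper using plain JDBC (not part of the original example; the URL and credentials mirror the job configuration above, and the sample names are made up):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Hypothetical helper: inserts a few test rows into studentinfo.
public class SeedStudentinfo {
  public static void main(String[] args) throws Exception {
    Class.forName("com.mysql.jdbc.Driver");              // load the MySQL JDBC driver
    Connection conn = DriverManager.getConnection(
        "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
    PreparedStatement stmt = conn.prepareStatement(
        "INSERT INTO studentinfo (id, name) VALUES (?, ?)");
    String[] names = { "alice", "bob", "carol" };        // made-up sample data
    for (int i = 0; i < names.length; i++) {
      stmt.setInt(1, i + 1);
      stmt.setString(2, names[i]);
      stmt.addBatch();
    }
    stmt.executeBatch();                                 // write all rows in one batch
    stmt.close();
    conn.close();
  }
}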

a) The member variables of the StudentinfoRecord class correspond to the table's columns, and the class implements both the Writable and DBWritable interfaces.

The methods implementing Writable (Hadoop's own serialization, used when records move between map and reduce):

public void readFields(DataInput in) throws IOException {
    this.id = in.readInt();
    this.name = Text.readString(in);
}
public void write(DataOutput out) throws IOException {
    out.writeInt(this.id);
    Text.writeString(out, this.name);
}

The methods implementing DBWritable (the JDBC side: readFields(ResultSet) fills the record from a query row, and write(PreparedStatement) binds it to a statement for output):

public void readFields(ResultSet result) throws SQLException {
    this.id = result.getInt(1);
    this.name = result.getString(2);
}
public void write(PreparedStatement stmt) throws SQLException {
    stmt.setInt(1, this.id);
    stmt.setString(2, this.name);
}

b) The value type fed to the Mapper is StudentinfoRecord; the LongWritable key is the record number assigned by DBInputFormat.

c) Configure the database connection and what to read from the studentinfo table:

DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
    "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
String[] fields = { "id", "name" };
DBInputFormat.setInput(conf, StudentinfoRecord.class, "studentinfo", null, "id", fields);
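The orderBy argument ("id") is what lets DBInputFormat divide the table among map tasks: it appends LIMIT/OFFSET clauses to the SELECT it generates, so rows must come back in a stable order. The 0.19 API also provides a query-based overload of setInput in which you supply the SELECT and a row-count query yourself; a minimal sketch, reusing the conf from the example above:

// Query-based variant: the data query and the count query are given directly.
// DBInputFormat still appends LIMIT/OFFSET per split, hence the ORDER BY.
DBInputFormat.setInput(conf, StudentinfoRecord.class,
    "SELECT id, name FROM studentinfo ORDER BY id",
    "SELECT COUNT(id) FROM studentinfo");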

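4、DBOutputFormat works in the opposite direction and is wired up symmetrically. What follows is a minimal sketch rather than a tested example from the original article: it assumes a hypothetical output table studentinfo2 with the same schema as studentinfo, a hypothetical class DBOutput (same imports as DBInput, plus NullWritable, TextInputFormat, FileInputFormat, and org.apache.hadoop.mapred.lib.db.DBOutputFormat), and text input lines of the form "id name". DBOutputFormat writes each output key via DBWritable.write(PreparedStatement), so the map output key is a StudentinfoRecord and the value is ignored:

  // Map-only job: parse "id name" lines and emit StudentinfoRecord keys,
  // which DBOutputFormat inserts into the target table.
  public static class DBOutputMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, StudentinfoRecord, NullWritable> {
    public void map(LongWritable key, Text value,
          OutputCollector<StudentinfoRecord, NullWritable> collector,
          Reporter reporter) throws IOException {
      String[] fields = value.toString().split(" ");     // assumed line format: "id name"
      StudentinfoRecord r = new StudentinfoRecord();
      r.id = Integer.parseInt(fields[0]);
      r.name = fields[1];
      collector.collect(r, NullWritable.get());
    }
  }

  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(DBOutput.class);
    conf.setMapperClass(DBOutputMapper.class);
    conf.setNumReduceTasks(0);                           // map-only; keys go straight to the DB
    conf.setMapOutputKeyClass(StudentinfoRecord.class);
    conf.setMapOutputValueClass(NullWritable.class);

    conf.setInputFormat(TextInputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path("/hua02"));  // hypothetical input path

    conf.setOutputFormat(DBOutputFormat.class);
    DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
          "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
    DBOutputFormat.setOutput(conf, "studentinfo2", "id", "name");  // assumed table and its columns

    JobClient.runJob(conf);
  }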