Writing Data from HBase to MySQL with MapReduce
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>0.96.2-hadoop2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>0.96.2-hadoop2</version>
    </dependency>
</dependencies>
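Note that neither example below will run without the MySQL JDBC driver on the classpath, since both load com.mysql.jdbc.Driver; the POM above does not include it. A hedged addition, assuming the standard MySQL Connector/J artifact (the 5.1.x version is a guess matching the Hadoop 2.2 / HBase 0.96 era):

    <!-- Assumed addition: the MySQL JDBC driver behind com.mysql.jdbc.Driver. -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>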
package com.abloz.hbase;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

@SuppressWarnings("deprecation")
public class CopyToMysql extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(CopyToMysql.class);

    public static final String driverClassName = "com.mysql.jdbc.Driver";
    public static final String URL = "jdbc:mysql://Hadoop48/toplists";
    public static final String USERNAME = "root"; // mysql username
    public static final String PASSWORD = "";     // mysql password
    private static final String tableName = "myaward";

    private Connection connection;

    // A record that can be shuffled through MapReduce (Writable)
    // and written to MySQL through DBOutputFormat (DBWritable).
    public static class AwardInfoRecord implements Writable, DBWritable {
        String userid;
        String nick;
        String loginid;

        public AwardInfoRecord() {
        }

        public void readFields(DataInput in) throws IOException {
            this.userid = Text.readString(in);
            this.nick = Text.readString(in);
            this.loginid = Text.readString(in);
        }

        public void write(DataOutput out) throws IOException {
            Text.writeString(out, this.userid);
            Text.writeString(out, this.nick);
            Text.writeString(out, this.loginid);
        }

        public void readFields(ResultSet result) throws SQLException {
            this.userid = result.getString(1);
            this.nick = result.getString(2);
            this.loginid = result.getString(3);
        }

        public void write(PreparedStatement stmt) throws SQLException {
            stmt.setString(1, this.userid);
            stmt.setString(2, this.nick);
            stmt.setString(3, this.loginid);
        }

        public String toString() {
            return this.userid + " " + this.nick + " " + this.loginid;
        }
    }

    public static Configuration conf;

    public static class MyMapper extends MapReduceBase implements
            Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, ImmutableBytesWritable> {

        @Override
        public void map(ImmutableBytesWritable key, Result rs,
                OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> output,
                Reporter report) throws IOException {
            String rowkey = new String(key.get()); // HBase row key (unused here)
            String userid = new String(rs.getValue("info".getBytes(), "UserId".getBytes()));
            String nick = new String(rs.getValue("info".getBytes(), "nickName".getBytes()),
                    HConstants.UTF8_ENCODING);
            String loginid = new String(rs.getValue("info".getBytes(), "loginId".getBytes()));
            output.collect(new ImmutableBytesWritable(userid.getBytes()),
                    new ImmutableBytesWritable((nick + "," + loginid).getBytes()));
            //LOG.info("map: userid:" + userid + ",nick:" + nick);
        }

        @Override
        public void configure(JobConf job) {
            super.configure(job);
        }
    }

    public static class MyReducer extends MapReduceBase implements
            Reducer<ImmutableBytesWritable, ImmutableBytesWritable, AwardInfoRecord, Text> {

        @Override
        public void configure(JobConf job) {
            super.configure(job);
        }

        @Override
        public void reduce(ImmutableBytesWritable key, Iterator<ImmutableBytesWritable> it,
                OutputCollector<AwardInfoRecord, Text> output, Reporter report)
                throws IOException {
            AwardInfoRecord record = new AwardInfoRecord();
            record.userid = new String(key.get());
            String info = new String(it.next().get());
            record.nick = info.split(",")[0];
            record.loginid = info.split(",")[1];
            //LOG.debug("reduce: userid:" + record.userid + ",nick:" + record.nick);
            output.collect(record, new Text());
        }
    }

    public static void main(String[] args) throws Exception {
        conf = HBaseConfiguration.create();
        int ret = ToolRunner.run(conf, new CopyToMysql(), args);
        System.exit(ret);
    }

    @Override
    public int run(String[] args) throws Exception {
        createConnection(driverClassName, URL);
        createTable(tableName); // make sure the target table exists before the job starts

        JobControl control = new JobControl("mysql");
        JobConf job = new JobConf(conf, CopyToMysql.class);
        job.setJarByClass(CopyToMysql.class);

        // the old-API TableInputFormat reads the table name from the input
        // path and the column list from hbase.mapred.tablecolumns
        String fromTable = "award";
        job.set("mapred.input.dir", fromTable);
        job.set("hbase.mapred.tablecolumns", "info:UserId info:nickName info:loginId");

        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(ImmutableBytesWritable.class);
        job.setInputFormat(TableInputFormat.class);

        DBConfiguration.configureDB(job, driverClassName, URL, USERNAME, PASSWORD);
        String[] fields = { "userid", "nick", "loginid" };
        DBOutputFormat.setOutput(job, tableName, fields);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(1);

        Job controlJob = new Job(job);
        control.addJob(controlJob);
        //JobClient.runJob(job);
        //control.run();
        Thread theController = new Thread(control);
        theController.start();

        while (!control.allFinished()) {
            Thread.sleep(3000);
            System.out.print(".");
        }
        control.stop();
        System.out.println();
        LOG.info("job end!");
        return 0;
    }

    // connect to mysql
    private void createConnection(String driverClassName, String url) throws Exception {
        Class.forName(driverClassName);
        connection = DriverManager.getConnection(url, USERNAME, PASSWORD);
        connection.setAutoCommit(false);
    }

    // create the target table if it does not exist yet
    private void createTable(String tableName) throws SQLException {
        String createTable = "CREATE TABLE " + tableName
                + " (userid VARCHAR(9) NOT NULL,"
                + " nick VARCHAR(20) NOT NULL,"
                + " loginid VARCHAR(20) NOT NULL,"
                + " PRIMARY KEY (userid))";
        Statement st = connection.createStatement();
        try {
            st.executeUpdate(createTable);
            connection.commit();
        } catch (Exception e) {
            LOG.warn("table '" + tableName + "' already exists, so we do nothing");
        } finally {
            st.close();
        }
    }
}
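To make the output side of this job concrete: DBOutputFormat.setOutput(job, tableName, fields) makes the framework build a parameterized INSERT from the table name and field list, and every AwardInfoRecord the reducer emits fills the placeholders via its write(PreparedStatement) method. A minimal standalone sketch of the equivalent plain-JDBC write, reusing the connection constants from CopyToMysql (the sample values are hypothetical):

    package com.abloz.hbase;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    public class DbOutputSketch {
        public static void main(String[] args) throws Exception {
            Class.forName(CopyToMysql.driverClassName);
            Connection conn = DriverManager.getConnection(
                    CopyToMysql.URL, CopyToMysql.USERNAME, CopyToMysql.PASSWORD);
            conn.setAutoCommit(false);
            // DBOutputFormat builds effectively this statement from
            // setOutput(job, "myaward", {"userid", "nick", "loginid"}).
            PreparedStatement stmt = conn.prepareStatement(
                    "INSERT INTO myaward (userid, nick, loginid) VALUES (?, ?, ?)");
            // Mirrors AwardInfoRecord.write(PreparedStatement) for one record;
            // the values below are made-up samples.
            stmt.setString(1, "10001");
            stmt.setString(2, "nick01");
            stmt.setString(3, "login01");
            stmt.executeUpdate();
            conn.commit();
            conn.close();
        }
    }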
Simpler code:
package com.my.hbase;

/**
 * Created by foreverhui on 2015/1/16.
 */
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class FromHBaseToMysqlExample {

    public static class HBaseMapper extends TableMapper<ImmutableBytesWritable, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            for (Cell kv : value.rawCells()) {
                // Use CellUtil.cloneXxx() instead of kv.getXxxArray(): the
                // getXxxArray() methods return the whole backing byte array,
                // not just the row/value slice belonging to this cell.
                String primaryKey = Bytes.toString(CellUtil.cloneRow(kv));
                String dataRow = Bytes.toString(CellUtil.cloneValue(kv));
                // TODO: parse dataRow and insert it into MySQL
                //context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)),
                //        new Text(dataRow));
            }
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("from.table", "testtable");
        //conf.set("family", "family1");
        Job job = new Job(conf, "hbase to mysql");
        job.setJarByClass(FromHBaseToMysqlExample.class);
        TableMapReduceUtil.initTableMapperJob(conf.get("from.table"), new Scan(),
                HBaseMapper.class, ImmutableBytesWritable.class, Text.class, job);
        // nothing goes through the normal MR output path, so avoid the
        // "output directory not set" error with NullOutputFormat
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setNumReduceTasks(0); // map-only job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
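The TODO in the mapper above is where the MySQL write belongs. One way to complete it, sketched under assumptions (the JDBC URL, credentials, and target table testtable_copy are invented for illustration): open one connection per map task in setup(), batch the inserts during map(), and flush them in cleanup(). This class drops into the same file, reusing its HBase imports:

    // Additional imports needed: java.sql.Connection, java.sql.DriverManager,
    // java.sql.PreparedStatement, java.sql.SQLException
    public static class HBaseToMysqlMapper extends TableMapper<ImmutableBytesWritable, Text> {
        private Connection conn;
        private PreparedStatement stmt;

        @Override
        protected void setup(Context context) throws IOException {
            try {
                // Hypothetical target database and table; adjust to your setup.
                Class.forName("com.mysql.jdbc.Driver");
                conn = DriverManager.getConnection("jdbc:mysql://localhost/toplists", "root", "");
                conn.setAutoCommit(false);
                stmt = conn.prepareStatement(
                        "INSERT INTO testtable_copy (rowkey, data) VALUES (?, ?)");
            } catch (Exception e) {
                throw new IOException("cannot open mysql connection", e);
            }
        }

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            for (Cell kv : value.rawCells()) {
                try {
                    stmt.setString(1, Bytes.toString(CellUtil.cloneRow(kv)));
                    stmt.setString(2, Bytes.toString(CellUtil.cloneValue(kv)));
                    stmt.addBatch(); // buffer instead of one round-trip per cell
                } catch (SQLException e) {
                    throw new IOException(e);
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException {
            try {
                stmt.executeBatch();
                conn.commit();
                conn.close();
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }
    }

To use it, pass HBaseToMysqlMapper.class instead of HBaseMapper.class to initTableMapperJob. Batching per task keeps the MySQL round-trips down, but note that a failed and re-run task will insert its rows again, so an idempotent key or INSERT IGNORE may be needed in practice.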