大数据系列11:Gora – 大数据持久化

borm – 大数据的对象持久化

wget  http://archive.apache.org/dist/gora/0.3/apache-gora-0.3-src.zip

unzip apache-gora-0.3-src.zip

cd apache-gora-0.3

mvn clean package

1、创建项目

mvn archetype:create -DgroupId=org.apdplat.demo.gora -DartifactId=gora-demo

2、增加依赖

vi gora-demo/pom.xml

在<dependencies>标签内增加:

       <dependency>

              <groupId>org.apache.hadoop</groupId>

              <artifactId>hadoop-core</artifactId>

              <version>1.2.1</version>

       </dependency>

       <dependency>

              <groupId>org.apache.hbase</groupId>

              <artifactId>hbase</artifactId>

              <version>0.94.12</version>

       </dependency>

       <dependency>

              <groupId>org.apache.gora</groupId>

              <artifactId>gora-core</artifactId>

              <version>0.3</version>

                     <exclusions>

                                   <exclusion>

                                                 <groupId>org.apache.hadoop</groupId>

                                                 <artifactId>hadoop-core</artifactId>

                                   </exclusion>

                                   <exclusion>

                                                 <groupId>org.apache.cxf</groupId>

                                                 <artifactId>cxf-rt-frontend-jaxrs</artifactId>

                                   </exclusion>

                     </exclusions>

       </dependency>

       <dependency>

              <groupId>org.apache.gora</groupId>

              <artifactId>gora-hbase</artifactId>

              <version>0.3</version>

                     <exclusions>

                                   <exclusion>

                                                 <groupId>org.apache.hbase</groupId>

                                                 <artifactId>hbase</artifactId>

                                   </exclusion>

                                   <exclusion>

                                                 <groupId>org.apache.hadoop</groupId>

                                                 <artifactId>hadoop-test</artifactId>

                                   </exclusion>

                     </exclusions>

       </dependency>

3、数据建模

mkdir -p gora-demo/src/main/avro

vi gora-demo/src/main/avro/person.json

输入:

      {

        "type": "record",

        "name": "Person",

        "namespace":"org.apdplat.demo.gora.generated",

        "fields" : [

             {"name":"idcard", "type": "string"},

             {"name":"name", "type": "string"},

             {"name":"age", "type": "string"}

        ]

      }

4、生成JAVA

bin/gora  goracompiler  gora-demo/src/main/avro/person.json  gora-demo/src/main/java/

5、模型映射

mkdir -p gora-demo/src/main/resources/

vi gora-demo/src/main/resources/gora-hbase-mapping.xml

输入:

      <gora-orm>

        <table name="Person">

             <family name="basic"/>

             <family name="detail"/>

        </table>

        <class table="Person" name="org.apdplat.demo.gora.generated.Person" keyclass="java.lang.String">

         <field name="idcard" family="basic" qualifier="idcard"/>

         <field name="name" family="basic" qualifier="name"/>

         <field name="age" family="detail" qualifier="age"/>

        </class>

      </gora-orm>

6、Gora配置

vi gora-demo/src/main/resources/gora.properties

输入:

      gora.datastore.default=org.apache.gora.hbase.store.HBaseStore

      gora.datastore.autocreateschema=true

7、HBase配置

vi gora-demo/src/main/resources/hbase-site.xml

输入:

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>

 <property>

   <name>hbase.zookeeper.property.clientPort</name>

   <value>2181</value>

 </property>

 <property>

   <name>hbase.zookeeper.quorum</name>

   <value>host001</value>

 </property>

</configuration>

8、编写PersonManager.java和PersonAnalytics.java

vi gora-demo/src/main/java/org/apdplat/demo/gora/PersonManager.java

    输入:

package org.apdplat.demo.gora;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.ParseException;

import org.apache.avro.util.Utf8;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.hadoop.conf.Configuration;
import org.apdplat.demo.gora.generated.Person;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

publicclass PersonManager {

   privatestaticfinal Logger log = LoggerFactory.getLogger(PersonManager.class);    

   private DataStore<String, Person> dataStore;   

   public PersonManager() {

     try{

        init();

     } catch(IOException ex) {

        thrownew RuntimeException(ex);

     }

   }

   privatevoid init() throws IOException {

      Configuration  conf = new Configuration();

     dataStore= DataStoreFactory.getDataStore(String.class, Person.class, conf);

   }

   privatevoid parse(String input) throws IOException,ParseException, Exception {

     log.info("解析文件:" + input);

     BufferedReader reader = new BufferedReader(new FileReader(input));

     longlineCount = 0;

     try{

        String line = reader.readLine();

        do {

          Person person = parseLine(line);

         

          if(person != null) {

            //入库

           storePerson(person.getIdcard().toString(), person);

          }

          lineCount++;

          line = reader.readLine();

        } while(line != null);

       

     } finally{

        reader.close(); 

     }

     log.info("文件解析完毕. 总人数:" + lineCount);

   }

   private Person parseLine(String line) throws ParseException {

            String[] attrs = line.split(" ");

        String idcard = attrs[0];

        String name = attrs[1];

     String age = attrs[2];

     

     Person person = new Person();

     person.setIdcard(new Utf8(idcard));

     person.setName(new Utf8(name));

     person.setAge(new Utf8(age));

     

     return person;

   }

   privatevoid storePerson(String key,Person person) throwsIOException, Exception {

          log.info("保存人员信息: " + person.getIdcard()+"\t"+person.getName()+"\t"+person.getAge());

     dataStore.put(key,person);

   }

   privatevoid get(String key) throws IOException, Exception{

     Person person = dataStore.get(key);

     printPerson(person);

   }

   privatevoid query(String key) throws IOException, Exception{

     Query<String, Person> query = dataStore.newQuery();

     query.setKey(key);

     

     Result<String, Person> result = query.execute();

     

     printResult(result);

   }

   privatevoid query(String startKey,String endKey) throwsIOException, Exception {

     Query<String, Person> query = dataStore.newQuery();

     query.setStartKey(startKey);

     query.setEndKey(endKey);

     

     Result<String, Person> result = query.execute();

     

     printResult(result);

   }

   privatevoid delete(String key) throws Exception {

     dataStore.delete(key);

     dataStore.flush();

     log.info("身份证号码为:" + key + " 的人员信息被删除");

   }

   privatevoid deleteByQuery(StringstartKey, String endKey) throws IOException, Exception {

     Query<String, Person> query = dataStore.newQuery();

     query.setStartKey(startKey);

     query.setEndKey(endKey);

     

     dataStore.deleteByQuery(query);

     log.info("身份证号码从 " + startKey + " 到 " + endKey + " 的人员信息被删除");

   }

   privatevoid printResult(Result<String, Person> result) throws IOException, Exception {      

     while(result.next()){

     String resultKey =result.getKey();

     Person resultPerson =result.get();

       

     System.out.println(resultKey + ":");

     printPerson(resultPerson);

     }

     

     System.out.println("人数:" + result.getOffset());

   }

   privatevoid printPerson(Personperson) {

     if(person== null){

        System.out.println("没有结果");

     } else{

       System.out.println(person.getIdcard()+"\t"+person.getName()+"\t"+person.getAge());

     }

   }

   privatevoid close() throws IOException, Exception{

     if(dataStore != null)

        dataStore.close();

   } 

   privatestaticfinal String USAGE = "PersonManager -parse<input_person_file>\n" +

                                        "          -get <idcard>\n" +

                                        "          -query <idcard>\n" +

                                        "          -query <startIdcard> <endIdcard>\n" +

                                       "          -delete <idcard>\n" +

                                       "          -deleteByQuery <startIdcard> <endIdcard>\n";

   

   publicstaticvoid main(String[] args) throws Exception {

     if(args.length < 2) {

        System.err.println(USAGE);

        System.exit(1);

     }

     

     PersonManager manager = new PersonManager();

     

     if("-parse".equals(args[0])){

        manager.parse(args[1]);

     } elseif("-get".equals(args[0])){

        manager.get(args[1]);

     } elseif("-query".equals(args[0])){

        if(args.length == 2)

          manager.query(args[1]);

        else

          manager.query(args[1], args[2]);

     } elseif("-delete".equals(args[0])){

        manager.delete(args[1]);

     } elseif("-deleteByQuery".equalsIgnoreCase(args[0])){

        manager.deleteByQuery(args[1], args[2]);

     } else{

        System.err.println(USAGE);

        System.exit(1);

     }

     

     manager.close();

   }

}

vi gora-demo/src/main/java/org/apdplat/demo/gora/PersonAnalytics.java

    输入:

package org.apdplat.demo.gora;

import java.io.IOException;

import org.apache.avro.util.Utf8;

import org.apache.gora.mapreduce.GoraMapper;

import org.apache.gora.store.DataStore;

import org.apache.gora.store.DataStoreFactory;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

import org.apdplat.demo.gora.generated.Person;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

publicclass PersonAnalytics extends Configured implements Tool {

    privatestaticfinal Logger log= LoggerFactory

            .getLogger(PersonAnalytics.class);

    publicstaticclassPersonAnalyticsMapper extends

            GoraMapper<String,Person, Text, LongWritable> {

        private LongWritable one = new LongWritable(1L);

        @Override

        protectedvoid map(String key, Person person, Contextcontext)

                throws IOException,InterruptedException {

            Utf8 age =person.getAge();

            context.write(new Text(age.toString()), one);

        };

    }

    publicstaticclassPersonAnalyticsReducer extends

            Reducer<Text,LongWritable, Text, LongWritable> {

        @Override

        protectedvoid reduce(Text key,Iterable<LongWritable> values,

                Context context) throws IOException,InterruptedException {

            long sum = 0L;

            for (LongWritable value :values) {

                sum += value.get();

            }

            context.write(key, new LongWritable(sum));

        };

    }

    public Job createJob(DataStore<String,Person> inStore, int numReducer)

            throws IOException {

        Job job = new Job(getConf());

        job.setJobName("Person Analytics");

        log.info("Creating Hadoop Job: " +job.getJobName());

        job.setNumReduceTasks(numReducer);

        job.setJarByClass(getClass());

        GoraMapper.initMapperJob(job,inStore, Text.class,LongWritable.class,

                PersonAnalyticsMapper.class, true);

        job.setReducerClass(PersonAnalyticsReducer.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(LongWritable.class);

        TextOutputFormat

                .setOutputPath(job,newPath("person-analytics-output"));

        return job;

    }

    @Override

    publicint run(String[] args) throws Exception {

        DataStore<String,Person> inStore;

        Configuration conf = new Configuration();

        if (args.length == 1) {

            String dataStoreClass =args[0];

            inStore =DataStoreFactory.getDataStore(dataStoreClass,

                    String.class, Person.class, conf);

        } else {

            inStore =DataStoreFactory.getDataStore(String.class, Person.class,

                    conf);

        }

        Job job = createJob(inStore,2);

        boolean success = job.waitForCompletion(true);

        inStore.close();

        log.info("PersonAnalytics completed with "

                + (success ? "success": "failure"));

        return success ? 0 : 1;

    }

    publicstaticvoidmain(String[] args) throws Exception {

        int ret = ToolRunner.run(new PersonAnalytics(),args);

        System.exit(ret);

    }

}

9、准备数据

        vi gora-demo/src/main/resources/persons.txt

    输入:

      533001198510125839 杨尚川 25

      533001198510125840 杨尚华 22

      533001198510125841 刘德华 55

      533001198510125842 刘亦菲 25

      533001198510125843 蔡卓妍 25

      533001198510125844 林志玲 22

      533001198510125845 李连杰 55

10、在Linux命令行使用maven2编译运行项目

cd gora-demo

mvn clean compile

mvn exec:java -Dexec.mainClass=org.apdplat.demo.gora.PersonManager

mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-parse src/main/resources/persons.txt"

mvn exec:java -Dexec.mainClass=org.apdplat.demo.gora.PersonAnalytics

cat person-analytics-output/part-r-00000

mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-get 533001198510125842"

mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-query 533001198510125844"

mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-query 533001198510125842 533001198510125845"

mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-delete 533001198510125840"

mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-deleteByQuery 533001198510125841 533001198510125842"

mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-deleteByQuery 533001198510125845 533001198510125846"

mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-query 533001198510125838 533001198510125848"

11、在windows下使用eclipse编译运行项目

mvn clean package

rm -r target

vi .classpath

删除所有包含path="M2_REPO的行

删除<classpathentry kind="src" path="target/maven-shared-archive-resources" excluding="**/*.java"/>

通过WinSCP把gora-demo传到windows

将gora-demo导入eclipse

将lib下的所有jar加入构建路径

12、打包项目并提交Hadoop运行

cd gora-demo

mvn clean package

mkdir job

cp -r lib job/lib

cp -r target/classes/* job

hadoop fs -put persons.txt persons.txt

jar -cvf gora-demo.job *

hadoop jar gora-demo.job org.apdplat.demo.gora.PersonAnalytics 


 

APDPlat旗下十大开源项目

相关推荐