大数据系列11:Gora – 大数据持久化
wget http://archive.apache.org/dist/gora/0.3/apache-gora-0.3-src.zip
unzip apache-gora-0.3-src.zip
cd apache-gora-0.3
mvn clean package
1、创建项目
mvn archetype:create -DgroupId=org.apdplat.demo.gora -DartifactId=gora-demo
2、增加依赖
vi gora-demo/pom.xml
在<dependencies>标签内增加:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.94.12</version>
</dependency>
<dependency>
<groupId>org.apache.gora</groupId>
<artifactId>gora-core</artifactId>
<version>0.3</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-frontend-jaxrs</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.gora</groupId>
<artifactId>gora-hbase</artifactId>
<version>0.3</version>
<exclusions>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-test</artifactId>
</exclusion>
</exclusions>
</dependency>
3、数据建模
mkdir -p gora-demo/src/main/avro
vi gora-demo/src/main/avro/person.json
输入:
{
"type": "record",
"name": "Person",
"namespace":"org.apdplat.demo.gora.generated",
"fields" : [
{"name":"idcard", "type": "string"},
{"name":"name", "type": "string"},
{"name":"age", "type": "string"}
]
}
4、生成JAVA类
bin/gora goracompiler gora-demo/src/main/avro/person.json gora-demo/src/main/java/
5、模型映射
mkdir -p gora-demo/src/main/resources/
vi gora-demo/src/main/resources/gora-hbase-mapping.xml
输入:
<gora-orm>
<table name="Person">
<family name="basic"/>
<family name="detail"/>
</table>
<class table="Person" name="org.apdplat.demo.gora.generated.Person" keyClass="java.lang.String">
<field name="idcard" family="basic" qualifier="idcard"/>
<field name="name" family="basic" qualifier="name"/>
<field name="age" family="detail" qualifier="age"/>
</class>
</gora-orm>
6、Gora配置
vi gora-demo/src/main/resources/gora.properties
输入:
gora.datastore.default=org.apache.gora.hbase.store.HBaseStore
gora.datastore.autocreateschema=true
7、Hbase配置
vi gora-demo/src/main/resources/hbase-site.xml
输入:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>host001</value>
</property>
</configuration>
8、编写PersonManager.java和PersonAnalytics.java
vi gora-demo/src/main/java/org/apdplat/demo/gora/PersonManager.java
输入:
package org.apdplat.demo.gora;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.ParseException;
import org.apache.avro.util.Utf8;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.hadoop.conf.Configuration;
import org.apdplat.demo.gora.generated.Person;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
publicclass PersonManager {
privatestaticfinal Logger log = LoggerFactory.getLogger(PersonManager.class);
private DataStore<String, Person> dataStore;
public PersonManager() {
try{
init();
} catch(IOException ex) {
thrownew RuntimeException(ex);
}
}
privatevoid init() throws IOException {
Configuration conf = new Configuration();
dataStore= DataStoreFactory.getDataStore(String.class, Person.class, conf);
}
privatevoid parse(String input) throws IOException,ParseException, Exception {
log.info("解析文件:" + input);
BufferedReader reader = new BufferedReader(new FileReader(input));
longlineCount = 0;
try{
String line = reader.readLine();
do {
Person person = parseLine(line);
if(person != null) {
//入库
storePerson(person.getIdcard().toString(), person);
}
lineCount++;
line = reader.readLine();
} while(line != null);
} finally{
reader.close();
}
log.info("文件解析完毕. 总人数:" + lineCount);
}
private Person parseLine(String line) throws ParseException {
String[] attrs = line.split(" ");
String idcard = attrs[0];
String name = attrs[1];
String age = attrs[2];
Person person = new Person();
person.setIdcard(new Utf8(idcard));
person.setName(new Utf8(name));
person.setAge(new Utf8(age));
return person;
}
privatevoid storePerson(String key,Person person) throwsIOException, Exception {
log.info("保存人员信息: " + person.getIdcard()+"\t"+person.getName()+"\t"+person.getAge());
dataStore.put(key,person);
}
privatevoid get(String key) throws IOException, Exception{
Person person = dataStore.get(key);
printPerson(person);
}
privatevoid query(String key) throws IOException, Exception{
Query<String, Person> query = dataStore.newQuery();
query.setKey(key);
Result<String, Person> result = query.execute();
printResult(result);
}
privatevoid query(String startKey,String endKey) throwsIOException, Exception {
Query<String, Person> query = dataStore.newQuery();
query.setStartKey(startKey);
query.setEndKey(endKey);
Result<String, Person> result = query.execute();
printResult(result);
}
privatevoid delete(String key) throws Exception {
dataStore.delete(key);
dataStore.flush();
log.info("身份证号码为:" + key + " 的人员信息被删除");
}
privatevoid deleteByQuery(StringstartKey, String endKey) throws IOException, Exception {
Query<String, Person> query = dataStore.newQuery();
query.setStartKey(startKey);
query.setEndKey(endKey);
dataStore.deleteByQuery(query);
log.info("身份证号码从 " + startKey + " 到 " + endKey + " 的人员信息被删除");
}
privatevoid printResult(Result<String, Person> result) throws IOException, Exception {
while(result.next()){
String resultKey =result.getKey();
Person resultPerson =result.get();
System.out.println(resultKey + ":");
printPerson(resultPerson);
}
System.out.println("人数:" + result.getOffset());
}
privatevoid printPerson(Personperson) {
if(person== null){
System.out.println("没有结果");
} else{
System.out.println(person.getIdcard()+"\t"+person.getName()+"\t"+person.getAge());
}
}
privatevoid close() throws IOException, Exception{
if(dataStore != null)
dataStore.close();
}
privatestaticfinal String USAGE = "PersonManager -parse<input_person_file>\n" +
" -get <idcard>\n" +
" -query <idcard>\n" +
" -query <startIdcard> <endIdcard>\n" +
" -delete <idcard>\n" +
" -deleteByQuery <startIdcard> <endIdcard>\n";
publicstaticvoid main(String[] args) throws Exception {
if(args.length < 2) {
System.err.println(USAGE);
System.exit(1);
}
PersonManager manager = new PersonManager();
if("-parse".equals(args[0])){
manager.parse(args[1]);
} elseif("-get".equals(args[0])){
manager.get(args[1]);
} elseif("-query".equals(args[0])){
if(args.length == 2)
manager.query(args[1]);
else
manager.query(args[1], args[2]);
} elseif("-delete".equals(args[0])){
manager.delete(args[1]);
} elseif("-deleteByQuery".equalsIgnoreCase(args[0])){
manager.deleteByQuery(args[1], args[2]);
} else{
System.err.println(USAGE);
System.exit(1);
}
manager.close();
}
}
vi gora-demo/src/main/java/org/apdplat/demo/gora/PersonAnalytics.java
输入:
package org.apdplat.demo.gora;
import java.io.IOException;
import org.apache.avro.util.Utf8;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apdplat.demo.gora.generated.Person;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
publicclass PersonAnalytics extends Configured implements Tool {
privatestaticfinal Logger log= LoggerFactory
.getLogger(PersonAnalytics.class);
publicstaticclassPersonAnalyticsMapper extends
GoraMapper<String,Person, Text, LongWritable> {
private LongWritable one = new LongWritable(1L);
@Override
protectedvoid map(String key, Person person, Contextcontext)
throws IOException,InterruptedException {
Utf8 age =person.getAge();
context.write(new Text(age.toString()), one);
};
}
publicstaticclassPersonAnalyticsReducer extends
Reducer<Text,LongWritable, Text, LongWritable> {
@Override
protectedvoid reduce(Text key,Iterable<LongWritable> values,
Context context) throws IOException,InterruptedException {
long sum = 0L;
for (LongWritable value :values) {
sum += value.get();
}
context.write(key, new LongWritable(sum));
};
}
public Job createJob(DataStore<String,Person> inStore, int numReducer)
throws IOException {
Job job = new Job(getConf());
job.setJobName("Person Analytics");
log.info("Creating Hadoop Job: " +job.getJobName());
job.setNumReduceTasks(numReducer);
job.setJarByClass(getClass());
GoraMapper.initMapperJob(job,inStore, Text.class,LongWritable.class,
PersonAnalyticsMapper.class, true);
job.setReducerClass(PersonAnalyticsReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
TextOutputFormat
.setOutputPath(job,newPath("person-analytics-output"));
return job;
}
@Override
publicint run(String[] args) throws Exception {
DataStore<String,Person> inStore;
Configuration conf = new Configuration();
if (args.length == 1) {
String dataStoreClass =args[0];
inStore =DataStoreFactory.getDataStore(dataStoreClass,
String.class, Person.class, conf);
} else {
inStore =DataStoreFactory.getDataStore(String.class, Person.class,
conf);
}
Job job = createJob(inStore,2);
boolean success = job.waitForCompletion(true);
inStore.close();
log.info("PersonAnalytics completed with "
+ (success ? "success": "failure"));
return success ? 0 : 1;
}
publicstaticvoidmain(String[] args) throws Exception {
int ret = ToolRunner.run(new PersonAnalytics(),args);
System.exit(ret);
}
}
9、准备数据
vi gora-demo/src/main/resources/persons.txt
输入:
533001198510125839 杨尚川 25
533001198510125840 杨尚华 22
533001198510125841 刘德华 55
533001198510125842 刘亦菲 25
533001198510125843 蔡卓妍 25
533001198510125844 林志玲 22
533001198510125845 李连杰 55
10、在Linux命令行使用maven2编译运行项目
cd gora-demo
mvn clean compile
mvn exec:java -Dexec.mainClass=org.apdplat.demo.gora.PersonManager
mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-parse src/main/resources/persons.txt"
mvn exec:java -Dexec.mainClass=org.apdplat.demo.gora.PersonAnalytics
cat person-analytics-output/part-r-00000
mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-get 533001198510125842"
mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-query 533001198510125844"
mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-query 533001198510125842 533001198510125845"
mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-delete 533001198510125840"
mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-deleteByQuery 533001198510125841 533001198510125842"
mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-deleteByQuery 533001198510125845 533001198510125846"
mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-query 533001198510125838 533001198510125848"
11、在windows下使用eclipse编译运行项目
mvn clean package
rm -r target
vi .classpath
删除所有包含path="M2_REPO的行
删除<classpathentry kind="src" path="target/maven-shared-archive-resources" excluding="**/*.java"/>
通过WinSCP把gora-demo传到windows
将gora-demo导入eclipse
将lib下的所有jar加入构建路径
12、打包项目并提交Hadoop运行
cd gora-demo
mvn clean package
mkdir job
cp -r lib job/lib
cp -r target/classes/* job
hadoop fs -put persons.txt persons.txt
jar -cvf gora-demo.job *
hadoop jar gora-demo.job org.apdplat.demo.gora.PersonAnalytics