spark实现倒排索引

package sparkTest.rdd;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/**

*@authoryuzhijun

*

倒排索引程序

输入(/files/InvertedIndex.txt内容):

url01:key01,key02,key03

url02:key01,key04,key05,key06

url03:key01,key02,key03

url04:key01,key02,key03,key04,key05

url05:key02,key03,key04,key05

输出:

(key01,[url01,url02,url03,url04])

(key02,[url01,url03,url04,url05])

(key03,[url01,url03,url04,url05])

(key04,[url02,url04,url05])

(key05,[url02,url04,url05])

(key06,[url02])

*

*/

/**

*倒排索引InvertedIndex

*/

publicclassInvertedIndex{

publicstaticvoidmain(String[]args){

SparkConfconf=(newSparkConf()).setAppName("InvertedIndex").setMaster("local[1]");

@SuppressWarnings("resource")

JavaSparkContextsc=newJavaSparkContext(conf);//创建spark上下文

JavaRDD<String>file=sc.textFile(System.getProperty("user.dir")+"/files/InvertedIndex.txt");//加载文件

/*

*生成(url地址,关键字列表)数据格式

*/

JavaPairRDD<String,String>urlKeys=file.mapToPair(newPairFunction<String,String,String>(){

privatestaticfinallongserialVersionUID=1L;

publicTuple2<String,String>call(Stringline){

String[]arr=line.split(":");

Stringkey=arr[0];//url地址

Stringvalue=arr[1];//关键字列表

returnnewTuple2<String,String>(key,value);

}

});

/*

*将关键字作为key,URL作为value,生成(关键字,URL)数据格式

*/

JavaRDD<Tuple2<String,String>>keyUrl=urlKeys.flatMap(newFlatMapFunction<Tuple2<String,String>,Tuple2<String,String>>(){

privatestaticfinallongserialVersionUID=1L;

publicIterable<Tuple2<String,String>>call(Tuple2<String,String>t){

Stringurl=t._1;

Stringkeys=t._2;//关键字

String[]keyArr=keys.split(",");

List<Tuple2<String,String>>keyUrlList=newArrayList<Tuple2<String,String>>();

for(Stringkey:keyArr){//循环遍历每个url对应的关键字

keyUrlList.add(newTuple2<String,String>(key,url));

}

returnkeyUrlList;

}

});

/*

1将关键字作为key,url作为value

2根据key进行groupby的汇总

3根据key排序

4将结果保存到文件

*/

keyUrl.mapToPair(newPairFunction<Tuple2<String,String>,String,String>(){

privatestaticfinallongserialVersionUID=1L;

publicTuple2<String,String>call(Tuple2<String,String>t){

returnnewTuple2<String,String>(t._1,t._2);//(关键字,url)

}

}).groupByKey().sortByKey().saveAsTextFile(System.getProperty("user.dir")+"/files/SparkRddTest01/InvertedIndex");

}

}