// Inverted index implemented with Spark
package sparkTest.rdd;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
/**
 * @author yuzhijun
 *
 * Inverted-index program.
 * Input (contents of /files/InvertedIndex.txt):
 *   url01:key01,key02,key03
 *   url02:key01,key04,key05,key06
 *   url03:key01,key02,key03
 *   url04:key01,key02,key03,key04,key05
 *   url05:key02,key03,key04,key05
 * Output:
 *   (key01,[url01,url02,url03,url04])
 *   (key02,[url01,url03,url04,url05])
 *   (key03,[url01,url03,url04,url05])
 *   (key04,[url02,url04,url05])
 *   (key05,[url02,url04,url05])
 *   (key06,[url02])
 */
/**
 * Inverted index ({@code InvertedIndex}).
 */
public class InvertedIndex {
publicclassInvertedIndex{
publicstaticvoidmain(String[]args){
SparkConfconf=(newSparkConf()).setAppName("InvertedIndex").setMaster("local[1]");
@SuppressWarnings("resource")
JavaSparkContextsc=newJavaSparkContext(conf);//创建spark上下文
JavaRDD<String>file=sc.textFile(System.getProperty("user.dir")+"/files/InvertedIndex.txt");//加载文件
/*
*生成(url地址,关键字列表)数据格式
*/
JavaPairRDD<String,String>urlKeys=file.mapToPair(newPairFunction<String,String,String>(){
privatestaticfinallongserialVersionUID=1L;
publicTuple2<String,String>call(Stringline){
String[]arr=line.split(":");
Stringkey=arr[0];//url地址
Stringvalue=arr[1];//关键字列表
returnnewTuple2<String,String>(key,value);
}
});
/*
*将关键字作为key,URL作为value,生成(关键字,URL)数据格式
*/
JavaRDD<Tuple2<String,String>>keyUrl=urlKeys.flatMap(newFlatMapFunction<Tuple2<String,String>,Tuple2<String,String>>(){
privatestaticfinallongserialVersionUID=1L;
publicIterable<Tuple2<String,String>>call(Tuple2<String,String>t){
Stringurl=t._1;
Stringkeys=t._2;//关键字
String[]keyArr=keys.split(",");
List<Tuple2<String,String>>keyUrlList=newArrayList<Tuple2<String,String>>();
for(Stringkey:keyArr){//循环遍历每个url对应的关键字
keyUrlList.add(newTuple2<String,String>(key,url));
}
returnkeyUrlList;
}
});
/*
1将关键字作为key,url作为value
2根据key进行groupby的汇总
3根据key排序
4将结果保存到文件
*/
keyUrl.mapToPair(newPairFunction<Tuple2<String,String>,String,String>(){
privatestaticfinallongserialVersionUID=1L;
publicTuple2<String,String>call(Tuple2<String,String>t){
returnnewTuple2<String,String>(t._1,t._2);//(关键字,url)
}
}).groupByKey().sortByKey().saveAsTextFile(System.getProperty("user.dir")+"/files/SparkRddTest01/InvertedIndex");
}
}