Revisiting Spark
I. Examples
1. Counting PV and UV
1.1 Counting PV (page views)
val conf = new SparkConf()
conf.setMaster("local").setAppName("pvuv")
val sc = new SparkContext(conf)
val lineRDD = sc.textFile("./pvuv.txt")
// split each log line on whitespace, take field 5 (the URL), count hits per URL
lineRDD.map(x => {
  val sp = x.split("\\s")
  (sp(5), 1)
}).reduceByKey(_ + _).foreach(println)
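The per-line transformation can be tried locally without a Spark cluster. The sketch below mirrors the same logic with plain Java streams; the sample log lines and the field layout (client IP in column 1, URL in column 6) are assumptions for illustration, since the real schema of pvuv.txt is not shown here.

```java
import java.util.*;
import java.util.stream.*;

// Plain-Java sketch of the PV aggregation: extract the URL field from
// each line and count occurrences per URL (a local stand-in for
// map + reduceByKey). Log lines are hypothetical.
public class PvSketch {
    public static Map<String, Long> countPv(List<String> lines) {
        return lines.stream()
                .map(x -> x.split("\\s")[5])  // field 5 = URL in this assumed layout
                .collect(Collectors.groupingBy(url -> url, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "10.0.0.1 - - [t1] GET /index.html",
                "10.0.0.2 - - [t2] GET /index.html",
                "10.0.0.1 - - [t3] GET /about.html");
        System.out.println(countPv(lines));  // hit counts per URL
    }
}
```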
1.2 Counting UV (unique visitors)
// pair each URL with the visitor IP, dedupe, then count distinct IPs per URL
lineRDD.map(x => {
  val sp = x.split("\\s")
  (sp(5), sp(0))
}).distinct().countByKey().foreach(println)
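The UV logic differs from PV only in the dedup step: a repeat visit by the same IP to the same URL must count once. A local Java sketch of that pipeline, again with hypothetical log lines and field positions:

```java
import java.util.*;
import java.util.stream.*;

// Plain-Java sketch of the UV count: build (url, ip) pairs, dedupe them
// (mirrors RDD distinct()), then count pairs per URL (mirrors countByKey).
public class UvSketch {
    public static Map<String, Long> countUv(List<String> lines) {
        return lines.stream()
                .map(x -> x.split("\\s"))
                .map(sp -> Map.entry(sp[5], sp[0]))  // (url, ip)
                .distinct()                          // one pair per distinct visitor
                .collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "10.0.0.1 - - [t1] GET /index.html",
                "10.0.0.1 - - [t2] GET /index.html",  // same visitor again: not counted twice
                "10.0.0.2 - - [t3] GET /index.html",
                "10.0.0.1 - - [t4] GET /about.html");
        System.out.println(countUv(lines));  // distinct visitors per URL
    }
}
```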
2. Secondary sort
// Driver: read "first second" lines, key each line by a custom comparable
// key, sort descending, and print the original lines.
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("SecondarySortTest");
final JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> secondRDD = sc.textFile("secondSort.txt");
JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {

    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<SecondSortKey, String> call(String line) throws Exception {
        String[] splited = line.split(" ");
        int first = Integer.valueOf(splited[0]);
        int second = Integer.valueOf(splited[1]);
        SecondSortKey secondSortKey = new SecondSortKey(first, second);
        return new Tuple2<SecondSortKey, String>(secondSortKey, line);
    }
});

pairSecondRDD.sortByKey(false).foreach(new VoidFunction<Tuple2<SecondSortKey, String>>() {

    private static final long serialVersionUID = 1L;

    @Override
    public void call(Tuple2<SecondSortKey, String> tuple) throws Exception {
        System.out.println(tuple._2);
    }
});

// The custom key. (The class body was truncated in the source after
// getFirst(); the remaining accessors and compareTo are a standard
// reconstruction: order by first, tie-break on second.)
public class SecondSortKey implements Serializable, Comparable<SecondSortKey> {

    private static final long serialVersionUID = 1L;
    private int first;
    private int second;

    public SecondSortKey(int first, int second) {
        this.first = first;
        this.second = second;
    }

    public int getFirst() { return first; }
    public int getSecond() { return second; }

    @Override
    public int compareTo(SecondSortKey that) {
        if (this.first != that.first) {
            return this.first - that.first;
        }
        return this.second - that.second;
    }
}
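The ordering itself can be checked without Spark: secondary sort means comparing by the first field and breaking ties on the second, and `sortByKey(false)` corresponds to applying that comparator reversed. A minimal local sketch (the sample pairs are made up for illustration):

```java
import java.util.*;

// Local sketch of the secondary-sort ordering: compare by pair[0],
// tie-break on pair[1], then reverse for descending order as in
// sortByKey(false).
public class SecondSortSketch {
    public static List<int[]> sortDesc(List<int[]> pairs) {
        List<int[]> out = new ArrayList<>(pairs);
        out.sort(Comparator
                .<int[]>comparingInt(p -> p[0])   // primary key
                .thenComparingInt(p -> p[1])      // tie-breaker
                .reversed());                     // descending, like sortByKey(false)
        return out;
    }

    public static void main(String[] args) {
        List<int[]> pairs = Arrays.asList(
                new int[]{3, 1}, new int[]{1, 2}, new int[]{3, 5}, new int[]{1, 0});
        for (int[] p : sortDesc(pairs)) {
            System.out.println(p[0] + " " + p[1]);
        }
        // prints: 3 5 / 3 1 / 1 2 / 1 0
    }
}
```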