每日一题(为了工作)· 2020-04-26 · 第五十五题
// Hadoop + Spark 二次排序(secondary sort)示例代码
package com.swust.hadoop; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.*; /** * * @author 雪瞳 * @Slogan 时钟尚且前行,人怎能再此止步! * @Function 实现二次排序 * */ public class DoubleSort { public static void main(String[] args) { // if (args.length < 1){ // System.err.println("Unused file path!"); // System.exit(1); // } // String inputPath = args[0]; String inputPath = "./data/sorts.txt"; SparkConf conf = new SparkConf().setMaster("local").setAppName("double"); JavaSparkContext jsc = new JavaSparkContext(conf); jsc.setLogLevel("Error"); JavaRDD<String> lines = jsc.textFile(inputPath,1); // x 2 9 //name time value //将输入数据转化成K-V格式(姓名,<时间,数值>) JavaPairRDD<String, Tuple2<Integer, Integer>> pairs = lines.mapToPair(new PairFunction<String, String, Tuple2<Integer, Integer>>() { @Override public Tuple2<String, Tuple2<Integer, Integer>> call(String one) throws Exception { String[] words = one.split(" "); String rkey = words[0]; int tkey = Integer.valueOf(words[1]); int tval = Integer.valueOf(words[2]); Tuple2<Integer, Integer> rval = new Tuple2<>(tkey, tval); return new Tuple2<>(rkey, rval); } }); //对数据进行分组 JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> groups = pairs.groupByKey(); //组内数据进行排序 //为了对RDD的值进行排序,先将其复制到另一个列表内,RDD本身以及其包含的元素都具有这种不可变性 JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> sorts = groups.mapValues(new Function<Iterable<Tuple2<Integer, Integer>>, Iterable<Tuple2<Integer, Integer>>>() { @Override public Iterable<Tuple2<Integer, Integer>> call(Iterable<Tuple2<Integer, Integer>> one) throws Exception { List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>((Collection<? 
extends Tuple2<Integer, Integer>>) one); Collections.sort(list, new Comparator<Tuple2<Integer, Integer>>() { @Override public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) { int r1 = o1._1.compareTo(o2._1); if (r1 == 0) { int r2 = o1._2.compareTo(o2._2); return r2; } return r1; } }); return list; } }); List<Tuple2<String, Iterable<Tuple2<Integer, Integer>>>> outputs = sorts.collect(); for (Tuple2<String, Iterable<Tuple2<Integer, Integer>>> elem : outputs){ Iterable<Tuple2<Integer, Integer>> valIterable = elem._2; System.err.println(elem._1+"\t"); for (Tuple2<Integer,Integer> tp : valIterable){ System.err.println(tp._1+"---"+tp._2); } } } }
相关推荐
hanwentan 2020-07-21
Lzs 2020-10-23
聚合室 2020-11-16
零 2020-09-18
Justhavefun 2020-10-22
jacktangj 2020-10-14
ChaITSimpleLove 2020-10-06
Andrea0 2020-09-18
周游列国之仕子 2020-09-15
afanti 2020-09-16
88234852 2020-09-15
YClimb 2020-09-15
风雨断肠人 2020-09-04
卖口粥湛蓝的天空 2020-09-15
stulen 2020-09-15
pythonxuexi 2020-09-06
abfdada 2020-08-26