// 每日一题 为了工作 2020 0426 第五十五题

// hadoop+spark二次排序代码

package com.swust.hadoop;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.*;

/**
 * Secondary sort with Spark: groups records by name, then orders each group's
 * {@code (time, value)} pairs by time and, for equal times, by value.
 *
 * <p>Each input line is expected to hold three whitespace-separated fields,
 * {@code name time value} (for example {@code x 2 9}), where time and value are
 * integers. Blank lines are skipped. The sorted groups are collected to the driver
 * and printed to standard error, one name line followed by one {@code time---value}
 * line per record.
 *
 * <p>Slogan: "Even the clock keeps moving forward; how could a person stop here!"
 *
 * @author 雪瞳
 */
public class DoubleSort {

    /** Input file used when no path is supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH = "./data/sorts.txt";

    /**
     * Runs the secondary-sort job on a local Spark master.
     *
     * @param args optional; when present, {@code args[0]} overrides the input file
     *             path (default {@code ./data/sorts.txt})
     */
    public static void main(String[] args) {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        SparkConf conf = new SparkConf().setMaster("local").setAppName("double");
        // JavaSparkContext is Closeable: try-with-resources stops the context even if the job fails.
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            jsc.setLogLevel("Error");
            JavaRDD<String> lines = jsc.textFile(inputPath, 1);

            // Skip blank lines so they do not break parsing.
            JavaRDD<String> records = lines.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String line) throws Exception {
                    return !line.trim().isEmpty();
                }
            });

            // Turn each record "name time value" into a K-V pair: (name, (time, value)).
            JavaPairRDD<String, Tuple2<Integer, Integer>> pairs = records.mapToPair(
                    new PairFunction<String, String, Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<String, Tuple2<Integer, Integer>> call(String line) throws Exception {
                    // Split on any whitespace run so repeated spaces or tabs are tolerated.
                    String[] fields = line.trim().split("\\s+");
                    if (fields.length < 3) {
                        throw new IllegalArgumentException(
                                "Expected 'name time value' but got: " + line);
                    }
                    int time = Integer.parseInt(fields[1]);
                    int value = Integer.parseInt(fields[2]);
                    Tuple2<Integer, Integer> timeAndValue = new Tuple2<>(time, value);
                    return new Tuple2<>(fields[0], timeAndValue);
                }
            });

            // Group all (time, value) pairs that share the same name.
            JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> groups = pairs.groupByKey();

            // Sort inside each group: by time, then by value.
            JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> sorts = groups.mapValues(
                    new Function<Iterable<Tuple2<Integer, Integer>>, Iterable<Tuple2<Integer, Integer>>>() {
                @Override
                public Iterable<Tuple2<Integer, Integer>> call(Iterable<Tuple2<Integer, Integer>> group)
                        throws Exception {
                    // The API only guarantees an Iterable, so copy it into a mutable
                    // list by iteration rather than casting it to Collection.
                    List<Tuple2<Integer, Integer>> sorted = new ArrayList<>();
                    for (Tuple2<Integer, Integer> tp : group) {
                        sorted.add(tp);
                    }
                    Collections.sort(sorted, new Comparator<Tuple2<Integer, Integer>>() {
                        @Override
                        public int compare(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                            int byTime = a._1().compareTo(b._1());
                            return byTime != 0 ? byTime : a._2().compareTo(b._2());
                        }
                    });
                    return sorted;
                }
            });

            List<Tuple2<String, Iterable<Tuple2<Integer, Integer>>>> outputs = sorts.collect();
            for (Tuple2<String, Iterable<Tuple2<Integer, Integer>>> elem : outputs) {
                System.err.println(elem._1() + "\t");
                for (Tuple2<Integer, Integer> tp : elem._2()) {
                    System.err.println(tp._1() + "---" + tp._2());
                }
            }
        }
    }
}

  

// 每日一题 为了工作 2020 0426 第五十五题

 

// 每日一题 为了工作 2020 0426 第五十五题

// 相关推荐