Spark RDD计算每天各省的top3热门广告

数据结构:时间戳,省份,城市,用户,广告,中间字段使用空格分割。

样本如下:

1516609143867 6 7 64 16

1516609143869 9 4 75 18

1516609143869 1 7 87 12

package Spark02

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// 需求:统计出每一个省份广告被点击次数的Top3
object Practice {
  def main(args: Array[String]): Unit = {

    // 1.初始化Spark配置信息并建立与Spark的连接
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Practice")
    val sc = new SparkContext(conf)

    // 2.读取数据生成RDD
    val line: RDD[String] = sc.textFile("G:\\input\\agent\\agent.log",1)

    // 3.按照最小粒度聚合 key=(Province,AD) value=1
    val provinceADtoOne: RDD[((String, String), Int)] = line.map(x => {
      val fields: Array[String] = x.split(" ")
      (((fields(1), fields(4)), 1))
    })

    // 4.计算每个省中每个广告被点击次数的总和
    val provinceAdToSum: RDD[((String, String), Int)] = provinceADtoOne.reduceByKey(_+_)

    // 5.将省份作为key,广告加点击次数作为value,(Province,(AD,sum))
    val provinceToAdSum: RDD[(String, (String, Int))] = provinceAdToSum.map(x=>(x._1._1,(x._1._2,x._2)))

    // 6.将同一个省份的所有广告进行聚合(Province,List((AD1,sum1),(AD2,sum2)))
    val provinceGroup: RDD[(String, Iterable[(String, Int)])] = provinceToAdSum.groupByKey()

    // 7.对同一省份的所有广告的集合进行排序并取前三条,排序规则为广告点击总次数
    val result: RDD[(String, List[(String, Int)])] = provinceGroup.mapValues(x => {
      x.toList.sortWith((x, y) => {
        x._2 > y._2
      }).take(3)
    })

    // 8.保存结果
//    result.collect.foreach(println)
      result.saveAsTextFile("output/practice")

    sc.stop()
    
  }
}

相关推荐