Spark 实战入门

使用spark分析sogou日志

下载用户查询日志的精简版,完整版http://download.labs.sogou.com/dl/q.html
数据格式说明:
访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL
其中,用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID。实现以下功能:
1、最热门的查询词排行 top10
2、用户查询排行 top10
3、网站访问排行版(不用区分二级域名) top50

这从我们下载下来的文件存在hdfs中,关于hadoop的安装,我参考了http://blog.csdn.net/stark_summer/article/details/4242427,这篇博客。

由于下载下来的文件格式是GBK的,上传都hdfs上之前需要转码一下。

find *.txt -exec sh -c "iconv -f GB18030 -t UTF8 {} > {}.txt" \;

 然后把下载的文件上传到hdfs

hadoop fs -mkdir /data
hadoop fs -put /root/dfs/SogouQ.reduced /data/sogou

接下来,我们就可以写spark程序来实现以上的问题

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object WebLogInfo {
  def main(args: Array[String]) {
    val dataFile = "hdfs://vs1:9000/data/sogou/SogouQ.reduced"
    val resultFile = "hdfs://vs1:9000/data/result"
    val conf = new SparkConf().setAppName("WebLogInfoApp")
    val sc = new SparkContext(conf)
    val linesRdd = sc.textFile(dataFile).map(line => line.split('\t')).filter(_.length >= 5)
    val userArray = linesRdd.map(w => (w(1), 1)).reduceByKey(_+_).map(x => (x._2, x._1)).sortByKey(false).take(10)
    val userRdd = sc.parallelize(userArray, 1).map(x => (x._2, x._1))
    val wordArray = linesRdd.map(w => (w(2), 1)).reduceByKey(_+_).map(x => (x._2, x._1)).sortByKey(false).take(10)
    val wordRdd = sc.parallelize(wordArray, 1).map(x => (x._2, x._1))
    val urlArray = linesRdd.map(w => (w(4).split('/')(0), 1)).reduceByKey(_+_).map(x => (x._2, x._1)).sortByKey(false).take(50)
    val urlRdd = sc.parallelize(urlArray, 1).map(x => (x._2, x._1))

    (userRdd ++ wordRdd ++ urlRdd).repartition(1).saveAsTextFile(resultFile)
    sc.stop()
  }
}

把代码打成jar上传到spark集群就可算出结果

相关推荐