Spark编程模型(之莎士比亚文集词频统计实现)

Spark编程模型之莎士比亚文集词频统计

        

        前段时间因为学校的云计算比赛我无意间接触到了Spark云计算框架,从此对其一发不可收拾,无论从其执行效率还有他的其他方面的架构都感觉到无比强大,作为一个云计算的解决方案他有着比hadoop更好的优越性。因为Spark我又接触到编程语言届的新贵Scala面向对象的函数式编程语言,更是被它的巧妙所折服。

        那么问题来了,请问什么事Spark?       Spark是一个基于内存计算的开源的集群计算系统,目的是让数据分析更加快速。Spark非常小巧玲珑,由加州伯克利大学AMP实验室的Matei为主的小团队所开发。使用的语言是Scala,项目的core部分的代码只有63个Scala文件,非常短小精悍。

       那么接下来讨论Spark编程模型,Spark 应用程序有两部分组成:
         – Driver
        – Executor
首先由Driver向集群管理器(Cluster Manager)申请资源,Cluster Manager在给出资源之后,Spark 在worker节点启动Executor,然后用Driver将jar包文件传给Executor,并将任务分割成一个个Task分配给Executor运行,Executor运行结束,提交汇总结束任务运行。


Spark编程模型(之莎士比亚文集词频统计实现)
 

Spark进行编程主要是对它的RDD( Resilient Distributed Datasets,弹性分布式数据集)进行操作也就是其中Executor对RDD的操作。

    其中RDD的操作主要变现在三方面:1.由Base到RDD,也就是我们可以将HDFS中的文件或者本地的文件转换成RDD。

2.Transformation操作也就是RDD->RDD(RDD之间互相转化的过程)

3.Action操作也就是RDD->driver or Base(RDD返还给Driver或者转化成hfds(或者本地)的文件过程)

一下是我总结的一些函数


Spark编程模型(之莎士比亚文集词频统计实现)
 

因为这是我第一次写关于Spark方面的博客,那么我们就拿Spark的一个比赛中非常简单的题目作介绍。

莎士比亚文集词频统计并行化算法
 环境描述: 本题目需要运行在 Apache Spark 1.0.1 环境下,使用 Java 或者 Scala 进行编程开发。
 题目描述: 在给定的莎士比亚文集上(多个文件) ,根据规定的停词表,统计 出现频度最高的 100 个单词。
 数据集: shakespear 文集, 具体下载地址见大赛网站 http://cloud.seu.edu.cn。
 停词表: stopword.txt, 具体下载地址见大赛网站 http://cloud.seu.edu.cn。
 程序设计约束: 程序需要三个输入参数,第一个为数据集路径(即 shakespear 文件夹的路径,文件夹中的文件名为固定文件名),第二个为停词表路径,第三 个为输出文件路径。 输出文件的格式为:


Spark编程模型(之莎士比亚文集词频统计实现)
 

每个单词独立一行。

首先说明一下题目的意思,这个题目就是统计一下数据集: shakespear 文集中除去停词表中出现的单词中出现频率最高的100个。

我的过程如下:
  1.停词存储
    因为涉及的停词不是很多,但是要注意去除停词表后面的空格,把他们读入内存处理空格后形成一个Scala停词数组.
  2.flatMap()
    首先我会把每一行出现的制表符,逗号,冒号,分号等一些特殊符号替换成空格,然后按照空格将字符串进行分割.
  3.filter
    把出现在停词数组中的单词和空字符除去
  4.map
    把上面经过过滤的单词设为key,值设为1
  5.reduceByKey
    将所有相同的key进行增量累加
  6.获得前一百个key
    从上面所得的map结果集中获得key列表,然后从列表中获取前100个keyall
  7.将结果存储磁盘文件

具体的Scala代码如下:

package com.zdx.spark

import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.SparkContext._
/**
 * Created by zdx on 14-11-8.
 */


  object ShaShiBiYa {
    def main(args: Array[String]) {
      if (args.length != 3) {
        System.err.println("Usage: ShaShiBiYa <file of poems> <file of stopWord> <file of output>")
        System.exit(1)
      }

      val conf = new SparkConf().setAppName("ShaShiBiYa")
      val sc = new SparkContext(conf)
      val rddpoems=sc.textFile(args(0))
      val rddstop=sc.textFile(args(1))
      val stop=rddstop.map(_.replaceAll(" +","")).collect
      val word2=rddpoems.flatMap(_.replaceAll("\\t|\\(|\\)|\\||\\.|\\,|\\:|\\[|\\]|\\?|\\--|\\;|\\!"," ").split(" +")).filter(stop.contains(_)==false).filter(_.equals("")==false)
      val result2=word2.map((_,1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
      val keyall=result2.keys
      val key100=keyall.take(100)
      val rdd8=sc.parallelize(key100)
      rdd8.saveAsTextFile(args(2))
      sc.stop()
    }


}

在具体的Scala代码中我们可以看到,代码非常的简洁,可见Scala是多么的强大,哈哈。继续学习,天天向上!!!

    

相关推荐