spark学习一
spark学习一
1、参考文档
OSTC2015-张安站-Spark技术内幕
http://share.csdn.net/slides/13506
使用IDEA开发Spark应用
http://debugo.com/idea-spark/
ApacheSpark学习:利用Eclipse构建Spark集成开发环境
http://dongxicheng.org/framework-on-yarn/spark-eclipse-ide/
flume+storm+kafka
http://blog.csdn.net/zxcvg/article/details/18600335/
http://www.aboutyun.com/thread-6855-1-1.html
http://dongxicheng.org/search-engine/log-systems/
数据挖掘与机器学习
DMML
------------------------------------------------
spark编程简介学习
http://tech.uc.cn/?p=2116
一、sparkvshadoop
1、Spark的中间数据在内存迭代运算效率更高。
Hadoop中间数据在硬盘
Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的抽象概念。
2、Spark比Hadoop更通用。
Transformations
map,filter,flatMap,sample,groupByKey,reduceByKey,union,join,cogroup,mapValues,sort,partionBy
actions
Count,collect,reduce,lookup,save
各个处理节点之间的通信模型不再像Hadoop那样就是唯一的DataShuffle一种模式。用户可以命名,物化,控制中间结果的存储、分区等。可以说编程模型比Hadoop更灵活。
3、局限
不过由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。
Spark可以直接对HDFS进行数据的读写,同样支持SparkonYARN。Spark可以与MapReduce运行于同集群中,共享存储资源与计算,数据仓库Shark实现上借用Hive,几乎与Hive完全兼容。
二、Spark的适用场景
1、适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大
三、运行模式
本地模式
Standalone模式
Mesoes模式
yarn模式
四、生态系统
Shark(HiveonSpark):Shark基本上就是在Spark的框架基础上提供和Hive一样的HiveQL命令接口,为了最大程度的保持和Hive的兼容性,Shark使用了Hive的API来实现queryParsing和LogicPlangeneration,最后的PhysicalPlanexecution阶段用Spark代替HadoopMapReduce。通过配置Shark参数,Shark可以自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark通过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一起,最大化RDD的重复使用。
Sparkstreaming:构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部分数据。SparkStreaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+)可以用于实时计算,另一方面相比基于Record的其它处理框架(如Storm),RDD数据集更容易做高效的容错处理。此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法。方便了一些需要历史数据和实时数据联合分析的特定应用场合。
Bagel:PregelonSpark,可以用Spark进行图计算,这是个非常有用的小项目。Bagel自带了一个例子,实现了Google的PageRank算法。
五、核心概念
1、RDD
ResilientDistributedDataset(RDD)弹性分布数据集
RDD是spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。
RDD表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现
RDD必须是可序列化的
RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。
2、RDD的特点
它是在集群节点上的不可变的、已分区的集合对象。
通过并行转换的方式来创建如(map,filter,join,etc)。
失败自动重建。
可以控制存储级别(内存、磁盘等)来进行重用。
必须是可序列化的。
是静态类型的。
3、RDD的好处
RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可重新计算出来,而不需要做特定的Checkpoint。
RDD的不变性,可以实现类HadoopMapReduce的推测式执行。
RDD的数据分区特性,可以通过数据的本地性来提高性能,这与HadoopMapReduce是一样的。
RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的下降但不会差于现在的MapReduce。
4、RDD的存储与分区
用户可以选择不同的存储级别存储RDD以便重用。
当前RDD默认是存储于内存,但当内存不足时,RDD会spill到disk。
RDD在需要进行分区把数据分布于集群中时会根据每条记录Key进行分区(如Hash分区),以此保证两个数据集在Join时能高效。
5、RDD的内部表示
在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示:
分区列表(数据块列表)
计算每个分片的函数(根据父RDD计算出此RDD)
对父RDD的依赖列表
对key-valueRDD的Partitioner【可选】
每个数据分片的预定义地址列表(如HDFS上的数据块的地址)【可选】
6、RDD的存储级别
RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别
7、RDD的生成
RDD有两种创建方式:
1、从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)创建。
2、从父RDD转换得到新RDD。
valfile=spark.textFile("hdfs://..."),file变量就是RDD(实际是HadoopRDD实例),生成的它的核心代码如下:
// SparkContext根据文件/目录及可选的分片数创建RDD, 这里我们可以看到Spark与Hadoop MapReduce很像 // 需要InputFormat, Key、Value的类型,其实Spark使用的Hadoop的InputFormat, Writable类型。 def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits) .map(pair => pair._2.toString) } // 根据Hadoop配置,及InputFormat等创建HadoopRDD new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
对RDD进行计算时,RDD从HDFS读取数据时与HadoopMapReduce几乎一样的:
// 根据hadoop配置和分片从InputFormat中获取RecordReader进行数据的读取。 reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL) val key: K = reader.createKey() val value: V = reader.createValue() //使用Hadoop MapReduce的RecordReader读取数据,每个Key、Value对以元组返回。 override def getNext() = { try { finished = !reader.next(key, value) } catch { case eof: EOFException => finished = true } (key, value) }
RDD的转换与操作
对于RDD可以有两种计算方式:转换(返回值还是一个RDD)与操作(返回值不是一个RDD)。
转换(Transformations)(如:map,filter,groupBy,join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。
操作(Actions)(如:count,collect,save等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
下面使用一个例子来示例说明Transformations与Actions在Spark的使用。
val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val rdd_A = sc.textFile(hdfs://.....) val rdd_B = rdd_A.flatMap((line => line.split("\\s+"))).map(word => (word, 1)) val rdd_C = sc.textFile(hdfs://.....) val rdd_D = rdd_C.map(line => (line.substring(10), 1)) val rdd_E = rdd_D.reduceByKey((a, b) => a + b) val rdd_F = rdd_B.jion(rdd_E) rdd_F.saveAsSequenceFile(hdfs://....)
Lineage(血统)
Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方案。
RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。
RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter,map,joinetc.)行为。
当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。
RDD在Lineage依赖方面分为两种NarrowDependencies与WideDependencies用来解决数据容错的高效性。
NarrowDependencies是指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区.
WideDependencies是指子RDD的分区依赖于父RDD的多个分区或所有分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。
NarrowDependencies对于数据的重算开销要远小于WideDependencies的数据重算开销。
容错
在RDD计算,通过checkpint进行容错
一个是checkpointdata,
一个是loggingtheupdates(默认)
通过记录跟踪所有生成RDD的转换(transformations)也就是记录每个RDD的lineage(血统)来重新计算生成丢失的分区数据。
资源管理与作业调度
Spark对于资源管理与作业调度
Standalone(独立模式)
ApacheMesos
HadoopYARN
编程接口
Spark通过与编程语言集成的方式暴露RDD的操作,对数据集的操作就表示成对RDD对象的操作。
Spark主要的编程语言是Scala,选择Scala是因为它的简洁性(Scala可以很方便在交互式下使用)和性能(JVM上的静态强类型语言)。
用户编写的Spark程序被称为Driver程序,Dirver程序会连接master并定义了对各RDD的转换与操作,而对RDD的转换与操作通过Scala闭包(字面量函数)来表示,Scala使用Java对象来表示闭包且都是可序列化的,以此把对RDD的闭包操作发送到各Workers节点。
Workers存储着数据分块和享有集群内存,是运行在工作节点上的守护进程,当它收到对RDD的操作时,根据数据分片信息进行本地化数据操作,生成新的数据分片、返回结果或把RDD写入存储系统。
写SparK程序的一般步骤就是创建或使用(SparkContext)实例,使用SparkContext创建RDD,然后就是对RDD进行操作。如:
val sc = new SparkContext(master, appName, [sparkHome], [jars]) val textFile = sc.textFile("hdfs://.....") textFile.map(....).filter(.....).....
Java
Spark支持Java编程,但对于使用Java就没有了Spark-Shell这样方便的工具,其它与Scala编程是一样的,因为都是JVM上的语言,Scala与Java可以互操作,Java编程接口其实就是对Scala的封装。如:
JavaSparkContext sc = new JavaSparkContext(...); JavaRDD lines = ctx.textFile("hdfs://..."); JavaRDD words = lines.flatMap( new FlatMapFunction<String, String>() { public Iterable call(String s) { return Arrays.asList(s.split(" ")); } } );
使用示例
编写Driver程序
import spark.SparkContext import SparkContext._ object WordCount { def main(args: Array[String]) { if (args.length ==0 ){ println("usage is org.test.WordCount <master>") } println("the args: ") args.foreach(println) val hdfsPath = "hdfs://hadoop1:8020" // create the SparkContext, args(0)由yarn传入appMaster地址 val sc = new SparkContext(args(0), "WrodCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val textFile = sc.textFile(hdfsPath + args(1)) val result = textFile.flatMap(line => line.split("\\s+")) .map(word => (word, 1)).reduceByKey(_ + _) result.saveAsTextFile(hdfsPath + args(2)) } }