Spark从入门到放弃系列——RDD持久化

1. 原理

Spark一个重要的功能该特性就是将RDD持久化到内存中。当对RDD进行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并在之后对RDD的反复使用中,直接使用内存中缓存的partition。这样的话,对于一个RDD反复执行的操作场景中,就只需要对RDD计算一次即可,而不需要反复计算RDD。巧妙使用RDD持久化,甚至在某种场景下,可以将spark应用程序性能提升10倍。对于迭代式算法和快速交互应用来说,RDD的持久化吃非常必要的。

要持久化一个RDD,只需要调用RDD的cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存到每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。

cache()和presist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用persist()的无参版本,即调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中清除缓存,那么可以使用unpersist()方法。

Spark自己在shuffle过程中,会进行数据的持久化,比如写在磁盘中,主要是为了在节点失败时,避免需要重新计算整个过程。

2.代码实现

2.1 Java代码实现

1. publicclassPersist{

2.

3. publicstaticvoidmain(String[]args){

4. SparkConfconf=newSparkConf()

5. .setAppName(persist)

6. .setMaster(local);

7.

8. JavaSparkContextsc=newJavaSparkContext(conf);

9.

10. JavaRDDStringlines=sc.textFile(c://xx.xx.txt).cache();

11.

12. longbeginTime=System.currentTimeMillis();

13. longcount=lines.count();

14. System.out.println(count);

15. longendTime=System.currentTimeMillis();

16. System.out.println(costTime:+(endTime-beginTime));

17.

18. sc.close();

19. }

20. }

2.2 Scala代码实现

1. publicclassPersist{

2. publicstaticvoidmain(String[]args){

3.

4. valconf=newSparkConf()

5. .setAppName(persist)

6. .setMaster(local)

7.

8. valsc=newSparkContext(conf)

9.

10. vallines=sc.textFile(c://xx.xx.txt).cache()

11.

12. longbeginTime=System.currentTimeMillis()

13. longcount=lines.count()

14. System.out.println(count)

15. longendTime=System.currentTimeMillis()

16. System.out.println(costTime:+(endTime-beginTime))

17.

18. sc.close()

19. }

20. }

相关推荐