Spark RDD
scala> val rdd1 = sc.parallelize(List(63,45,89,23,144,777,888))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:15

Check the RDD's partition count:
scala> rdd1.partitions.length
res0: Int = 1

Specify the number of partitions at creation time:
scala> val rdd1 = sc.parallelize(List(63,45,89,23,144,777,888),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:15

Check the partition count:
scala> rdd1.partitions.length
res1: Int = 3
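When no partition count is passed, parallelize falls back to sc.defaultParallelism (governed by spark.default.parallelism and the deployment; the res0 of 1 above suggests a single-core local setup). A minimal sketch of changing the partition count after creation, using the standard repartition and coalesce methods; the counts in the comments assume this toy data:

val rddDefault = sc.parallelize(1 to 10)    // uses sc.defaultParallelism partitions
val rddMore    = rddDefault.repartition(4)  // shuffles the data into 4 partitions
val rddFewer   = rddMore.coalesce(2)        // merges down to 2, normally without a shuffle
println(rddFewer.partitions.length)         // 2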
map/filter
scala> val rdd1 = sc.parallelize(List(1,2,100,3,4))
scala> val rdd2 = rdd1.map(x => x*2).collect
rdd2: Array[Int] = Array(2, 4, 200, 6, 8)

Sort ascending:
scala> val rdd3 = rdd1.map(_*2).sortBy(x => x,true).collect
rdd3: Array[Int] = Array(2, 4, 6, 8, 200)

Sort descending:
scala> val rdd3 = rdd1.map(_*2).sortBy(x => x,false).collect
rdd3: Array[Int] = Array(200, 8, 6, 4, 2)

Keep only the elements greater than 50:
scala> val rdd1 = sc.parallelize(List(1,2,100,3,4))
scala> val rdd2 = rdd1.filter(_>50).collect
rdd2: Array[Int] = Array(100)
scala> val rdd2 = rdd1.filter(x => x>50).collect
rdd2: Array[Int] = Array(100)

Keep only the even elements:
scala> val rdd2 = rdd1.filter(_%2==0).collect
rdd2: Array[Int] = Array(2, 100, 4)
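map and filter are lazy transformations; nothing executes until an action such as collect runs. A minimal sketch chaining the two over the same data (the commented results assume that data):

val doubledMultiplesOfFour = sc.parallelize(List(1, 2, 100, 3, 4))
  .map(_ * 2)          // 2, 4, 200, 6, 8 (not computed yet)
  .filter(_ % 4 == 0)  // keep multiples of 4
doubledMultiplesOfFour.collect  // Array(4, 200, 8)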
map/flatMap
scala> val rdd1 = sc.parallelize(Array("w u","j i a","d o n g")) 按空格分隔 scala> rdd1.map(_.split(" ")).collect res28: Array[Array[String]] = Array(Array(w, u), Array(j, i, a), Array(d, o, n, g)) 分隔并压平 scala> val rdd2 = rdd1.flatMap(_.split(" ")) scala> rdd2.collect res5: Array[String] = Array(w, u, j, i, a, d, o, n, g) scala> val rdd1 = sc.parallelize(List(List("w u","j i a","d o n g"),List("j i a n g","r u i"))) scala> val rdd2 = rdd1.map(_.map(_.split(" "))).collect rdd2: Array[List[Array[String]]] = Array(List(Array(w, u), Array(j, i, a), Array(d, o, n, g)), List(Array(j, i, a, n, g), Array(r, u, i))) scala> val rdd2 = rdd1.map(_.flatMap(_.split(" "))).collect rdd2: Array[List[String]] = Array(List(w, u, j, i, a, d, o, n, g), List(j, i, a, n, g, r, u, i)) scala> val rdd2 = rdd1.flatMap(_.flatMap(_.split(" "))).collect rdd2: Array[String] = Array(w, u, j, i, a, d, o, n, g, j, i, a, n, g, r, u, i)
union/intersection/distinct
scala> val rdd1 = sc.parallelize(List(1,2,3,4))
scala> val rdd2 = sc.parallelize(List(5,6,4,3))

Union (duplicates are kept):
scala> val rdd3 = rdd1.union(rdd2).collect
rdd3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 4, 3)

Intersection:
scala> val rdd4 = rdd1.intersection(rdd2).collect
rdd4: Array[Int] = Array(4, 3)

Deduplicate the union:
scala> val rdd5 = rdd1.union(rdd2).distinct.collect
rdd5: Array[Int] = Array(1, 2, 3, 4, 5, 6)
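union keeps duplicates, so chain distinct when a true set union is wanted. The set difference is available through the standard subtract method; a minimal sketch over the same two RDDs (sorted here only to make the output deterministic):

val a = sc.parallelize(List(1, 2, 3, 4))
val b = sc.parallelize(List(5, 6, 4, 3))
a.union(b).distinct.collect.sorted  // Array(1, 2, 3, 4, 5, 6)
a.subtract(b).collect.sorted        // Array(1, 2): elements of a not in b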
sortBy
Multiply each element of rdd1 by 2, then sort:
scala> val rdd1 = sc.parallelize(List(1,2,100,3,4))
scala> val rdd2 = rdd1.map(_*2).sortBy(x => x,true) // a bare _ does not work here; see the note below
scala> rdd2.collect
res21: Array[Int] = Array(2, 4, 6, 8, 200)

Note the difference between the next two calls: the sort order follows the value of the expression on the right of =>:
scala> rdd1.sortBy(x => x,true).collect
res22: Array[Int] = Array(1, 2, 3, 4, 100)
scala> rdd1.sortBy(x => 100-x,true).collect
res23: Array[Int] = Array(100, 4, 3, 2, 1)
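On the underscore question: when _ stands alone as the whole argument, Scala treats sortBy(_, true) as partial application of the method (that is, y => rdd1.sortBy(y, true)), not as the identity function, so it fails to type-check. Write x => x or pass the standard library's identity instead; a minimal sketch (ascending is sortBy's actual parameter name):

val r = sc.parallelize(List(1, 2, 100, 3, 4))
r.sortBy(identity, ascending = true).collect   // Array(1, 2, 3, 4, 100)
r.sortBy(identity, ascending = false).collect  // Array(100, 4, 3, 2, 1)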
groupByKey/groupBy/reduceByKey/sortByKey
scala> val rdd1 = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",90))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at parallelize at <console>:15 scala> val rdd2 = rdd1.groupByKey() rdd2: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[30] at groupByKey at <console>:17 scala> val rdd2 = rdd1.groupByKey() 大专栏 Spark RDD.collect rdd2: Array[(String, Iterable[Int])] = Array((class1,CompactBuffer(50, 90)), (class2,CompactBuffer(80, 70))) scala> rdd2.foreach(score => {println(score._1);score._2.foreach(singlescore => println(singlescore))}) class1 50 90 class2 80 70 scala> val rdd5 = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",50))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[47] at parallelize at <console>:24 scala> val rdd2=rdd1.groupBy(x=>x._2).collect rdd2: Array[(Int, Iterable[(String, Int)])] = Array((80,CompactBuffer((class2,80))), (50,CompactBuffer((class1,50), (class1,50))), (70,CompactBuffer((class2,70)))) scala> val rdd1 = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",90))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:15 scala> val rdd2 = rdd1.reduceByKey(_+_).collect rdd2: Array[(String, Int)] = Array((class1,140), (class2,150)) scala> val rdd1 = sc.parallelize(List(("tom",1),("jerry",2),("kitty",3))) scala> val rdd2 = sc.parallelize(List(("jerry",9),("tom",8),("shuke",7))) scala> val rdd3 = rdd1.union(rdd2) 按key进行聚合 scala> val rdd4 = rdd3.reduceByKey(_+_) scala> rdd4.collect res23: Array[(String, Int)] = Array((tom,9), (jerry,11), (shuke,7), (kitty,3)) 按value的降序排序 scala> val rdd5 = rdd4.map(t=>(t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1)) scala> rdd5.collect res24: Array[(String, Int)] = Array((jerry,11), (tom,9), (shuke,7), (kitty,3)) scala> val rdd1 = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",90))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:15 scala> val rdd2 = rdd1.sortByKey().collect rdd2: Array[(String, Int)] = Array((class1,50), (class1,90), (class2,80), (class2,70)) scala> rdd2.foreach(score => println(score._1+":"+score._2)) class1:50 class1:90 class2:80 class2:70
join/leftOuterJoin/rightOuterJoin/union
Operand order affects the result:
scala> val rdd1 = sc.parallelize(List(("tom",1),("jerry",2),("kitty",3)))
scala> val rdd2 = sc.parallelize(List(("jerry",9),("tom",8),("shuke",7)))
scala> val rdd3 = rdd1.join(rdd2).collect
rdd3: Array[(String, (Int, Int))] = Array((tom,(1,8)), (jerry,(2,9)))
scala> rdd2.join(rdd1).collect
res5: Array[(String, (Int, Int))] = Array((tom,(8,1)), (jerry,(9,2)))

A duplicated key on one side pairs with every match on the other:
scala> val rdd2 = sc.parallelize(List(("jerry",9),("tom",8),("shuke",7),("tom",2)))
scala> val rdd3 = rdd1.join(rdd2).collect
rdd3: Array[(String, (Int, Int))] = Array((tom,(1,8)), (tom,(1,2)), (jerry,(2,9)))
scala> val rdd3 = rdd1.leftOuterJoin(rdd2).collect
rdd3: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(8))), (tom,(1,Some(2))), (jerry,(2,Some(9))), (kitty,(3,None)))
scala> val rdd3 = rdd1.rightOuterJoin(rdd2).collect
rdd3: Array[(String, (Option[Int], Int))] = Array((tom,(Some(1),8)), (tom,(Some(1),2)), (jerry,(Some(2),9)), (shuke,(None,7)))
scala> val rdd3 = rdd1.union(rdd2).collect
rdd3: Array[(String, Int)] = Array((tom,1), (jerry,2), (kitty,3), (jerry,9), (tom,8), (shuke,7), (tom,2))
scala> val rdd3 = rdd1.union(rdd2)
scala> val rdd4 = rdd3.groupByKey
scala> rdd4.collect
res11: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 8, 2)), (jerry,CompactBuffer(2, 9)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)))

Sum the counts for each key:
scala> val rdd5 = rdd3.groupByKey.map(x=>(x._1,x._2.sum))
scala> rdd5.collect
res12: Array[(String, Int)] = Array((tom,11), (jerry,11), (shuke,7), (kitty,3))
scala> rdd3.groupByKey.mapValues(_.sum).collect
res14: Array[(String, Int)] = Array((tom,11), (jerry,11), (shuke,7), (kitty,3))
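All of these joins are built on cogroup, which groups the values from both RDDs by key without pairing them; fullOuterJoin keeps keys from both sides, wrapping each side in Option. A minimal sketch over the first pair of RDDs above (the element order of the collected output is not guaranteed):

val x = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
val y = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7)))
x.cogroup(y).collect
// e.g. (tom,(CompactBuffer(1),CompactBuffer(8))), (kitty,(CompactBuffer(3),CompactBuffer()))
x.fullOuterJoin(y).collect
// e.g. (tom,(Some(1),Some(8))), (kitty,(Some(3),None)), (shuke,(None,Some(7)))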