Bottom line first: if your RDD is large, never casually call collect(). The original article is at http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dont_call_collect_on_a_very_large_rdd.html. It only says that collect copies every element onto a single machine, but it doesn't explain why that happens.
In [Spark] RDD Immutability, we saw that each stage of the computation produces a new RDD object, and only when we call reduce does the data leave the RDD and come back to us as an Int. Although we say Spark is distributed computing, being "distributed" is really a property of the RDD object itself: once reduce turns the data into an Int, that Int is an ordinary Scala object with nothing distributed about it. So to understand why collect pulls all the data back onto one machine, we need to look at what collect actually returns:
Continuing the earlier example:
val num = 1 to 100
//num: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, ..., 100)
val numRDD = sc.parallelize(num)
//numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:14
val numFilter = numRDD.filter(_ < 10)
//numFilter: org.apache.spark.rdd.RDD[Int] = FilteredRDD[12] at filter at <console>:16
val numMap = numFilter.map(_ + 10)
//numMap: org.apache.spark.rdd.RDD[Int] = MappedRDD[13] at map at <console>:18
val numSum = numMap.reduce(_ + _)
//numSum: Int = 135
scala> val numColl = numMap.collect()
numColl: Array[Int] = Array(11, 12, 13, 14, 15, 16, 17, 18, 19)
It's clear that once collect runs, the data is pulled out of the RDD and turned into a plain Int Array. Array is an ordinary Scala collection with no support for distribution, so it's no surprise that collecting an oversized RDD blows up the machine it lands on.
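The practical advice in the Databricks article linked above is: if you only need a peek at the data, use take; if you can, shrink the RDD first; and if you really need every element, write it out to distributed storage instead of the driver's memory. A minimal sketch, assuming the same spark-shell session as above (the HDFS path is hypothetical):

```scala
// 1. Inspect only a small sample on the driver instead of everything:
val firstFew = numMap.take(5)
//firstFew: Array[Int] = Array(11, 12, 13, 14, 15)

// 2. Shrink the RDD with a transformation before collecting:
val evens = numMap.filter(_ % 2 == 0).collect()

// 3. If every element is needed, write to distributed storage
//    rather than pulling it all onto one machine:
numMap.saveAsTextFile("hdfs:///tmp/numMap-output")
```

take and saveAsTextFile keep the driver's memory footprint bounded: take ships back only n elements, and saveAsTextFile writes each partition out in parallel without ever materializing the whole dataset in one place.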
Other actions with the same behaviour include:
countByKey
countByValue
collectAsMap
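Like collect, each of these is an action whose return type is an ordinary Scala collection living entirely on the driver, which you can confirm from the types printed in the same spark-shell session (the pairs RDD here is a made-up example):

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

pairs.countByKey()
//returns scala.collection.Map[String,Long] — a plain, driver-side map

pairs.countByValue()
//returns scala.collection.Map[(String, Int),Long] — also driver-side

pairs.collectAsMap()
//returns scala.collection.Map[String,Int] — note: duplicate keys keep
//only one value, and the whole map must fit in driver memory
```

Since all three materialize their result on the driver, they carry the same risk as collect when the number of distinct keys or values is huge.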