雖然spark速度很快,但是沒有好好tuning還是沒辦法發揮它應該有的速度.避免使用GroupByKey牽涉到Spark底層處理資料時的方式.原始的文章和圖片皆來自Avoid GroupByKey,只有改成自己的範例.
我們先將Spark資料夾中的README.md讀進來,並且切成tuple的形式.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
val text = sc.textFile("README.md") | |
val textPairsRDD = text.flatMap(_.split(" ")).map((_,1)) | |
textPairsRDD.take(10) | |
//res2: Array[(String, Int)] = Array((#,1), (Apache,1), (Spark,1), ("",1), | |
//(Spark,1), (is,1), (a,1), (fast,1), (and,1), (general,1)) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//GroupByKey | |
textPairsRDD.groupByKey().map(x => (x._1,x._2.sum)).collect() | |
INFO SparkContext: Job finished: collect at <console>:17, took 0.227842137 s | |
//ReduceByKey | |
textPairsRDD.reduceByKey(_ + _).collect() | |
SparkContext: Job finished: collect at <console>:17, took 0.107143156 s | |
Spark讀取資料時會先就資料散佈的地方生成RDD,如上半部的三個方格代表了三個RDD Partition,資料分別散在三個地方.在執行第一次的map指令後,會將字詞“原地”切成tuple.接著在GroupByKey的階段,Spark將資料重新分配,放在下半部的兩個方格RDD中,然後在分別計算每個RDD中的字詞個數.
接著來看ReduceByKey是怎麼做的:
ReduceByKey首先就在切好詞後的RDD“原地”將字詞統計了起來,接著才來到第二階段將相同的Key重整,再新的RDD中在統計一次資料.
兩個比較起來雖然ReduceByKey計算了兩次,而GroupByKey只計算一次,但是因為Spark是將資料放在記憶體中計算,所以速度在這部分不會差上太多.真正時間上的差別在於GroupByKey將資料整包的重新打散重組,而ReduceByKey事先將資料計算過一次,只將整理後的資料丟出去重組,傳輸上的資料比前者少了一半以上,這才造成主要時間上的差異.資料在網路傳輸上以及硬碟IO上都是相當花時間和成本的.Spark的RDD設計已經幫我們減少了硬碟IO的次數,選擇適合的計算方式才能進一步減少交換資料的時間.
沒有留言:
張貼留言