Tuesday, December 2, 2014

[Apache Spark][Basic Architecture] RDD Characteristics (Part 2): Transformations and Actions


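        Spark's architecture is actually not very large, but every part is tightly linked to the next. Taken apart piece by piece, it raises a lot of questions; once you understand each piece, though, the whole design turns out to be very sensible (quite a mouthful, I know). The posts in this basic-architecture series break Spark's original paper down topic by topic, in an attempt to share Spark's core ideas with you.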

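        Spark processes data mainly through RDDs, and operations on RDDs fall into two broad categories: transformations and actions. A transformation derives one RDD from another, for example map (apply the same function to every element of an RDD) or filter (keep only the elements of an RDD that satisfy a condition); both its input and its output are RDDs (see [Spark][Basic Architecture] On the Immutability of RDDs). An action instead "extracts" results from an RDD, for example count (return the number of elements in the RDD) or reduce (combine all of the RDD's elements into a single value with a function); its input is an RDD, but its result may be a local collection such as an array, a single number, or files written to HDFS. (The complete list of transformations and actions is shown in the table below.)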
Source: Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, Ion Stoica. NSDI 2012. April 2012.
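To make the difference in input and output types concrete, here is a toy model in plain Scala. `MiniRDD` is a hypothetical class invented for this illustration, not Spark's API: it keeps its elements in one local `Seq`, its transformations return another `MiniRDD`, and its actions return ordinary values. It evaluates eagerly on purpose, so it shows only the type signatures, not Spark's laziness or distribution.

```scala
// Toy model only: MiniRDD is a hypothetical class, not part of Spark.
// It keeps its elements in a local Seq and evaluates eagerly.
final class MiniRDD[A](private val data: Seq[A]) {
  // Transformations: MiniRDD in, MiniRDD out
  def map[B](f: A => B): MiniRDD[B] = new MiniRDD(data.map(f))
  def filter(p: A => Boolean): MiniRDD[A] = new MiniRDD(data.filter(p))

  // Actions: MiniRDD in, plain value out
  def count(): Long = data.size.toLong
  def reduce(f: (A, A) => A): A = data.reduce(f)
  def collect(): Seq[A] = data
}

object TypeDemo {
  def main(args: Array[String]): Unit = {
    val data = new MiniRDD(1 to 100)
    val filtered = data.map(_ + 1).filter(_ < 50) // still a MiniRDD[Int]
    println(filtered.count())       // a Long: 48
    println(filtered.reduce(_ + _)) // an Int: 1224
  }
}
```

Reading only the signatures is enough to tell the two categories apart: anything returning `MiniRDD[...]` is a transformation, anything returning a plain value is an action.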

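        Besides differing in input and output, transformations and actions also determine how Spark schedules work. If you run Spark through its REPL (the interactive shell you get by typing spark-shell), you will clearly see that RDD operations do not take effect immediately: when a command is a transformation, Spark only records the dependency between the RDD before and after the transformation. Only when an action is called does Spark trace back from that action through every step it depends on (including reading the input file from HDFS), plan those steps together, and execute them. For example: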
scala> val data = sc.parallelize(1 to 100)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> val data_map = data.map(_ + 1)
data_map: org.apache.spark.rdd.RDD[Int] = MappedRDD[1] at map at <console>:14

scala> val data_filter = data_map.filter(_ < 50)
data_filter: org.apache.spark.rdd.RDD[Int] = FilteredRDD[2] at filter at <console>:16

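Here we first create an RDD containing the numbers 1 to 100, and Spark only reports that it created a ParallelCollectionRDD (Spark generally decides for us which kind of RDD to build). Next, map creates a MappedRDD, and finally filter creates a FilteredRDD. These three RDDs depend on one another, but none of them has actually been computed yet. That changes only when we issue the reduce command: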
scala> val data_result = data_filter.reduce(_ + _)
14/12/02 00:23:03 INFO SparkContext: Starting job: reduce at <console>:18
14/12/02 00:23:03 INFO DAGScheduler: Got job 0 (reduce at <console>:18) with 4 output partitions (allowLocal=false)
14/12/02 00:23:03 INFO DAGScheduler: Final stage: Stage 0(reduce at <console>:18)
14/12/02 00:23:03 INFO DAGScheduler: Parents of final stage: List()
14/12/02 00:23:03 INFO DAGScheduler: Missing parents: List()
14/12/02 00:23:03 INFO DAGScheduler: Submitting Stage 0 (FilteredRDD[2] at filter at <console>:16), which has no missing parents
14/12/02 00:23:03 INFO MemoryStore: ensureFreeSpace(1936) called with curMem=0, maxMem=278019440
14/12/02 00:23:03 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1936.0 B, free 265.1 MB)
14/12/02 00:23:03 INFO DAGScheduler: Submitting 4 missing tasks from Stage 0 (FilteredRDD[2] at filter at <console>:16)
14/12/02 00:23:03 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
14/12/02 00:23:03 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1152 bytes)
14/12/02 00:23:03 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 1152 bytes)
14/12/02 00:23:03 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, PROCESS_LOCAL, 1152 bytes)
14/12/02 00:23:03 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, PROCESS_LOCAL, 1152 bytes)
14/12/02 00:23:03 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
14/12/02 00:23:03 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
14/12/02 00:23:03 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
14/12/02 00:23:03 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
14/12/02 00:23:03 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 600 bytes result sent to driver
14/12/02 00:23:03 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 701 bytes result sent to driver
14/12/02 00:23:03 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 600 bytes result sent to driver
14/12/02 00:23:03 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 701 bytes result sent to driver
14/12/02 00:23:03 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 49 ms on localhost (1/4)
14/12/02 00:23:04 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 55 ms on localhost (2/4)
14/12/02 00:23:04 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 57 ms on localhost (3/4)
14/12/02 00:23:04 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 69 ms on localhost (4/4)
14/12/02 00:23:04 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/12/02 00:23:04 INFO DAGScheduler: Stage 0 (reduce at <console>:18) finished in 0.083 s
14/12/02 00:23:04 INFO SparkContext: Job finished: reduce at <console>:18, took 0.373505684 s
data_result: Int = 1224

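In the transcript above, the first line is the reduce command and the last line is its result; everything in between is Spark's execution log, whose details this post will not go into. The key point is that Spark handles all transformations between RDDs lazily (lazy operation): nothing runs until an action is called. So a Spark program, from reading its input to producing results, can create RDDs from several input sources and combine them with join, map, or filter, and none of that work is performed until an action appears. As soon as Spark meets an action, it processes together all the transformations that action depends on. A program may contain more than one action; each action triggers its own job.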
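The lazy behaviour can be imitated in a few lines of plain Scala. In this sketch `LazyRDD` is a hypothetical class, not Spark's implementation: each instance stores a recipe for producing its elements (a rough stand-in for an RDD's lineage) instead of the elements themselves. Transformations only wrap the recipe, while actions run it from the source. The counter `mapCalls` is also an assumption added for illustration; it shows that the map function has not run after the transformations, and that a second action replays the whole recipe.

```scala
// Toy model only: LazyRDD is hypothetical, not Spark's implementation.
// Each LazyRDD stores a recipe (a stand-in for its lineage) instead of data.
final class LazyRDD[A](compute: () => Iterator[A]) {
  // Transformations only wrap the recipe; nothing is computed here.
  def map[B](f: A => B): LazyRDD[B] = new LazyRDD(() => compute().map(f))
  def filter(p: A => Boolean): LazyRDD[A] = new LazyRDD(() => compute().filter(p))

  // Actions run the whole recipe, starting from the source.
  def reduce(f: (A, A) => A): A = compute().reduce(f)
  def count(): Long = compute().size.toLong
}

object LazyDemo {
  var mapCalls = 0 // how many times the map function has executed

  def main(args: Array[String]): Unit = {
    val data = new LazyRDD(() => (1 to 100).iterator)
    val dataMap = data.map { x => mapCalls += 1; x + 1 }
    val dataFilter = dataMap.filter(_ < 50)
    println(s"after transformations: mapCalls = $mapCalls") // 0
    println(dataFilter.reduce(_ + _))                      // 1224
    println(s"after reduce: mapCalls = $mapCalls")         // 100
    println(dataFilter.count())                            // 48
    println(s"after count: mapCalls = $mapCalls")          // 200
  }
}
```

The recomputation on the second action mirrors real Spark, where every action re-evaluates the lineage unless the RDD has been kept with `cache()` or `persist()`.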

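        Spark is lazy so that it can allocate system resources more effectively: data that can be processed where it already resides is processed there as much as possible ([Spark][Basic Architecture] RDD Characteristics (Part 1)), which cuts down the time spent moving data around. That is why Spark waits until the whole chain of RDD transformations has been defined, that is, until an action is called, before it starts executing anything.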
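One concrete benefit of seeing the whole chain before running it is pipelining: consecutive narrow transformations such as map and filter can run inside the same task, passing each element through both functions instead of first building a full intermediate dataset. The sketch below is plain Scala, not Spark code; it uses `Iterator` to imitate a pipelined chain and an eager `Seq` for contrast, and the trace strings are invented for illustration.

```scala
import scala.collection.mutable.ListBuffer

// Plain Scala illustration (not Spark code): compare a lazy, pipelined
// chain with an eager one that materializes each intermediate result.
object PipelineDemo {
  def trace(lazyMode: Boolean): (List[Int], List[String]) = {
    val log = ListBuffer.empty[String]
    val source = Seq(1, 2, 3)
    val mapF = (x: Int) => { log += s"map($x)"; x + 1 }
    val filterF = (x: Int) => { log += s"filter($x)"; x < 4 }
    val result =
      if (lazyMode) source.iterator.map(mapF).filter(filterF).toList // one pass, element by element
      else source.map(mapF).filter(filterF).toList                   // full intermediate Seq after map
    (result, log.toList)
  }

  def main(args: Array[String]): Unit = {
    println(trace(lazyMode = true)._2)
    // List(map(1), filter(2), map(2), filter(3), map(3), filter(4))
    println(trace(lazyMode = false)._2)
    // List(map(1), map(2), map(3), filter(2), filter(3), filter(4))
  }
}
```

Both modes produce the same result, `List(2, 3)`; only the order of work differs. In the lazy mode no intermediate collection of mapped values ever exists, which is the kind of saving that becomes possible once the full chain is known in advance.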