拖稿了一下,繼續來介紹SPARK的平行運算能力.前一篇[Spark] 建立第一個RDD物件,體驗平行運算的威力(一)我們透過python在SPARK上建立了第一個RDD物件,接著我們將開始對這個物件做一些操作.
上一篇最後留了一個小懸念,spark在建立RDD物件的時候spark還不會將資料載入(所謂"lazy evaluation"的機制),要等到我們開始操作後才會將資料存到記憶體中.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
In [3]: | |
raw_ratings = sc.textFile('/Users/bryanyang/Documents/Data/Movie Rating/ratings.dat') | |
raw_ratings.setName("raw ratings") | |
raw_ratings.cache() | |
Out[3]: | |
raw ratings MappedRDD[3] at textFile at NativeMethodAccessorImpl.java:-2 | |
In [4]: | |
entries = raw_ratings.count() | |
print "%s entries in ratings" %entries | |
out[4] | |
100000 entries in ratings |
透過上述程序,我們計算了RDD物件中的數量.這時候來到STORGE的界面,就可以看到RDD物件出現在我們的記憶體中.
RDD將整個資料拉到了記憶體裡面,點一下藍色的RDD NAME,可以看到更詳細的結構:

這個RDD資料我們在載入的時候沒有特別要切成幾個part,所以只有一個partition.回到stage的畫面,可以檢視剛剛我們執行count的執行狀況.
在task的地方顯示spark將這次的動作切成幾份,預設是依照檔案大小來分派(預設32MB的資料切成一塊).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
In [5]: | |
raw_ratings = sc.textFile('/Users/bryanyang/Documents/Data/Movie Rating/ratings.dat',10) ##分成10份 | |
raw_ratings.setName("raw ratings 10") | |
raw_ratings.cache() | |
Out[5]: | |
raw ratings 10 MappedRDD[6] at textFile at NativeMethodAccessorImpl.java:-2 | |
In [6]: | |
entries = raw_ratings.count() | |
print "%s entries in ratings" %entries | |
Out[6] | |
100000 entries in ratings |
但是也可手動輸入要切的份數,這次我們把資料在一開始載入的時候切成十份 ,看一下STORGE有什麼變化:
可以很清楚看到我們把資料切成了十份丟到記憶體裡面,兩個檔案都同樣是1544.3KB
切成十份之後每個partition都有同樣154.43的資料.
按一下description可以實際看到十個partition的運作情形:
那資料切成十份和一份對於整體的速度到底有沒有增加呢?由於這個資料集的量太小,看不太出來,接下來的例子換個比較大的資料可能比較清楚.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
raw_sub = sc.textFile("/Users/bryanyang/Documents/Data/Ads Prediction/random_submission.csv",1) | |
raw_sub.count() |
這次我用了一個151MB的檔案,一開始採用預設方式想資料放到一個partition裡面.但是打開Stage頁面,spark卻自動幫我們把檔案切成五份(因為spark預設一次處理最多32MB的資料).
點進去看每個 partition的執行狀態:
Spark預設幫我們以32MB為單位,將原始資料切成五塊,每一塊執行時間大約5~6秒.由於這次檔案比較大,從Launch Time來看可以很清楚看到Spark是依序處理這五個partition.並非平行運算.(畢竟我是在單機上測試嘛~"~)
Input欄後面的(hodoop)代表資料是從硬碟上讀取,接著我們實驗一下把資料讀入記憶體中的會不會明顯增加速度(畢竟spark主打的是in memory computing)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
raw_sub = sc.textFile("/Users/bryanyang/Documents/Data/Ads Prediction/random_submission.csv",1) | |
raw_sub.cache() ##代表要將資料load到記憶體中 | |
raw_sub.count() ##一樣要執行計算指令後才會load資料 |
執行上述指令後,到Storge頁面可以看到資料的確load到記憶體中了:
接著我們再次count一次,來看執行的狀況
第一次因為要把檔案存到記憶體中,花了比較久的時間,第二次直接從記憶體讀資料速度就快了很多.至於為什麼第二次input記憶體比較少的原因,還需要研究一下文件.
不過總之,Spark就是透過這樣的技術,將資料暫存在記憶體中,減少了未來在做ML需要反覆計算時,讀取硬碟的時間.(不過這個減少的幅度在我的測試環境比較看不出來,因為我用的是Mac Air 使用了PCIe的硬碟,讀取速度高達700MB/s 請參考:2013新款MacBook Air SSD速度爆表!!! 炫耀文無誤)
沒有留言:
張貼留言