標籤:

從MapReduce的執行來看如何優化MaxCompute(原ODPS) SQL

摘要: SQL基礎有這些操作(按照執行順序來排列): from join(left join, right join, inner join, outer join ,semi join) where group by select sum distinct count order by 如果我們能理解mapreduce是怎麼實現這些SQL中的基本操作的,那麼我們將很容易理解怎麼優化SQL寫法。

點此查看原文:click.aliyun.com/m/4138

SQL基礎有這些操作(按照執行順序來排列):

from

join(left join, right join, inner join, outer join ,semi join)

where

group by

select

sum

distinct

count

order by

如果我們能理解mapreduce是怎麼實現這些SQL中的基本操作的,那麼我們將很容易理解怎麼優化SQL寫法。接下來我們一個一個的談:

from

這個操作是在解析過程中就完成了,目的就是找出輸入的表(文件)。

join(left join, right join, inner join, outer join ,semi join)

這個操作需要在參與map和reduce整個階段。下圖給出了各個階段的數據輸入輸出變化:

假如執行這個SQL:

select student_id, student_name, course_id

from student left join student_course on student.student_id = student_course.student_id;

從上面圖可以看出當出現數據在某個(某些)key特別集中的時候,就會出現reduce的接收數據是不均勻的,導致reduce端數據傾斜。

where

這個地方如果有分區欄位的話,會直接解析階段就做裁剪。不會拖到後面的map和reduce階段。如果不是分區欄位,則只會涉及得到map階段,在這個階段直接過濾。

group by

select student_id, sum(score)

from student_course

group
by student_id

將GroupBy的欄位組合為map的輸出key值,利用MapReduce的排序,在reduce階段保存LastKey區分不同的key。MapReduce的過程如下(當然這裡只是說明Reduce端的非Hash聚合過程)

select

因為MaxComput(原ODPS)的文件存儲是列式的,所以在select在編譯解析的過程中會起到裁剪列的作用。比如一個表假如有100列,select中只出現了3列,那麼其餘的97列是沒有進行計算的。寫select盡量避免使用*,並且不需要的欄位盡量刪減掉。

sum

到這裡開始涉及到了聚合函數,聚合函數需要區分可以拆分並行和不可以拆分並行兩種。sum是典型的可拆分並行的。sum(1,2,3,1) = sum(1,2) + sum(3,1) = 7。而avg就是不可並行計算,avg(1,2,3,1) != avg(1,2) + avg(3,1) != avg(avg(1,2) + avg(3,1))。但是avg可以轉化成可並行計算,比如先sum分子,再sum分母來並行化。

如果函數可並行,那麼就可以在map階段進行提前聚合,大大減少後面的發往reduce端的網路傳遞。

distinct

如果是單distinct的話,會把distinct的列直接附在group-by欄位組後面,然後進行處理。

麻煩的是multi distinct。根據disinct的邏輯,必須保證每個分組(group-by)相同的distinct列相同的key都分在同一個reduce中,否則就沒有辦法完成去重工作。所以如果按照單distinct的邏輯,reduce端就需要針對每一個distinct欄位進行排序和去重。這樣做顯然是不高效的,因為對reduce端的計算壓力很大,而且也沒有利用到shuffle階段的排序。

第二種方法就是把distinct的欄位都拆開,形成獨立的n張表。最後再做union all的操作。過程如下:

select date, count(distinct student_id),count(distinct course), sum(score)

from student_course

group
by date

order by

在odps上和order by相似的功能在還有sort by, distribute by,cluster by。 後面的語法在普通的關係型資料庫都不存在。算是mapreduce特有的功能。這裡先解釋下每個語句的含義:

order by —— order by會對輸入做全局排序,因此只有一個Reducer(多個Reducer無法保證全局有序),然而只有一個Reducer,會導致當輸入規模較大時,消耗較長的計算時間。

sort by —— sort by不是全局排序,其在數據進入reducer前完成排序,因此,如果用sort by進行排序,並且設置mapred.reduce.tasks>1,則sort by只會保證每個reducer的輸出有序,並不保證全局有序。sort by不同於order by,它不受Hive.mapred.mode屬性的影響,sort by的數據只能保證在同一個reduce中的數據可以按指定欄位排序。使用sort by你可以指定執行的reduce個數(通過set mapred.reduce.tasks=n來指定),對輸出的數據再執行歸併排序,即可得到全部結果。

distribute by —— distribute by是控制在map端如何拆分數據給reduce端的。hive會根據distribute by後面列,對應reduce的個數進行分發,默認是採用hash演算法。sort by為每個reduce產生一個排序文件。在有些情況下,你需要控制某個特定行應該到哪個reducer,這通常是為了進行後續的聚集操作。distribute by剛好可以做這件事。因此,distribute by經常和sort by配合使用。

cluster by —— cluster by除了具有distribute by的功能外還兼具sort by的功能。但是排序只能是倒敘排序,不能指定排序規則為ASC或者DESC。

MapReduce的幾個階段

input

split

map

shuffle

reduce

output 這每個階段都會出現各種問題,我們依次從前到後來講怎麼處理各個階段出現的問題。

Input & split

根據MaxCompute的功能,input可以是本地文件,也可以是資料庫的表。可以通過InputFormat借口來定義。但是這個Format和後面的split階段息息相關。因為split只切割比block小的文件,對於小文件則不作處理。所以當存在大量的小文件(特指大小達不到block大小的文件),會生成大量的split塊,同時也會啟動大量map任務。

可能出現的問題

分區裁剪中出現問題 > 解決方法是讓odps在生成任務之前就能確定好讀區到分區的範圍

輸入存在大量小文件,導致map instance數量超標 > 解決辦法是讀取時候設定塊大小,可以使用setSplitSize來控制讀取文件總大小 > 解決方案二是提前就把這些小文件給合併了

輸入文件大小分布非常不均勻,導致split的塊大小分布不均勻,從而導致map端傾斜 > 可以使用setSplitSize來控制讀取文件總大小

輸入的文件不能被切割,導致split塊大小不均勻

暫時沒有找到解法

相比於hadoop,odps系統在小文件處理方面的功能已經比較完善,主要體現在以下兩個方面:

(1) 默認情況下,當Job完成之後,如果滿足一定的條件,系統會自動分配一個FuxiTask(調度任務)進行小文件合併,即我們經常看到的MergeTask;

map

map階段的輸入是上面Input&split階段來保障的,一個分片一個map任務。所以當分片處理的不合理,map階段就會出現問題。而map端經過shuffle和combianer(可選)後,會把數據交給reduce端。

從input&split 到map可能出現的問題

輸入存在大量小文件,導致map instance數量超標 > 同上

因為ODPS的SQL或者其他任務會解析成一個Task DAG。所以從最初輸入到最終輸出會有很多的中間計算。而這些中間計算之間也是對應著一個個的map reduce。如果當上一個map/reduce任務產生的輸入可能形成一個種長尾分布,導致下一個mapreduce輸入出現長尾。也就是map端任務傾斜。

shuffle

這個階段是mapreduce的核心,設計到sort,group和數據分發。

可能出現的問題

數據量特別大,可以使用combinar來進行mapper端的聚合。odps的參數是

reduce

知道mapreduce計算模型的人都知道,map階段輸入是非結構化的,並不需要實現規定好輸入的內容,輸出則是一塊塊分區好的pair。而到reduce則有要求,那就是同樣key的map處理的pair需要發送到同樣的reduce中。這樣就會出現某key數據量很大,某key數據量很小的時候對應的reduce處理的數據量大小也是不均勻的。一旦出現這種情,任務執行的結束時間必然會受到最長任務的拖累。,v>,v>

能產生reduce數據分布不均勻的操作,最長出現的有兩分類:

  1. join

    這裡推薦本書《mapreduce設計模式》,其中的連接模式篇章把各種join的描述。在這裡大概說下join的類型:

reduce端連接

map端連接(在odps中使用mapjoin即可),這個操作的前提是存在一個小表能放入到mapreduce中的環形內存中。而且大表必須作為「主表」(比如left join的話就必須是左表,而right join就必須是右表)。

所以到底為什麼會產生傾斜呢?map端連接肯定是不會產生數據傾斜的,那麼傾斜的必然是reduce連接。當一張表出現數據熱點。這樣就會出現熱點reduce的運行遠遠大於其它的長尾,導致數據不均衡。

大概總結下就是:

- 如果存在小表,且如果左外連接時候小表是右表(或者是右外連接,小表必須是左表),可以使用mapjoin。

- 如果都是大表且有熱點,這樣會出現傾斜,這時候需要剔除熱點數據單獨處理。

- 如果都是大表沒有熱點,這樣不會出現傾斜,這樣還需要怎麼優化?——這裡首選想辦法減小數據集合,如果不能在查看是否出現某些熱門的數據,如果有,則對數據進行分桶。

count(distinct) 對於distinct的實現,單鍵的時候會被直接附到group by的欄位後,同時作為map輸出的key值來處理。這樣轉化成了group by處理,一般是沒有問題的。但是麻煩的是多鍵值count(distinct),這個沒有辦法直接把所有的distinct的欄位附到group by後面了。因為這樣無法利用shuffle階段的排序,到了reduce階段需要做很多遍的去重操作。所有一般對於multi distinct都是採用給distinct 欄位做編號,然後複製數據。比如輸入數據是這樣:

可以看到distinct會導致數據翻倍膨脹,而這些膨脹的數據後會通過網路傳輸到reduce,必然會造成很大的浪費。所以要治理,方法一是首先把distinct轉成group by放在子查詢中,然後外層再套一層查詢進行分組count。

select user_id,count(deal_id),count(item)

from

(

select user_id,deal_id, item from deal_list group
by user_id,deal_id, item

) group
by user_id;

方法二:設置參數——odps.sql.groupby.skewindata=true 當選項設定為 true,生成的查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最後完成最終的聚合操作。

推薦閱讀:

MySQL入門學習筆記——七周數據分析師實戰作業
《深入淺出SQL》學習筆記
SQL 查詢按照家庭住址進行分組時,組內平均年齡小於50歲的組中成員的姓名和年齡?
以 MySQL 為例,如何進行 SQL 注入和防止被注入?

TAG:SQL |