標籤:

MaxCompute MapReduce的7個性能優化策略

1. 輸入表的列裁剪

對於列數特別多的輸入表,Map階段處理只需要其中的某幾列,可以通過在添加輸入表時明確指定輸入的列,減少輸入量;

例如只需要c1,c2倆列,可以這樣設置:

InputUtils.addTable(TableInfo.builder().tableName("wc_in").cols(new String[]{"c1","c2"}).build(), job);

設置之後,你在map里的讀取到的Record也就只有c1,c2倆列,如果之前是使用列名獲取Record數據的,不會有影響,而用下標獲取的需要注意這個變化。

2. 減少中間環節

如果有多個MR作業,之間有關聯關係,前一個作業的輸出是後一個作業的輸入,可以考慮採用Pipeline的模式,將多個串列的MR作業合併為一個,這樣可以用更少的作業數量完成同樣的任務,一方面減少中間落表造成的的多餘磁碟IO,提升性能;另一方面減少作業數量使調度更加簡單,增強流程的可維護性。具體使用方法參見Pipeline示例。

3. 避免資源重複讀取

資源的讀取盡量放置到setup階段讀取,避免資源的多次讀取的性能損失,另外系統也有64次讀取的限制,資源的讀取參見使用資源示例。

4. 減少對象構造開銷

對於Map/Reduce階段每次都會用到的一些java對象,避免在map/reduce函數里構造,可以放到setup階段,避免多次構造產生的開銷;

{ ... Record word; Record one; public void setup(TaskContext context) throws IOException { // 創建一次就可以,避免在map中每次重複創建 word = context.createMapOutputKeyRecord(); one = context.createMapOutputValueRecord(); one.set(new Object[]{1L}); } ...}

5. 合理選擇partition column或自定義partitioner

合理選擇partition columns,可以使用JobConf#setPartitionColumns這個方法進行設置(默認是key schema定義的column),設置後數據將按照指定的列計算hash值分發到reduce中去, 避免數據傾斜導致作業長尾現象,如有必要也可以選擇自定義partitioner,自定義partitioner的使用方法如下:

import com.aliyun.odps.mapred.Partitioner;public static class MyPartitioner extends Partitioner {@Overridepublic int getPartition(Record key, Record value, int numPartitions) { // numPartitions即對應reducer的個數 // 通過該函數決定map輸出的key value去往哪個reducer String k = key.get(0).toString(); return k.length() % numPartitions;}}

在jobconf里進行設置:

jobconf.setPartitionerClass(MyPartitioner.class)

另外需要在jobconf里明確指定reducer的個數:

jobconf.setNumReduceTasks(num)

6. 合理使用combiner

如果map的輸出結果中有很多重複的key,可以合併後輸出,combine後可以減少網路帶寬傳輸和一定shuffle的開銷,如果map輸出本來就沒有多少重複的,就不要用combiner,用了反而可能會有一些額外的開銷。combiner實現的是和reducer相同的介面,例如一個WordCount程序的combiner可以定義如下:

/** * A combiner class that combines map output by sum them. */ public static class SumCombiner extends ReducerBase { private Record count; @Override public void setup(TaskContext context) throws IOException { count = context.createMapOutputValueRecord(); } @Override public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException { long c = 0; while (values.hasNext()) { Record val = values.next(); c += (Long) val.get(0); } count.set(0, c); context.write(key, count); } }

7. 設置合理的split size

map默認的split size是256MB,split size的大小決定了map的個數多少,如果用戶的代碼邏輯比較耗時,map需要較長時間結束,可以通過JobConf#setSplitSize方法適當調小split size的大小。然而split size也不宜設置太小,否則會佔用過多的計算資源。

本來選自阿里雲大數據產品專家「隱林」,擅長MaxCompute、機器學習、分散式、可視化、人工智慧等大數據領域。

推薦閱讀:

產品經理:一張表格理思路,行為數據打輔助
如何查詢PC28開獎網站歷史開獎數據呢?
從Google預測流感引發的大數據反思
中華財寶:珠寶行業在大數據時代該如何前行?
MaxCompoute禁止Full Scan功能開放

TAG:大數據 |