如何連續執行兩段MapReduce?

MapReduce採用stream方式,如何使得第一次的輸出不列印到文件中直接傳遞到下一個Mapreduce程序中作為輸入


Tez


Spark和Tez都可以解決這個問題。從發展勢頭來看Spark比較主流。


用spark。

mapreduce的設計,本來就是中間結果保存到文件,這樣可以提高可靠性,減少內存佔用,但是犧牲了性能,spark數據在內存中進行交換,要快一些,但是可靠性肯定沒法和mapreduce比,蘿蔔白菜,可有所愛吧


自己把輸出路徑設為下次的輸入路徑


用java api寫job file建立streaming任務,第二個任務的input設成第一個的output。。。當然output還是會寫到file system里。。。

我也覺得這個方法很硌硬人。。。尤其是對於本來就是不想用java才用streaming的人。。。比如我。。。好吧。。。我是根本不會java。。。但是據我所知這是唯一「正確」的做法。。。以hadoop的尿性我覺得大概也確實如此。。。

如果不需要額外的control方法的話我一直懷疑用bash寫兩個hadoop jar streaming.jar blabla的script有什麼理由不行嗎?。。。也許你可以作一下。。。

不過我認為也許當前最高票才是對的。。。用spark。。。。


著作權歸作者所有。

商業轉載請聯繫作者獲得授權,非商業轉載請註明出處。

作者:厚積_薄發

鏈接:http://blog.csdn.net/yuxin6866/article/details/55211608

來源:CSDN博客

前面的幾篇博客主要介紹了Hadoop的存儲HDFS,接下來幾篇博客主要介紹Hadoop的計算框架MapReduce。本片博客主要講解MapReduce框架的具體執行流程,以及shuffle過程,當然這方面的技術博客已經特別多而且都寫得很優秀,我寫本篇博客之前也有過相關閱讀,受益匪淺。對一些博客和資料的參考都會才博客下方參考資料中列出。

MapReduce理解

MapRedeuce,我們可以把它分開來理解:

映射(Mapping) :對集合里的每個目標應用同一個操作。即,如果你想把表單里每個單元格乘以二,那麼把這個函數單獨地應用在每個單元格上的操作就屬於mapping(這裡體現了移動計算而不是移動數據);

化簡(Reducing):遍歷集合中的元素來返回一個綜合的結果。即,輸出表單里一列數字的和這個任務屬於reducing。

計算框架

一個簡單的MapReduce執行流程

一個簡單的MapReduce執行流程

簡單理解,MapReduce計算框架:

把需要計算的東西放入到MapReduce中進行計算,然後返回一個我們期望的結果。所以首先我們需要一個來源(需要計算的東西)即輸入(input),然後MapReduce操作這個輸入(input),通過定義好的計算模型,最後得到一個(期望的結果)輸出(output)。

計算模型

Map和Reduce

Map和Reduce

在這裡我們主要討論的是MapReduce計算模型:

在運行一個mapreduce計算任務時候,任務過程被分為兩個階段:map階段和reduce階段,每個階段都是用鍵值對(key/value)作為輸入(input)和輸出(output)。而程序員要做的就是定義好這兩個階段的函數:map函數和reduce函數。

實例代碼

以MapReduce統計單詞次數為例(偽代碼),主要四個模塊來講解,如上圖計算框架:

Input,數據讀入

// 設置數據輸入來源

FileInputFormat.setInputPaths(job, args[0]);

FileInputFormat.setInputDirRecursive(job, true); //遞歸

job.setInputFormatClass(TextInputFormat.class); //設置輸入格式

//TextInputFormat,一種默認的文本輸入格式,Mapper一次讀取文本中的一行數據。

使用Mapper計算

//設置Job的Mapper計算類和K2、V2類型

job.setMapperClass(WordCountMapper.class); //1.設置Mapper類

job.setMapOutputKeyClass(Text.class); //設置Mapper輸出Key的類型

job.setMapOutputValueClass(LongWritable.class);//設置Mapper輸出Value的類型

//WordCountMapper類

/**

* 自定義的Map 需要繼承Mapper

* K1 : 行序號

* V1 : 行信息

* K2 : 單詞

* V2 : 次數

*/

public static class WordCountMapper extends Mapper& {

Text k2 = new Text() ;

LongWritable v2 = new LongWritable();

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

//1. 獲取行信息

String line = value.toString();

//2. 獲取行的所用單詞

String[] words = line.split(" ");//這裡假設一行文本單詞分隔符為" "

for (String word : words) {

k2.set(word.getBytes()) ; //設置鍵

v2.set(1); //設置值

context.write(k2,v2);

}

}

}

使用Reducer合併計算

//設置Job的Reducer計算類和K3、V3類型

job.setReducerClass(WordCountReducer.class); //自定義的Reducer類

job.setOutputKeyClass(Text.class); //輸出Key類型

job.setOutputValueClass(LongWritable.class); //輸出Value類型

//WordCountReducer 類

/**

* 自定義的Reduce 需要繼承Reducer

* K2 : 字元串

* V3 : 次數(分組)

* K3 : 字元串

* V3 : 次數(統計總的)

*/

public static class WordCountReducer extends Reducer&{

LongWritable v3 = new LongWritable() ;

int sum = 0 ;

@Override

protected void reduce(Text key, Iterable& values, Context context)

throws IOException, InterruptedException {

sum = 0 ;

for (LongWritable value : values) {

sum +=value.get() ;

}

v3.set(sum);

context.write( key , v3 );

}

}

Output,數據寫出

FileOutputFormat.setOutputPath(job, new Path(args[1]));

運行機制

下面從兩個角度來講解MapReduce的運行機制:

各個角色實體;

運行的時間先後順序。

MapReduce計算模型的運行機制

MapReduce計算模型的運行機制

各個角色實體

程序運行時過程設計到的一個角色實體

1.1. Client:編寫mapreduce程序,配置作業,提交作業的客戶端 ;

1.2. ResourceManager:集群中的資源分配管理 ;

1.3. NodeManager:啟動和監管各自節點上的計算資源 ;

1.4. ApplicationMaster:每個程序對應一個AM,負責程序的任務調度,本身也是運行在NM的Container中 ;

1.5. HDFS:分散式文件系統,保存作業的數據、配置信息等等。

客戶端提交Job

2.1. 客戶端編寫好Job後,調用Job實例的Submit()或者waitForCompletion()方法提交作業;

2.2. 客戶端向ResourceManager請求分配一個Application ID,客戶端會對程序的輸出、輸入路徑進行檢查,如果沒有問題,進行作業輸入分片的計算。

Job提交到ResourceManager

3.1. 將作業運行所需要的資源拷貝到HDFS中(jar包、配置文件和計算出來的輸入分片信息等);

3.2. 調用ResourceManager的submitApplication方法將作業提交到ResourceManager。

給作業分配ApplicationMaster

4.1. ResourceManager收到submitApplication方法的調用之後會命令一個NodeManager啟動一個Container ;

4.2. 在該NodeManager的Container上啟動管理該作業的ApplicationMaster進程。

ApplicationMaster初始化作業

5.1. ApplicationMaster對作業進行初始化操作;

5.2. ApplicationMaster從HDFS中獲得輸入分片信息(map、reduce任務數)

任務分配

6.1. ApplicationMaster為其每個map和reduce任務向RM請求計算資源;

6.2. map任務優先於reduce任,map數據優先考慮本地化的數據。

任務執行,在 Container 上啟動任務(通過YarnChild進程來運行),執行map/reduce任務。

時間先後順序

輸入分片(input split)

每個輸入分片會讓一個map任務來處理,默認情況下,以HDFS的一個塊的大小(默認為128M,可以設置)為一個分片。map輸出的結果會暫且放在一個環形內存緩衝區中(默認mapreduce.task.io.sort.mb=100M),當該緩衝區快要溢出時(默認mapreduce.map.sort.spill.percent=0.8),會在本地文件系統中創建一個溢出文件,將該緩衝區中的數據寫入這個文件;

map階段:由我們自己編寫,最後調用 context.write(…);

partition分區階段

3.1. 在map中調用 context.write(k2,v2)方法輸出,該方法會立刻調用 Partitioner類對數據進行分區,一個分區對應一個 reduce task。

3.2. 默認的分區實現類是 HashPartitioner ,根據k2的哈希值 % numReduceTasks,可能出現「數據傾斜」現象。

3.3. 可以自定義 partition ,調用 job.setPartitioner(…)自己定義分區函數。

combiner合併階段:將屬於同一個reduce處理的輸出結果進行合併操作

4.1. 是可選的;

4.2. 目的有三個:1.減少Key-Value對;2.減少網路傳輸;3.減少Reduce的處理。

shuffle階段:即Map和Reduce中間的這個過程

5.1. 首先 map 在做輸出時候會在內存里開啟一個環形內存緩衝區,專門用來做輸出,同時map還會啟動一個守護線程;

5.2. 如緩衝區的內存達到了閾值的80%,守護線程就會把內容寫到磁碟上,這個過程叫spill,另外的20%內存可以繼續寫入要寫進磁碟的數據;

5.3. 寫入磁碟和寫入內存操作是互不干擾的,如果緩存區被撐滿了,那麼map就會阻塞寫入內存的操作,讓寫入磁碟操作完成後再繼續執行寫入內存操作;

5.4. 寫入磁碟時會有個排序操作,如果定義了combiner函數,那麼排序前還會執行combiner操作;

5.5. 每次spill操作也就是寫入磁碟操作時候就會寫一個溢出文件,也就是說在做map輸出有幾次spill就會產生多少個溢出文件,等map輸出全部做完後,map會合併這些輸出文件,這個過程里還會有一個Partitioner操作(如上)

5.6. 最後 reduce 就是合併map輸出文件,Partitioner會找到對應的map輸出文件,然後進行複製操作,複製操作時reduce會開啟幾個複製線程,這些線程默認個數是5個(可修改),這個複製過程和map寫入磁碟過程類似,也有閾值和內存大小,閾值一樣可以在配置文件里配置,而內存大小是直接使用reduce的tasktracker的內存大小,複製時候reduce還會進行排序操作和合併文件操作,這些操作完了就會進行reduce計算了。

reduce階段:由我們自己編寫,最終結果存儲在hdfs上的。


可以試試調度平台比如oozie


推薦閱讀:

MooseFS和Hadoop兩個分散式文件系統各有什麼優缺點?
既然Spark比Hadoop性能好很多,Hadoop未來發展方向是什麼?
Spark排序的原理?
HBase可以替代redis嗎?
為什麼(hadoop基準測試中)HDFS寫入速度如此之慢?

TAG:分散式計算 | Hadoop | MapReduce |