如何連續執行兩段MapReduce?
01-16
MapReduce採用stream方式,如何使得第一次的輸出不列印到文件中直接傳遞到下一個Mapreduce程序中作為輸入
Tez
Spark和Tez都可以解決這個問題。從發展勢頭來看Spark比較主流。
用spark。mapreduce的設計,本來就是中間結果保存到文件,這樣可以提高可靠性,減少內存佔用,但是犧牲了性能,spark數據在內存中進行交換,要快一些,但是可靠性肯定沒法和mapreduce比,蘿蔔白菜,可有所愛吧
自己把輸出路徑設為下次的輸入路徑
用java api寫job file建立streaming任務,第二個任務的input設成第一個的output。。。當然output還是會寫到file system里。。。我也覺得這個方法很硌硬人。。。尤其是對於本來就是不想用java才用streaming的人。。。比如我。。。好吧。。。我是根本不會java。。。但是據我所知這是唯一「正確」的做法。。。以hadoop的尿性我覺得大概也確實如此。。。如果不需要額外的control方法的話我一直懷疑用bash寫兩個hadoop jar streaming.jar blabla的script有什麼理由不行嗎?。。。也許你可以作一下。。。不過我認為也許當前最高票才是對的。。。用spark。。。。
著作權歸作者所有。
商業轉載請聯繫作者獲得授權,非商業轉載請註明出處。
作者:厚積_薄發
鏈接:http://blog.csdn.net/yuxin6866/article/details/55211608
來源:CSDN博客
前面的幾篇博客主要介紹了Hadoop的存儲HDFS,接下來幾篇博客主要介紹Hadoop的計算框架MapReduce。本片博客主要講解MapReduce框架的具體執行流程,以及shuffle過程,當然這方面的技術博客已經特別多而且都寫得很優秀,我寫本篇博客之前也有過相關閱讀,受益匪淺。對一些博客和資料的參考都會才博客下方參考資料中列出。
MapReduce理解MapRedeuce,我們可以把它分開來理解:映射(Mapping) :對集合里的每個目標應用同一個操作。即,如果你想把表單里每個單元格乘以二,那麼把這個函數單獨地應用在每個單元格上的操作就屬於mapping(這裡體現了移動計算而不是移動數據);化簡(Reducing):遍歷集合中的元素來返回一個綜合的結果。即,輸出表單里一列數字的和這個任務屬於reducing。計算框架一個簡單的MapReduce執行流程一個簡單的MapReduce執行流程簡單理解,MapReduce計算框架:把需要計算的東西放入到MapReduce中進行計算,然後返回一個我們期望的結果。所以首先我們需要一個來源(需要計算的東西)即輸入(input),然後MapReduce操作這個輸入(input),通過定義好的計算模型,最後得到一個(期望的結果)輸出(output)。計算模型
Map和ReduceMap和Reduce在這裡我們主要討論的是MapReduce計算模型:在運行一個mapreduce計算任務時候,任務過程被分為兩個階段:map階段和reduce階段,每個階段都是用鍵值對(key/value)作為輸入(input)和輸出(output)。而程序員要做的就是定義好這兩個階段的函數:map函數和reduce函數。實例代碼以MapReduce統計單詞次數為例(偽代碼),主要四個模塊來講解,如上圖計算框架:Input,數據讀入// 設置數據輸入來源FileInputFormat.setInputPaths(job, args[0]);FileInputFormat.setInputDirRecursive(job, true); //遞歸
job.setInputFormatClass(TextInputFormat.class); //設置輸入格式//TextInputFormat,一種默認的文本輸入格式,Mapper一次讀取文本中的一行數據。使用Mapper計算//設置Job的Mapper計算類和K2、V2類型job.setMapperClass(WordCountMapper.class); //1.設置Mapper類job.setMapOutputKeyClass(Text.class); //設置Mapper輸出Key的類型job.setMapOutputValueClass(LongWritable.class);//設置Mapper輸出Value的類型//WordCountMapper類
/*** 自定義的Map 需要繼承Mapper* K1 : 行序號* V1 : 行信息* K2 : 單詞* V2 : 次數*/public static class WordCountMapper extends Mapper&{ Text k2 = new Text() ;
LongWritable v2 = new LongWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1. 獲取行信息 String line = value.toString();//2. 獲取行的所用單詞
String[] words = line.split(" ");//這裡假設一行文本單詞分隔符為" " for (String word : words) { k2.set(word.getBytes()) ; //設置鍵 v2.set(1); //設置值 context.write(k2,v2); } }}使用Reducer合併計算
//設置Job的Reducer計算類和K3、V3類型job.setReducerClass(WordCountReducer.class); //自定義的Reducer類job.setOutputKeyClass(Text.class); //輸出Key類型job.setOutputValueClass(LongWritable.class); //輸出Value類型//WordCountReducer 類/*** 自定義的Reduce 需要繼承Reducer* K2 : 字元串* V3 : 次數(分組)* K3 : 字元串
* V3 : 次數(統計總的)*/public static class WordCountReducer extends Reducer&{ LongWritable v3 = new LongWritable() ; int sum = 0 ; @Override protected void reduce(Text key, Iterable& values, Context context) throws IOException, InterruptedException { sum = 0 ; for (LongWritable value : values) { sum +=value.get() ; } v3.set(sum); context.write( key , v3 ); }}Output,數據寫出FileOutputFormat.setOutputPath(job, new Path(args[1]));運行機制下面從兩個角度來講解MapReduce的運行機制:各個角色實體;運行的時間先後順序。MapReduce計算模型的運行機制MapReduce計算模型的運行機制各個角色實體程序運行時過程設計到的一個角色實體1.1. Client:編寫mapreduce程序,配置作業,提交作業的客戶端 ;1.2. ResourceManager:集群中的資源分配管理 ;1.3. NodeManager:啟動和監管各自節點上的計算資源 ;1.4. ApplicationMaster:每個程序對應一個AM,負責程序的任務調度,本身也是運行在NM的Container中 ;1.5. HDFS:分散式文件系統,保存作業的數據、配置信息等等。客戶端提交Job2.1. 客戶端編寫好Job後,調用Job實例的Submit()或者waitForCompletion()方法提交作業;2.2. 客戶端向ResourceManager請求分配一個Application ID,客戶端會對程序的輸出、輸入路徑進行檢查,如果沒有問題,進行作業輸入分片的計算。Job提交到ResourceManager3.1. 將作業運行所需要的資源拷貝到HDFS中(jar包、配置文件和計算出來的輸入分片信息等);3.2. 調用ResourceManager的submitApplication方法將作業提交到ResourceManager。給作業分配ApplicationMaster4.1. ResourceManager收到submitApplication方法的調用之後會命令一個NodeManager啟動一個Container ;4.2. 在該NodeManager的Container上啟動管理該作業的ApplicationMaster進程。ApplicationMaster初始化作業5.1. ApplicationMaster對作業進行初始化操作;5.2. ApplicationMaster從HDFS中獲得輸入分片信息(map、reduce任務數)任務分配6.1. ApplicationMaster為其每個map和reduce任務向RM請求計算資源;6.2. map任務優先於reduce任,map數據優先考慮本地化的數據。任務執行,在 Container 上啟動任務(通過YarnChild進程來運行),執行map/reduce任務。時間先後順序輸入分片(input split)每個輸入分片會讓一個map任務來處理,默認情況下,以HDFS的一個塊的大小(默認為128M,可以設置)為一個分片。map輸出的結果會暫且放在一個環形內存緩衝區中(默認mapreduce.task.io.sort.mb=100M),當該緩衝區快要溢出時(默認mapreduce.map.sort.spill.percent=0.8),會在本地文件系統中創建一個溢出文件,將該緩衝區中的數據寫入這個文件;map階段:由我們自己編寫,最後調用 context.write(…);partition分區階段3.1. 在map中調用 context.write(k2,v2)方法輸出,該方法會立刻調用 Partitioner類對數據進行分區,一個分區對應一個 reduce task。3.2. 默認的分區實現類是 HashPartitioner ,根據k2的哈希值 % numReduceTasks,可能出現「數據傾斜」現象。3.3. 可以自定義 partition ,調用 job.setPartitioner(…)自己定義分區函數。combiner合併階段:將屬於同一個reduce處理的輸出結果進行合併操作4.1. 是可選的;4.2. 目的有三個:1.減少Key-Value對;2.減少網路傳輸;3.減少Reduce的處理。shuffle階段:即Map和Reduce中間的這個過程5.1. 首先 map 在做輸出時候會在內存里開啟一個環形內存緩衝區,專門用來做輸出,同時map還會啟動一個守護線程;5.2. 如緩衝區的內存達到了閾值的80%,守護線程就會把內容寫到磁碟上,這個過程叫spill,另外的20%內存可以繼續寫入要寫進磁碟的數據;5.3. 寫入磁碟和寫入內存操作是互不干擾的,如果緩存區被撐滿了,那麼map就會阻塞寫入內存的操作,讓寫入磁碟操作完成後再繼續執行寫入內存操作;5.4. 寫入磁碟時會有個排序操作,如果定義了combiner函數,那麼排序前還會執行combiner操作;5.5. 每次spill操作也就是寫入磁碟操作時候就會寫一個溢出文件,也就是說在做map輸出有幾次spill就會產生多少個溢出文件,等map輸出全部做完後,map會合併這些輸出文件,這個過程里還會有一個Partitioner操作(如上)5.6. 最後 reduce 就是合併map輸出文件,Partitioner會找到對應的map輸出文件,然後進行複製操作,複製操作時reduce會開啟幾個複製線程,這些線程默認個數是5個(可修改),這個複製過程和map寫入磁碟過程類似,也有閾值和內存大小,閾值一樣可以在配置文件里配置,而內存大小是直接使用reduce的tasktracker的內存大小,複製時候reduce還會進行排序操作和合併文件操作,這些操作完了就會進行reduce計算了。reduce階段:由我們自己編寫,最終結果存儲在hdfs上的。
可以試試調度平台比如oozie
推薦閱讀:
※MooseFS和Hadoop兩個分散式文件系統各有什麼優缺點?
※既然Spark比Hadoop性能好很多,Hadoop未來發展方向是什麼?
※Spark排序的原理?
※HBase可以替代redis嗎?
※為什麼(hadoop基準測試中)HDFS寫入速度如此之慢?