MapReduce初窺 · 一

   繼HDFS,YARN,我硬闖大數據之路,終於到了MapReduce。

  我本來應該更早一些寫這篇文章,但一些事情拖累了我,而且我發現了一個叫Hadoop中文網的高質量文檔網站,因此我也改變了這個系列的成文思路,將儘可能地減少我的字數。


   HDFS負責海量數據存儲,而MapReduce負責海量數據計算。

   海量數據計算為什麼要叫MapReduce,為什麼不直接叫sea data compute?

   這是Google起的名字,在它04年發表的同名論文中。這是直接了當地描述計算的2個大過程,它們會在數個節點上並行運行。

  1. Map
  2. Reduce

   系統會將海量數據分割為一個個小塊分給Map任務,Map任務要輸出 k - v 形式的數據,0~n條都可以。

   之後Reduce過程其實不是必須的,如果你覺得你就Map一下就夠滿足需求,那麼就不需要Reduce,但如果你覺得不行,這樣不夠,你還需要對數據做一次聚攏,那麼你可以再寫一個Reduce任務,它會接收Map任務的輸出。各個Map任務的輸出,可能會有相同key的鍵值對,那麼它們會被聚攏、排序,會以類似 k - list of v 的形式成為Reduce任務的輸入。


那麼這個流程體驗起來是怎麼樣的?

1,上傳輸入文件到HDFS

   使用

hdfs dfs -put localFile RemoteFilePath

這類指令,將你的輸入文件放在在HDFS上。

2,寫一個MapReduce程序。

   省略

   寫完之後,將它們打包成 jar 包

3,提交job到MapReduce集群中。

在提交時,你需要指定你的輸入文件在哪兒,以及你希望結果輸出在哪個文件夾。

在你提交job之後,框架做了哪些工作呢?

  1. 檢查你的輸入輸出文件配置是否正確
  2. 它會把你的輸入,從一堆海量的數據分割為一個個InputSplit。有多少個InputSplit,就將起多少個Map任務。
  3. 如果必要的話,設置緩存
  4. 將你的 jar 包和配置文件一起copy到集群中
  5. 將你的 job 提交到 ResourceManager上(YARN的概念,來自之前的YARN Architecture)

   好像概念性的東西又寫多了,其實這些Hadoop中文網上都有,接下來不寫了,只講講最核心的東西:

Mapper:

   提供了一個叫Mapper的類,供我們extends,來編寫自己的Map任務,它的結構非常簡單,只有如下幾個方法:

   Context是框架注入的一個變數,它出現在剩下的各個方法中,有著挺深的繼承關係。但在這裡我們只是用它來收集數據,作為輸出,比如:

protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value);}

   通過write方法,我們就將key,value原封不動的收集。這些被收集的數據,會作為Map任務的輸出。你可以在這裡寫業務邏輯,控制write多次、或者不write,都是ok的。

   之後我們看一下run方法:

public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); }}

  邏輯可以說非常簡單了,首先執行setup方法作為初始化方法,之後把輸入輪著餵給map方法,最後執行一下cleanup方法做一些清理工作。這3個方法內部都是完全你自己定義邏輯的,框架只是負責把數據餵給你。

   數據——map方法的輸入:key , value分別會是什麼呢?

  如果整個MapReduce任務的輸入一本詩集,我設定每一首詩起一個map任務,設定按照行來分割一首詩——那麼一個Map任務的輸入就會是一首詩,默認每次輸入一行給它的map方法。

   默認會以這行詩的hash值作為key,這行詩的文字內容作為value。那麼這首詩有幾行,這個Map任務的map方法就會被調用幾次。

   就如while循環中那樣,現在你可以看懂這個while循環了嗎?

   在Map任務中,我們一般會設定一個combiner,它會對Map任務的輸出(就是所有context.write(k, v)方法的輸入)進行一次本地的聚合。這將會減少由Mapper傳給Reducer的數據量。Mapper會一遍輸出數據一邊進行聚合。

Reducer:

   Reduce的總過程分為好幾步:

  • Shuffle:Mapper的輸出會被Comparator的類分區(partition),比如詩集里的兒童詩被分為一個partition,愛情詩被分為一個partition。一個partition對應一個reducer。shuffle這個階段,框架通過http請求,向所有的mappers獲取它的輸出信息。
  • Sort:由於不同的mappers可能會輸出相同的key,這個階段,框架將會將Reducer的輸入按Key分組。Shuffle和Sort是同時進行的(mapper也是輸出的同時,被combiber merge)
  • (Secondary Sort)在執行Reduce之前,二次排序,
  • Reduce:

看代碼吧:

  總體來說是類似的,只是reduce的輸入有所變化,以迭代器的方式來傳入list of value。run方法總體來說是類似的,但是多了一個back up store的選擇(我也不知道是什麼,但我不想讓它阻礙我迅速入門):

public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKey()) { reduce(context.getCurrentKey(), context.getValues(), context); // If a back up store is used, reset it Iterator<VALUEIN> iter = context.getValues().iterator(); if(iter instanceof ReduceContext.ValueIterator) { ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore(); } } } finally { cleanup(context); }}

   Reducer的輸出,不會再被排序了。


   基本就結束了,肯定一頭霧水,我們可以看一個例子。

WordCount

   這是官方給的一個最小示例,用來計算一堆文檔中,每個詞的出現次數,代碼如下:

   相當於一共3個類,Mapper,Reducer,配置+jar包入口類。

   它將會根據指定的輸入目錄裡面的文件數目,決定起多少個Mapper,每個文件作為一個Mapper的輸入,每個文件按行分割輸入給map()方法。

   整個業務邏輯是統計每個詞的出現次數。

   如果我們只有2個文件作為輸入,每個文件只有一行:

good good study day day chickenwinner winner chicken dinner

   因此,會有2個Mapper。

   我們來看看Mapper怎麼寫的:

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } }}

   這個類,繼承了Mapper,設定InKey為Object類,InValue為Text,OutKey為Text,OutValue為InWriteable,並只@Override了map方法。

   為什麼輸入輸出的這幾個類是這樣的,我們不用管它,這是框架的一些特殊要求,其實這個map方法做的事情,就是將輸入的一行文字,按照空格分割,然後將分割得到的詞作為key, value放置1.

   那麼我們的輸入:winner winner chicken dinner,被處理完,就是如下的狀態:

<winner, 1><winner, 1><chicken, 1><dinner, 1>

   我們可以設定一個combiner,它會在本地,將輸出的結果按key聚合一次(實際上,這個combiner類在這裡跟Reducer是一個類)。經過聚合的結果如下:

<winner, 2><chicken, 1><dinner, 1>

(原輸入:winner winner chicken dinner)

   同理,另一個Mapper最後輸出的結果是:

<good, 2><study, 1><day, 2><chicken, 1>

(原輸入:good good study day day chicken)

   之後,我們再看看Reducer:

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); }}

   這裡它依然用了一些莫名其妙的類,沒有關係,它做的事情就是只是把相同key的value們,相加而已。

   比如2個Mapper輸出都有<chicken, 1>,假設只有一個Reducer,Shuffle與Sort階段,會把相同key的Map輸出合併,變成<key, {value1, value2,value3……}>的形式,作為一個新的鍵值對,傳入reduce方法中。比如——

<chicken, {1, 1}>

   這裡reduce方法做的事情就是遍歷這個{1, 1},將他們的值加起來,作為最後chiken這個關鍵字的輸出——

<chicken, 2>

   其他的輸入也同理。

   最後,我們來看一下配置類:

public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "wordCount"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1);}

   指定了一大堆類而已,注意

job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);

   這2個指定了同一個類,使得combiner的邏輯跟reducer是一模一樣的。

   其實我想說,絕大多數業務邏輯都是自己指定類實現的,具體可以參考文檔。


最後的最後,你一定非常想要文檔獲取完整信息了

質量非常非常高的中文文檔,強烈推薦:

hadoop.apache.org/docs/

英文官方文檔:

hadoop.apache.org/docs/


  這篇很初級的入門指南,花了我一周的業餘時間來看文檔,跑demo,我覺得肯定有某些地方沒說清楚。

  比如partition是怎麼分的,map的輸出數據是怎麼存儲的,Shuffle和Sort的工作細節又是怎麼樣的,如何指定Second Sort。希望在下一篇文章里,我可以一一解答。

希望對你有幫助。

推薦閱讀:

分散式機器學習的故事:LDA和MapReduce
技術分享丨關於 Hadoop 的那些事兒
Hadoop的MapReduce階段為什麼要進行排序呢,這樣的排序對後續操作有什麼好處么?

TAG:大数据 | Hadoop | MapReduce |