標籤:

MapReduce Lab03 筆記

昨天才知道Hadoop這門課的考試有當堂的部分, 加上今天正好把這次的lab3寫完了, 寫一點心得。 一是防止自己忘記了,二是騙一波贊(估計根本沒人看的吧!),三是感覺自己智商比較笨,網上很多講這方面的東西我不太看得懂,我也不知道有沒有人和我一樣笨,(估計也沒多少?)能幫助一個是一個!就寫了這個筆記。

這次lab的壓縮包:sites.google.com/site/r

看了網址估計要翻牆, 主要是InputFormat, counter和Reduce端的join操作。

首先是InputFormat

這次lab的主要普及方向是如何讓一個文件被完整得讀到了一個mapper里。(其實就是統計每個文件里某個特定字元串出現次數)

主要是自定義一個InputFormat 然後extendsFileInputFormat<NullWritalbe, BytesWritable> 這裡我不知道為什麼要BytesWritable 但是書上和教授(其實教授的例子全是書上的,做的ppt還特垃圾,然後講的還沒書上好)都用了這個。因為lab需要 我就改成Text 和 Intwritable了。 然後重構2個方法 一個是 isSplitable 直接return false 防止分割。 另外就是自定義一個RecordReader。 當時我還挺蒙蔽的,為什麼要這個呢? 我防止分割不就好了? 實際上是因為我們要決定什麼作為Map的key 什麼作為Map的value. 然後我們會在InputFormat里做好單詞的統計,key是文件名, value是特定單詞在這個文件里出現的次數, 下面貼代碼:

package edu.rosehulman.wuy8.CustomInputFormat;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileSplit;// WholeFileRecordReaderclass WholeFileRecordReader extends RecordReader<Text, IntWritable> { private FileSplit fileSplit; private Configuration conf; private IntWritable value; private Text str = new Text(); private boolean processed = false; private Text key; private String keyword; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.fileSplit = (FileSplit) split; key = new Text(this.fileSplit.getPath().toString()); this.conf = context.getConfiguration(); keyword = this.conf.get("CAO"); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!processed) { byte[] contents = new byte[(int) fileSplit.getLength()]; Path file = fileSplit.getPath(); FileSystem fs = file.getFileSystem(conf); FSDataInputStream in = null; try { in = fs.open(file); IOUtils.readFully(in, contents, 0, contents.length); str.set(contents, 0, contents.length); int num=0; String[] words = str.toString().split("[^a-zA-Z0-9]+"); for(String i:words){ if (i.compareTo(keyword)== 0) num++; } value = new IntWritable(num); } finally { IOUtils.closeStream(in); } processed = true; return true; } return false; } @Override public Text getCurrentKey() throws IOException, InterruptedException { return key; } @Override public IntWritable getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException { return processed ? 1.0f : 0.0f; } @Override public void close() throws IOException { // do nothing }}

這段大部分都是和動物書差不多,主要改動在初始化和nextKeyValue()里

首先在初始化的地方,我們獲取文件完整路徑存放在key里當作map的key, 獲取關鍵字(當時弄了半天就把關鍵字獲取的key弄成了"CAO".....)

然後在nextKeyValue里就讀取整個文件並且統計多少個關鍵詞, 存在value里.

然後是Counter

這部分挺簡單的,在之前的基礎上,直接在main里添加emu就行了。

然後就是加個一,最後獲取列印, 代碼:

public class Main extends Configured implements Tool{ public static String keyword; enum WordCount{ EqualToTwo, LessThanTwo, GreaterThanTwo } public int run(String[] args) throws Exception { Configuration conf = getConf(); conf.set("CAO", args[2]); Job job = Job.getInstance(conf, "Custom Input Format"); job.setJarByClass(Main.class); CustomInput.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setInputFormatClass(CustomInput.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(CustomMapper.class); boolean exitcode = job.waitForCompletion(true); System.out.println(keyword); Counters counter = job.getCounters(); long LessThan = counter.findCounter(WordCount.LessThanTwo).getValue(); System.out.println("Number of Less than Two : "+LessThan); long EqualTo = counter.findCounter(WordCount.EqualToTwo).getValue(); System.out.println("Number of Equal to Two : "+EqualTo); long GreaterThan = counter.findCounter(WordCount.GreaterThanTwo).getValue(); System.out.println("Number of Greater than Two : "+GreaterThan); long output_bytes = counter.findCounter(TaskCounter.MAP_OUTPUT_BYTES).getValue(); System.out.println("Number of bytes of uncompressed output : "+output_bytes); return exitcode ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new Main(), args); System.exit(exitCode); }}

然後Mapper部分是修改counter的 很簡單,代碼如下:

protected void map(Text key, IntWritable value, Mapper<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { // TODO Auto-generated method int num =Integer.parseInt(value.toString()); if (num > 2){ context.getCounter(WordCount.GreaterThanTwo).increment(1); }else if (num == 2){ context.getCounter(WordCount.EqualToTwo).increment(1); }else context.getCounter(WordCount.LessThanTwo).increment(1); context.write(key, value); }

因為我們在InputFormat里已經處理過關鍵詞的出現次數了,所以這裡很簡便.

最後是Join,(Secondary Sort)

這部分我弄了挺久才明白,網上的教程其實對於大部分人來說已經挺明白了,但是我蠢啊!我很奇怪,我partition分了以後,用了compareTo或者Sort排序之後,還要再Group分組然後送到不同的reducer里, 既然最後有Group分組了,為什麼我中間還要排序呢!?

後來才想到是因為順序的原因= - =

我好蠢TAT

代碼切到linux再貼好了。

有一點注意的就是,往往我們合併,是一個大表和一個小表,或者是 用一個更多信息的表,去合併一個類似於log的表, 比如我有個log表記錄了 課堂名稱 和 上的學生, 還有一個信息的表是課堂名稱和老師. 顯而易見的是, 課堂名稱和上的學生表裡,會有很多課堂名稱重複,而我們想合併,只是在第一個表的里的每行,加一個課堂名稱所對因的老師. 這個時候,我們可以在MutipleInput里先加入小表,再加入大表,使得對於每個reduce task 第一個value是小表的信息.方便合併:

Main的代碼:

public class Driver extends Configured implements Tool { public static class KeyPartitioner extends Partitioner<IntPair, Text> { @Override public int getPartition(IntPair key, Text value, int numPartitions) { return (key.getValue().hashCode() & Integer.MAX_VALUE) % numPartitions; } } public int run(String[] args) throws Exception { Job job = Job.getInstance(getConf(), "Custom REduce Join"); job.setJarByClass(getClass()); Path sprintPath = new Path(args[0]); Path workPath = new Path(args[1]); Path outputPath = new Path(args[2]); MultipleInputs.addInputPath(job, sprintPath, TextInputFormat.class, JoinSprintMapper.class); MultipleInputs.addInputPath(job, workPath, TextInputFormat.class, JoinWorkMapper.class); FileOutputFormat.setOutputPath(job, outputPath); job.setPartitionerClass(KeyPartitioner.class); job.setGroupingComparatorClass(FirstComparator.class); job.setReducerClass(JoinReducer.class); job.setMapOutputKeyClass(IntPair.class); job.setOutputKeyClass(IntWritable.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new Driver(), args); System.exit(exitCode); }}

2個mapper的代碼非常簡單,就是剔出關鍵字什麼的. 新建類IntPair也沒什麼可說的. 網上有很多類似的帖子.我那麼菜,就不多少了.

接下來就是Reducer的部分:

public class JoinReducer extends Reducer<IntPair, Text, IntWritable, Text> { @Override protected void reduce(IntPair arg0, Iterable<Text> arg1, Reducer<IntPair, Text, IntWritable, Text>.Context arg2) throws IOException, InterruptedException { Iterator<Text> iter = arg1.iterator(); String sprint = iter.next().toString(); while (iter.hasNext()){ arg2.write(arg0.getValue(), new Text(sprint+" "+iter.next())); } }}

這裡我就先把第一個信息單獨取出來, 因為那個是小表的信息.

---------------愚蠢的分割線-------------------------------------

然後這裡記一個超級煞筆的bug.. 在自定義Writable的時候 要有一個空的constructor,否則會莫名奇妙的掛掉TAT


推薦閱讀:

MapReduce初窺 · 一
分散式機器學習的故事:LDA和MapReduce
技術分享丨關於 Hadoop 的那些事兒
Hadoop的MapReduce階段為什麼要進行排序呢,這樣的排序對後續操作有什麼好處么?

TAG:MapReduce | Hadoop |