一起學Hadoop——實現MapReduce全局排序

一起學Hadoop——實現MapReduce全局排序

來自專欄一起學大數據

Hadoop排序,從大的範圍來說有兩種排序,一種是按照key排序,一種是按照value排序。如果按照value排序,只需在map函數中將key和value對調,然後在reduce函數中在對調回去。從小範圍來說排序又分成部分排序,全局排序,輔助排序,二次排序等。本文介紹如何在Hadoop中實現全局排序。

全局排序,就是說在一個MapReduce程序產生的輸出文件中,所有的結果都是按照某個策略進行排序的,例如降序還是升序。MapReduce只能保證一個分區內的數據是key有序的,一個分區對應一個reduce,因此只有一個reduce就保證了數據全局有序,但是這樣又不能用到Hadoop集群的優勢。

對於多個reduce如何保證數據的全局排序呢?通常的做法是按照key值分區,通過MapReduce的默認分區函數HashPartition將不同範圍的key發送到不同的reduce處理,例如一個文件中有key值從1到10000的數據,我們使用兩個分區,將1到5000的key發送到partition1,然後由reduce1處理,5001到10000的key發動到partition2然後由reduce2處理,reduce1中的key是按照1到5000的升序排序,reduce2中的key是按照5001到10000的升序排序,這樣就保證了整個MapReduce程序的全局排序。但是這樣做有兩個缺點:

1、當數據量大時會出現OOM。

2、會出現數據傾斜。

Hadoop提供TotalOrderPartitioner類用於實現全局排序的功能,並且解決了OOM和數據傾斜的問題。

TotalOrderPartitioner類提供了數據採樣器,對key值進行部分採樣,然後按照採樣結果尋找key值的最佳分割點,將key值均勻的分配到不同的分區中。

TotalOrderPartitioner 類提供了三個採樣器,分別是:

  • SplitSampler 分片採樣器,從數據分片中採樣數據,該採樣器不適合已經排好序的數據
  • RandomSampler隨機採樣器,按照設置好的採樣率從一個數據集中採樣
  • IntervalSampler間隔採樣機,以固定的間隔從分片中採樣數據,對於已經排好序的數據效果非常好。

三個採樣器都實現了K[] getSample(InputFormat<K,V> inf, Job job)方法,該方法返回的是K[]數組,數組中存放的是根據採樣結果返回的key值,即分隔點,MapRdeuce就是根據K[]數組的長度N生成N-1個分區partition數量,然後按照分割點的範圍將對應的數據發送到對應的分區中。

下面介紹使用TotalOrderPartitioner類實現全局排序的功能。代碼如下:

Map類:

public class TotalSortMap extends Mapper<Text, Text, Text, IntWritable> { @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { context.write(key, new IntWritable(Integer.parseInt(key.toString()))); }}

Reduce類:

public class TotalSortReduce extends Reducer<Text, IntWritable, IntWritable, NullWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { for (IntWritable value : values) context.write(value, NullWritable.get()); }}

入口類:

public class TotalSort extends Configured implements Tool{ //實現一個Kye比較器,用於比較兩個key的大小,將key由字元串轉化為Integer,然後進行比較。 public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(Text.class, true); } @Override public int compare(WritableComparable writableComparable1, WritableComparable writableComparable2) { int num1 = Integer.parseInt(writableComparable1.toString()); int num2 = Integer.parseInt(writableComparable2.toString()); return num1 - num2; } } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("mapreduce.totalorderpartitioner.naturalorder", "false"); Job job = Job.getInstance(conf, "Total Sort app"); job.setJarByClass(TotalSort.class); //設置讀取文件的路徑,都是從HDFS中讀取。讀取文件路徑從腳本文件中傳進來 FileInputFormat.addInputPath(job,new Path(args[0])); //設置mapreduce程序的輸出路徑,MapReduce的結果都是輸入到文件中 FileOutputFormat.setOutputPath(job,new Path(args[1])); job.setInputFormatClass(KeyValueTextInputFormat.class); //設置比較器,用於比較數據的大小,然後按順序排序,該例子主要用於比較兩個key的大小 job.setSortComparatorClass(KeyComparator.class); job.setNumReduceTasks(3);//設置reduce數量 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(NullWritable.class); //設置保存partitions文件的路徑 TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2])); //key值採樣,0.01是採樣率, InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 1000, 100); //將採樣數據寫入到分區文件中 InputSampler.writePartitionFile(job, sampler); job.setMapperClass(TotalSortMap.class); job.setReducerClass(TotalSortReduce.class); //設置分區類。 job.setPartitionerClass(TotalOrderPartitioner.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args)throws Exception{ int exitCode = ToolRunner.run(new TotalSort(), args); System.exit(exitCode); }}

生成測試數據的代碼如下:

#!/bin/bashdofor k in $(seq 1 10000)echo $RANDOM;done

將上面代碼保存成create_data.sh,然後執行

sh create_data.sh > test_data.txt

會生成一個test_data.txt的文本文件,文本中的內容是一行一個隨機數字

將test_data.txt上傳到HDFS中:

hadoop fs -put test_data.txt /data/

將上面的實現全局排序的代碼打成一個jar包,然後通過shell文件執行。

執行MapReduce代碼的腳本如下:

/usr/local/src/hadoop-2.6.1/bin/hadoop jar TotalSort.jar hdfs://hadoop-master:8020/data/test_data1.txt hdfs://hadoop-master:8020/total_sort_output hdfs://hadoop-master:8020/total_sort_partitions

看下運行結果,我們只需要看part-r-00000的尾10行和part-r-00001的頭10行數據,只要他們收尾相接就證明是全局有序的:

下面有幾個坑要注意,大家不要踩:

1、數據的輸入類型必須使用KeyValueTextInputFormat類而不是TextInputFormat類,因為hadoop採樣器是對key值採樣,而TextInputFormat的key是位置偏移量,value存放的是每行的輸入數據,對該key採樣沒有任何意義。KeyValueTextInputFormat的key存放的是輸入數據,對key採樣才能更好的劃分分區。用法:

job.setInputFormatClass(KeyValueTextInputFormat.class);

2、使用代碼conf.set("mapreduce.totalorderpartitioner.naturalorder", "false")設置分區的排序策略,否則是每個分區內有序,而不是全局有序。

3、採樣器只能是Text,Text類型:InputSampler.Sampler<Text, Text>,否則會報Exception in thread "main" java.io.IOException: wrong key class: org.apache.hadoop.io.Text is not class org.apache.hadoop.io.LongWritable這個錯誤。

4、job.setMapOutputKeyClass(Text.class)和job.setMapOutputValueClass(IntWritable.class)這兩行代碼必須在InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 1000, 100);這行代碼之前調用,否則會報Exception in thread "main" java.io.IOException: wrong key class: org.apache.hadoop.io.Text is not class org.apache.hadoop.io.LongWritable錯誤。

5、調用setSortComparatorClass方法設置排序類,對key進行排序。job.setSortComparatorClass(KeyComparator.class);類似例子中的KeyComparator類。否則是按照字典序進行排序。MapReduce默認輸出的key是字元類型時,默認是按照字典序排序。


推薦閱讀:

Hadoop + Eclipse IDE
聊聊分散式系統的數據一致性
Spark 讀
Win10中搭建Hadoop環境
搭建Spark集群詳細教程(3)

TAG:Hadoop | MapReduce | 分散式計算 |