一起學Hadoop——實現MapReduce全局排序
來自專欄一起學大數據
Hadoop排序,從大的範圍來說有兩種排序,一種是按照key排序,一種是按照value排序。如果按照value排序,只需在map函數中將key和value對調,然後在reduce函數中在對調回去。從小範圍來說排序又分成部分排序,全局排序,輔助排序,二次排序等。本文介紹如何在Hadoop中實現全局排序。
全局排序,就是說在一個MapReduce程序產生的輸出文件中,所有的結果都是按照某個策略進行排序的,例如降序還是升序。MapReduce只能保證一個分區內的數據是key有序的,一個分區對應一個reduce,因此只有一個reduce就保證了數據全局有序,但是這樣又不能用到Hadoop集群的優勢。
對於多個reduce如何保證數據的全局排序呢?通常的做法是按照key值分區,通過MapReduce的默認分區函數HashPartition將不同範圍的key發送到不同的reduce處理,例如一個文件中有key值從1到10000的數據,我們使用兩個分區,將1到5000的key發送到partition1,然後由reduce1處理,5001到10000的key發動到partition2然後由reduce2處理,reduce1中的key是按照1到5000的升序排序,reduce2中的key是按照5001到10000的升序排序,這樣就保證了整個MapReduce程序的全局排序。但是這樣做有兩個缺點:
1、當數據量大時會出現OOM。
2、會出現數據傾斜。
Hadoop提供TotalOrderPartitioner類用於實現全局排序的功能,並且解決了OOM和數據傾斜的問題。
TotalOrderPartitioner類提供了數據採樣器,對key值進行部分採樣,然後按照採樣結果尋找key值的最佳分割點,將key值均勻的分配到不同的分區中。
TotalOrderPartitioner 類提供了三個採樣器,分別是:
- SplitSampler 分片採樣器,從數據分片中採樣數據,該採樣器不適合已經排好序的數據
- RandomSampler隨機採樣器,按照設置好的採樣率從一個數據集中採樣
- IntervalSampler間隔採樣機,以固定的間隔從分片中採樣數據,對於已經排好序的數據效果非常好。
三個採樣器都實現了K[] getSample(InputFormat<K,V> inf, Job job)方法,該方法返回的是K[]數組,數組中存放的是根據採樣結果返回的key值,即分隔點,MapRdeuce就是根據K[]數組的長度N生成N-1個分區partition數量,然後按照分割點的範圍將對應的數據發送到對應的分區中。
下面介紹使用TotalOrderPartitioner類實現全局排序的功能。代碼如下:
Map類:
public class TotalSortMap extends Mapper<Text, Text, Text, IntWritable> { @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { context.write(key, new IntWritable(Integer.parseInt(key.toString()))); }}
Reduce類:
public class TotalSortReduce extends Reducer<Text, IntWritable, IntWritable, NullWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { for (IntWritable value : values) context.write(value, NullWritable.get()); }}
入口類:
public class TotalSort extends Configured implements Tool{ //實現一個Kye比較器,用於比較兩個key的大小,將key由字元串轉化為Integer,然後進行比較。 public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(Text.class, true); } @Override public int compare(WritableComparable writableComparable1, WritableComparable writableComparable2) { int num1 = Integer.parseInt(writableComparable1.toString()); int num2 = Integer.parseInt(writableComparable2.toString()); return num1 - num2; } } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("mapreduce.totalorderpartitioner.naturalorder", "false"); Job job = Job.getInstance(conf, "Total Sort app"); job.setJarByClass(TotalSort.class); //設置讀取文件的路徑,都是從HDFS中讀取。讀取文件路徑從腳本文件中傳進來 FileInputFormat.addInputPath(job,new Path(args[0])); //設置mapreduce程序的輸出路徑,MapReduce的結果都是輸入到文件中 FileOutputFormat.setOutputPath(job,new Path(args[1])); job.setInputFormatClass(KeyValueTextInputFormat.class); //設置比較器,用於比較數據的大小,然後按順序排序,該例子主要用於比較兩個key的大小 job.setSortComparatorClass(KeyComparator.class); job.setNumReduceTasks(3);//設置reduce數量 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(NullWritable.class); //設置保存partitions文件的路徑 TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2])); //key值採樣,0.01是採樣率, InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 1000, 100); //將採樣數據寫入到分區文件中 InputSampler.writePartitionFile(job, sampler); job.setMapperClass(TotalSortMap.class); job.setReducerClass(TotalSortReduce.class); //設置分區類。 job.setPartitionerClass(TotalOrderPartitioner.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args)throws Exception{ int exitCode = ToolRunner.run(new TotalSort(), args); System.exit(exitCode); }}
生成測試數據的代碼如下:
#!/bin/bashdofor k in $(seq 1 10000)echo $RANDOM;done
將上面代碼保存成create_data.sh,然後執行
sh create_data.sh > test_data.txt
會生成一個test_data.txt的文本文件,文本中的內容是一行一個隨機數字
將test_data.txt上傳到HDFS中:
hadoop fs -put test_data.txt /data/
將上面的實現全局排序的代碼打成一個jar包,然後通過shell文件執行。
執行MapReduce代碼的腳本如下:
/usr/local/src/hadoop-2.6.1/bin/hadoop jar TotalSort.jar hdfs://hadoop-master:8020/data/test_data1.txt hdfs://hadoop-master:8020/total_sort_output hdfs://hadoop-master:8020/total_sort_partitions
看下運行結果,我們只需要看part-r-00000的尾10行和part-r-00001的頭10行數據,只要他們收尾相接就證明是全局有序的:
下面有幾個坑要注意,大家不要踩:
1、數據的輸入類型必須使用KeyValueTextInputFormat類而不是TextInputFormat類,因為hadoop採樣器是對key值採樣,而TextInputFormat的key是位置偏移量,value存放的是每行的輸入數據,對該key採樣沒有任何意義。KeyValueTextInputFormat的key存放的是輸入數據,對key採樣才能更好的劃分分區。用法:
job.setInputFormatClass(KeyValueTextInputFormat.class);
2、使用代碼conf.set("mapreduce.totalorderpartitioner.naturalorder", "false")設置分區的排序策略,否則是每個分區內有序,而不是全局有序。
3、採樣器只能是Text,Text類型:InputSampler.Sampler<Text, Text>,否則會報Exception in thread "main" java.io.IOException: wrong key class: org.apache.hadoop.io.Text is not class org.apache.hadoop.io.LongWritable這個錯誤。
4、job.setMapOutputKeyClass(Text.class)和job.setMapOutputValueClass(IntWritable.class)這兩行代碼必須在InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 1000, 100);這行代碼之前調用,否則會報Exception in thread "main" java.io.IOException: wrong key class: org.apache.hadoop.io.Text is not class org.apache.hadoop.io.LongWritable錯誤。
5、調用setSortComparatorClass方法設置排序類,對key進行排序。job.setSortComparatorClass(KeyComparator.class);類似例子中的KeyComparator類。否則是按照字典序進行排序。MapReduce默認輸出的key是字元類型時,默認是按照字典序排序。
推薦閱讀:
※Hadoop + Eclipse IDE
※聊聊分散式系統的數據一致性
※Spark 讀
※Win10中搭建Hadoop環境
※搭建Spark集群詳細教程(3)