hadoop實驗(MapReduce)——關於氣象數據集

基礎知識:

  • MapReduce簡介

MapReduce是一種用於數據處理的編程模型。該模型非常簡單。同一個程序Hadoop可以運行用各種語言編寫的MapReduce程序。最重要的是,MapReduce程序本質上是並行的,因此可以將大規模的數據分析交給任何一個擁有足夠多機器的運營商。MapReduce的優勢在於處理大型數據集。

  • 氣象數據集

  1. 分布在全球各地的氣象感測器每隔一小時便收集當地的氣象數據,從而積累了大量的日誌數據。它們是適合用MapReduce進行分析的最佳候選,因為它們是半結構化且面向記錄的數據。

  2. National Climatic Data Center(國家氣候數據中心,NCDC)提供的數據。數據是以面向行的ASCII格式存儲的,每一行便是一個記錄。該格式支持許多氣象元素,其中許多數據是可選的或長度可變的。為簡單起見,我們將重點討論基本元素(如氣溫),這些數據是始終都有且有固定寬度的。

  • 國家氣候數據中心數據記錄的格式

相關數據下載:

  • 實驗截圖對應數據(下載地址)

  • 部分剩餘數據(下載地址)

  • 或者使用命令下載:wget -r -c ftp://ftp.ncdc.noaa.gov/pub/data/noaa/2010/*.gz

1、使用Unix Tools分析數據

在全球氣溫數據中每年記錄的最低、最高氣溫是多少?我們先不用Hadoop來回答這一問題,因為答案中需要提供一個性能標準(baseline)和一種檢查結果的有效工具。

1.1.關於最低溫度

1)查看數據文件

相關命令:

  1. ls

注意:數據文件按照日期和氣象站進行組織。每一年都有一個目錄,每一個目錄都包含一個打包文件,文件中的每一個氣象站都帶有當年的數據。

2)在example文件夾下編寫程序腳本

對於面向行的數據,傳統的處理工具是awk。以下是一個小的程序腳本,用於計算每年的最低氣溫。

相關命令與內容:

  1. vim MinTemperature.sh

  2. ------------------------------內 容------------------------------
  3. #!/usr/bin/env bash
  4. for year in example/*
  5. do
  6. echo -ne $(basename $year)年最高溫度:
  7. gunzip -c ${year}/* |
  8. awk {temp=substr($0,88,5)+0;
  9. q=substr($0,93,1);
  10. if(temp !=9999 && q~/[01459]/ && temp<min)min=temp}
  11. END {print min/10"°C"}
  12. done
  13. ------------------------------內 容------------------------------

3)運行測試

相關命令:

  1. source MinTemperature.sh

1.2.關於最高溫度

1)查看數據文件

相關命令:

  1. ls

2)在example文件夾下編寫程序腳本

對於面向行的數據,傳統的處理工具是awk。以下是一個小的程序腳本,用於計算每年的最高氣溫。

相關命令與內容:

  1. vim MaxTemperature.sh

  2. ------------------------------內 容------------------------------
  3. #!/usr/bin/env bash
  4. for year in example/*
  5. do
  6. echo -ne $(basename $year)年最高溫度:
  7. gunzip -c ${year}/* |
  8. awk {temp=substr($0,88,5)+0;
  9. q=substr($0,93,1);
  10. if(temp !=9999 && q~/[01459]/ && temp>max)max=temp}
  11. END {print max/10"°C"}
  12. done
  13. ------------------------------內 容------------------------------

注意:該腳本循環遍歷壓縮文件,首先顯示年份,然後使用awk處理每個文件。awk腳本從數據中提取兩個欄位:氣溫和質量代碼。氣溫值通過加上一個0變成一個整數。接下來,執行測試,從而判斷氣溫值是否有效(值9999代表在NCDC數據集缺少值),質量代碼顯示的讀數是有疑問還是根本就是錯誤的。如果讀數是正確的,那麼該值將與目前看到的最大值進行比較,如果該值比原先的最大值大,就替換掉目前的最大值。當文件中所有的行都已處理完並列印出最大值後,END塊中的代碼才會被執行。

3)運行測試

相關命令:

  1. source MaxTemperature.sh

4)檢驗測試

創建一個新路徑example_1/2008,在裡面只放置一個數據包即010330-99999-2008.gz,更改腳本變數路徑,得到結果與win下的word查詢的結果比較。

2、使用Hadoop分析數據

2.1.關於最低溫度

1)下載相關jar包

hadoop-core-1.2.1.jar(下載地址)

2)創建目錄

相關命令:

  1. mkdir min

3)在min文件夾內編寫java程序

MinTemperatureMapper.java相關內容:

  1. import java.io.IOException;

  2. import org.apache.hadoop.io.IntWritable;

  3. import org.apache.hadoop.io.LongWritable;

  4. import org.apache.hadoop.io.Text;

  5. import org.apache.hadoop.mapreduce.Mapper;
  6. public class MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
  7. private static final int MISSING = 9999;
  8. @Override

  9. public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  10. String line = value.toString();

  11. String year = line.substring(15, 19);
  12. int airTemperature;

  13. if(line.charAt(87) == +) {

  14. airTemperature = Integer.parseInt(line.substring(88, 92));

  15. } else {

  16. airTemperature = Integer.parseInt(line.substring(87, 92));

  17. }
  18. String quality = line.substring(92, 93);

  19. if(airTemperature != MISSING && quality.matches("[01459]")) {

  20. context.write(new Text(year), new IntWritable(airTemperature));

  21. }

  22. }

  23. }

MinTemperatureReducer.java相關內容:

  1. import java.io.IOException;

  2. import org.apache.hadoop.io.IntWritable;

  3. import org.apache.hadoop.io.Text;

  4. import org.apache.hadoop.mapreduce.Reducer;
  5. public class MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  6. @Override

  7. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  8. int minValue = Integer.MAX_VALUE;

  9. for(IntWritable value : values) {

  10. minValue = Math.min(minValue, value.get());

  11. }

  12. context.write(key, new IntWritable(minValue));

  13. }

  14. }

MinTemperature.java相關內容:

  1. import org.apache.hadoop.fs.Path;

  2. import org.apache.hadoop.io.IntWritable;

  3. import org.apache.hadoop.io.Text;

  4. import org.apache.hadoop.mapreduce.Job;

  5. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

  6. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  7. public class MinTemperature {
  8. public static void main(String[] args) throws Exception {

  9. if(args.length != 2) {

  10. System.err.println("Usage: MinTemperature<input path> <output path>");

  11. System.exit(-1);

  12. }
  13. Job job = new Job();

  14. job.setJarByClass(MinTemperature.class);

  15. job.setJobName("Min temperature");

  16. FileInputFormat.addInputPath(job, new Path(args[0]));

  17. FileOutputFormat.setOutputPath(job, new Path(args[1]));

  18. job.setMapperClass(MinTemperatureMapper.class);

  19. job.setReducerClass(MinTemperatureReducer.class);

  20. job.setOutputKeyClass(Text.class);

  21. job.setOutputValueClass(IntWritable.class);

  22. System.exit(job.waitForCompletion(true) ? 0 : 1);

  23. }

  24. }

4)編譯代碼

相關命令:

  1. javac -classpath ../hadoop-core-1.2.1.jar *.java

5)打包編譯文件

相關命令:

  1. jar cvf ./MinTemperature.jar ./*.class

  2. ls

  3. mv *.jar ..

  4. rm *.class

6)創建HDFS接收源

相關命令:

  1. hadoop fs -mkdir /input

7)解壓氣象數據並上傳到HDFS中(本次選取2010年的部分數據)

相關命令:

  1. cp 010010-99999-2010.gz ~

  2. cp 010014-99999-2010.gz ~

  3. zcat *.gz > sample.txt

  4. hadoop fs -copyFromLocal sample.txt /input

8)運行程序

相關命令:

  1. hadoop jar MinTemperature.jar MinTemperature /input/sample.txt /output

注意:/output路徑是HDFS的輸出源,不用自行創建。

9)查看結果

10)通過頁面查看結果

2.2.關於最高溫度

1)創建目錄

相關命令:

  1. mkdir max

2)在max文件夾內編寫java程序

MinTemperatureMapper.java相關內容:

  1. import java.io.IOException;

  2. import org.apache.hadoop.io.IntWritable;

  3. import org.apache.hadoop.io.LongWritable;

  4. import org.apache.hadoop.io.Text;

  5. import org.apache.hadoop.mapreduce.Mapper;
  6. public class MaxTemperatureMapper extendsMapper<LongWritable, Text, Text, IntWritable> {

  7. private static final int MISSING = 9999;

  8. @Override

  9. public void map(LongWritable key, Text value, Context context)

  10. throws IOException, InterruptedException {

  11. String line = value.toString();

  12. String year = line.substring(15, 19);

  13. int airTemperature;

  14. if (line.charAt(87) == +) { // parseInt doesnt like leading plussigns

  15. airTemperature = Integer.parseInt(line.substring(88, 92));

  16. }else {

  17. airTemperature = Integer.parseInt(line.substring(87, 92));

  18. }

  19. String quality = line.substring(92, 93);

  20. if (airTemperature != MISSING &&quality.matches("[01459]")) {

  21. context.write(new Text(year), new IntWritable(airTemperature));

  22. }

  23. }

  24. }

MaxTemperatureReducer.java相關內容:

  1. import java.io.IOException;

  2. import org.apache.hadoop.io.IntWritable;

  3. import org.apache.hadoop.io.Text;

  4. import org.apache.hadoop.mapreduce.Reducer;

  5. public class MaxTemperatureReducer extendsReducer<Text, IntWritable, Text, IntWritable> {

  6. @Override

  7. public void reduce(Text key, Iterable<IntWritable> values,Contextcontext)

  8. throws IOException, InterruptedException {

  9. int maxValue = Integer.MIN_VALUE;

  10. for (IntWritable value : values) {

  11. maxValue = Math.max(maxValue, value.get());

  12. }

  13. context.write(key, new IntWritable(maxValue));

  14. }

  15. }

MaxTemperature.java相關內容:

  1. import org.apache.hadoop.fs.Path;

  2. import org.apache.hadoop.io.IntWritable;

  3. import org.apache.hadoop.io.Text;

  4. import org.apache.hadoop.mapreduce.Job;

  5. importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;

  6. importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  7. public class MaxTemperature {

  8. public static void main(String[] args) throws Exception {

  9. if (args.length != 2) {

  10. System.err.println("Usage: MaxTemperature <input path><output path>");

  11. System.exit(-1);

  12. }

  13. Job job = new Job();

  14. job.setJarByClass(MaxTemperature.class);

  15. job.setJobName("Max temperature");

  16. FileInputFormat.addInputPath(job, new Path(args[0]));

  17. FileOutputFormat.setOutputPath(job, new Path(args[1]));

  18. job.setMapperClass(MaxTemperatureMapper.class);

  19. job.setReducerClass(MaxTemperatureReducer.class);

  20. job.setOutputKeyClass(Text.class);

  21. job.setOutputValueClass(IntWritable.class);

  22. System.exit(job.waitForCompletion(true) ? 0 : 1);

  23. }

  24. }

3)編譯代碼

相關命令:

  1. javac -classpath ../hadoop-core-1.2.1.jar *.java

4)打包編譯文件

相關命令:

  1. jar cvf ./MaxTemperature.jar ./*.class

  2. ls

  3. mv *.jar ..

  4. rm *.class

5)運行程序

相關命令:

  1. hadoop jar MaxTemperature.jar MaxTemperature /input/sample.txt /output

6)查看結果

7)通過頁面查看結果


推薦閱讀:

如何連續執行兩段MapReduce?
ubuntu下安裝和配置hadoop+spark集群記錄
大數據學習筆記:Hadoop之HDFS(上)
世界沉醉在數據里

TAG:大數據 | Hadoop | MapReduce |