編程學習,最好的方法還是自己動手,所以這裡簡單介紹在Hadoop上編寫調試一個MapReduce程序。
先說一下我的開發環境,我的操作系統是CentOS 6.0,Hadoop版本是0.20.2,開發環境是eclipse。在Hadoop的0.20.0版本以後,都包含一個新的Java MapReduce API,這個API有時也稱為上下文對象,新的API在類型上不兼容以前的API。關於新舊API的區別,這裡先不做介紹了,只是在編程的時候一定要注意下,網上好多MapReduce的程序是基於舊的API編寫的,如果自己安裝的是新版的Hadoop,就會在調試時出現錯誤,初學者不明白這一點或許會走彎路。
1問題描述:查找最高氣溫。就是從氣候日志數據中查找當年的最高氣溫,假設每一條記錄的格式如下:“China2013inbeijing023isok0",其中2013是年份,023是溫度記錄,ok表示數據是完好的(為了簡單易懂,省略其他的信息),我們的任務是從大量的數據記錄中找出北京2013年的最高氣溫。這樣的數據很適合用MapReduce來處理。
2問題分析,這個問題很簡單,這裡用這麼簡單的數據只是為了說明在Hadoop上編寫調試MapReduce程序的過程。對於一條數據這需要提取出來年份和溫度,然後找出最大溫度就行了。這了類似於分治法,每一個Map過程就是分得過程,Reduce就是合的過程。
3 編碼:
3.1Map函數:
//載入一些必要的包
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper;
//新版APIMap過程需要繼承org.apache.hadoop.mapreduce包Mapper<SPAN ></SPAN>類,並重寫其map方法
public class MaxtemMapper extends
Mapper<LongWritable,Text,Text,IntWritable>{
private static final int MISSING=9999;
public void map(LongWritable key,Text value,Context context)
throws IOException,InterruptedException{
String line=value.toString();//把Text類的對象轉化為String類來處理
String year=line.substring(5,9);//該String的第5-9就是年份
int airtemperature;
//有的數據中溫度前面帶”+“,需要處理下,帶”+“的話第19到21為溫度,不帶的的話第18到21
//為溫度
if(line.charAt(18)=='+'){
airtemperature=Integer.parseInt(line.substring(19, 21));
}
else {
airtemperature=Integer.parseInt(line.substring(18,21));
}
System.out.println("year:"+year+"tem:"+airtemperature);
判斷數據是否是正確的
String quality=line.substring(23, 25);
if(airtemperature!=MISSING && quality.matches("ok")){
context.write(new Text(year), new IntWritable(airtemperature));
}
}
}
3.2Reduce函數:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxtemReducer extends
Reducer<Text,IntWritable,Text,IntWritable>{
public void reduce(Text key,Iterator<IntWritable> values,
Context context)
throws IOException,InterruptedException{
int maxValue=Integer.MIN_VALUE;
while(values.hasNext()){
maxValue=Math.max(maxValue,values.next().get());
}
context.write( key, new IntWritable(maxValue));
}
}
3.3執行MapReduce過程
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxTemperature{
//the main function
public static void main(String [] args)throws Exception{
Configuration conf=new Configuration();
if(args.length!=2){
System.out.println("Usage: Maxtemperature
<input path> <output path>");
System.exit(-1);
}
Job job=new Job(conf,"MaxTemperature");
job.setJarByClass(MaxTemperature.class);
FileInputFormat.addInputPath(job, new
Path(args[0]));
FileOutputFormat.setOutputPath(job, new
Path(args[1]));
job.setMapperClass(MaxtemMapper.class);
job.setReducerClass(MaxtemReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
//output the details of the work
System.out.println("name:"+job.getJobName());
System.out.println("isSuccessful?:"+
(job.isSuccessful()?"yes":"no"));
}
}