開始學習寫一些MR編程實例,工作中即將使用(剛剛開始,如果有錯誤和建議,歡迎指出)
現在有一個文件,裡面記錄了全校所有學生各科成績,求每個學生的平均成績,格式如下
小明 語文 92
小明 數學 88
小明 英語 90
小強 語文 76
小強 數學 66
小強 英語 80
小木 語文 60
小木 數學 65
小木 英語 61
解決思路
Map階段先將數據拆成key:姓名,value:課程_成績的格式提供給reduce,默認的partitioner會將名字相同的學生發到同一個reduce上面
這樣reduce可以根據總分/科目數計算平均成績。
邏輯比較簡單,
代碼如下:
package com.test.mr2;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.Hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/*
* 計算學生課程平均成績(某學生總分/課程數)
* 輸入格式
*
* 小明 語文 92
* 小明 數學 88
* 小明 英語 90
* 小強 語文 76
* 小強 數學 66
* 小強 英語 80
* 小木 語文 60
* 小木 數學 65
* 小木 英語 61
*
* 輸出
*
* 小明 90
* 小強 74
* 小木 62
*/
public class Average {
public static class AverMapper extends Mapper<Object, Text, Text, Text> {
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer stringTokenizer = new StringTokenizer(line, "\n");
String name = "";
StringBuffer out = new StringBuffer(32);
while (stringTokenizer.hasMoreElements()) {
String tmp = stringTokenizer.nextToken();
StringTokenizer st = new StringTokenizer(tmp);
while (st.hasMoreElements()) {
name = st.nextToken();
out.append(st.nextToken());
out.append("_");
out.append(st.nextToken());
// 使用默認的hash partitioner將名字相同的同學發到一個reduce上
context.write(new Text(name), new Text(out.toString()));
}
}
}
}
public static class AverReducer extends
Reducer<Text, Text, Text, FloatWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Iterator<Text> it = values.iterator();
//計算每個key對應的記錄條數和總分數
int count = 0;
int sum = 0;
while (it.hasNext()) {
String value = it.next().toString();
String[] strs = value.split("\\_");
if (strs.length < 2) {
continue;
}
try {
sum += Integer.parseInt(strs[1]);
} catch (Exception e) {
System.err.println(e.getMessage());
}
count++;
}
FloatWritable average = new FloatWritable(sum / count);
context.write(key, average);
}
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
System.out.println("Begin.....");
Configuration conf =new Configuration();
String[] arguments=new GenericOptionsParser(conf, args).getRemainingArgs();
if(arguments.length<2){
System.out.println("Usage:com.test.mr2.Average in out");
System.exit(1);
}
Job job=new Job(conf,"Average");
job.setJarByClass(Average.class);
job.setMapperClass(AverMapper.class);
job.setReducerClass(AverReducer.class);
job.setMapOutputValueClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(job, new Path(arguments[0]));
FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
System.exit(job.waitForCompletion(true)?0:1);
System.out.println("End.....");
}
}