MapReduce is the programming model of Hadoop, the big-data processing ecosystem. Calling it a model implies a fixed form: MapReduce is the fixed programming pattern that the Hadoop ecosystem uses for data analysis and processing. That fixed pattern can be described as follows:
A MapReduce job is divided into two phases: the map phase and the reduce phase. Each phase takes key/value pairs as its input and output, and the programmer chooses their types.
In other words, the programmer only needs to define two functions, a map function and a reduce function; the rest of the computation is left to Hadoop.
From this description we can see that the scenarios MapReduce handles are actually quite concrete and quite limited: essentially, "statistical analysis of data".
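In the standard formulation, the two functions have the following general shapes, where the key and value types K1/V1, K2/V2 and K3/V3 are chosen by the programmer:

    map:    (K1, V1)       -> list(K2, V2)
    reduce: (K2, list(V2)) -> list(K3, V3)

Between the two phases the framework sorts and groups the map output by key, so reduce is called once per key with all of that key's values.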
The official address for the weather data: ftp://ftp.ncdc.noaa.gov/pub/data/gsod/
However, the file format at that official address does not match the format used in Hadoop: The Definitive Guide ( http://www.linuxidc.com/Linux/2012-07/65972.htm ). Perhaps the format on the official site has changed over time, perhaps the book's author preprocessed the raw data, or perhaps the address is simply wrong. So I instead downloaded the data from the address given by Hadoop: The Definitive Guide:
https://github.com/tomwhite/hadoop-book/tree/master/input/ncdc/all
For a quick test, you can also just paste the following lines into a text file; these are valid weather records:
0035029070999991902010113004+64333+023450FM-12+000599999V0201401N011819999999N0000001N9-01001+99999100311ADDGF104991999999999999999999MW1381
0035029070999991902010120004+64333+023450FM-12+000599999V0201401N013919999999N0000001N9-01171+99999100121ADDGF108991999999999999999999MW1381
0035029070999991902010206004+64333+023450FM-12+000599999V0200901N009819999999N0000001N9-01611+99999100121ADDGF108991999999999999999999MW1381
0029029070999991902010213004+64333+023450FM-12+000599999V0200901N011819999999N0000001N9-01721+99999100121ADDGF108991999999999999999999
0029029070999991902010220004+64333+023450FM-12+000599999V0200901N009819999999N0000001N9-01781+99999100421ADDGF108991999999999999999999
In this article, the text file holding these weather records is named temperature.txt.
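These are fixed-width NCDC records, and the example code below depends on the following fields: characters 16-19 (substring(15, 19)) hold the four-digit year; character 88 (charAt(87)) holds the temperature sign; characters 89-92 hold the air temperature in tenths of a degree Celsius, with 9999 meaning a missing reading; and character 93 (substring(92, 93)) holds a quality code, of which 0, 1, 4, 5 and 9 indicate an acceptable reading.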
There are two sets of Java APIs. The old one is the org.apache.hadoop.mapred package, where MapReduce programs are written by implementing interfaces; the new one is the org.apache.hadoop.mapreduce package, where programs are written by extending abstract base classes. They are otherwise much alike, and both are shown below.
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.0.4</version>
</dependency>
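Note that this single hadoop-core artifact contains both APIs, the old org.apache.hadoop.mapred package as well as the new org.apache.hadoop.mapreduce package, so both of the examples below compile against this one dependency.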
You do not have to use the official jar: you can use one that others have modified and recompiled, which lets you run a MapReduce program directly inside Eclipse like an ordinary Java program. This recompiled hadoop-core-1.0.4.jar can simulate MapReduce locally.
If the Eclipse workspace is on drive d:, we can use a directory on d: as the input directory, say d:\input, and d:\output as the output directory. In the MapReduce program it is then enough to write:
FileInputFormat.setInputPaths(job, new Path("/input"));
FileOutputFormat.setOutputPath(job, new Path("/output"));
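One caveat: Hadoop refuses to start a job whose output directory already exists, so d:\output must be deleted (or simply not created) before each run.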
Download:
Free download at http://linux.linuxidc.com/ (username and password are both www.linuxidc.com), in the directory /2014年資料/4月/16日/MapReduce編程實戰; see http://www.linuxidc.com/Linux/2013-07/87684.htm for download instructions.
Alternatively, by FTP: ftp://ftp1.linuxidc.com (username ftp1.linuxidc.com, password www.linuxidc.com), in 2014年LinuxIDC.com\4月\MapReduce編程實戰; see http://www.linuxidc.com/Linux/2013-10/91140.htm for download instructions.
After downloading, simply overwrite the file at its location in your local Maven repository.
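For hadoop-core 1.0.4 the file to overwrite typically lives at ~/.m2/repository/org/apache/hadoop/hadoop-core/1.0.4/hadoop-core-1.0.4.jar (adjust the path if you use a custom local repository location).
The interface approach (old API):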
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperature {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(MaxTemperature.class);
        conf.setJobName("Max Temperature");
        // The paths could also be taken from the command line:
        // FileInputFormat.setInputPaths(conf, new Path(args[0]));
        // FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        FileInputFormat.setInputPaths(conf, new Path("/hadooptemp/input/2"));
        FileOutputFormat.setOutputPath(conf, new Path("/hadooptemp/output"));
        conf.setMapperClass(MaxTemperatureMapper.class);
        conf.setReducerClass(MaxTemperatureReduce.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        JobClient.runJob(conf);
    }
}

class MaxTemperatureMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        // The year occupies characters 16-19 of each fixed-width record
        String year = line.substring(15, 19);
        int airTemperature;
        // The signed temperature, in tenths of a degree Celsius, starts at character 88
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        // Emit (year, temperature) only for non-missing, quality-checked readings
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            output.collect(new Text(year), new IntWritable(airTemperature));
        }
    }
}

class MaxTemperatureReduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int maxValue = Integer.MIN_VALUE;
        // Keep the largest temperature seen for this year
        while (values.hasNext()) {
            maxValue = Math.max(maxValue, values.next().get());
        }
        output.collect(key, new IntWritable(maxValue));
    }
}
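Running this job over the five sample records above should output a single line pairing the year with its maximum reading, along the lines of:

    1902    -100

All five sample readings are from 1902, and the largest, -100, is in tenths of a degree Celsius, i.e. -10.0°C.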
The abstract-class approach (new API):
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class NewMaxTemperature {
    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(NewMaxTemperature.class);
        // The paths could also be taken from the command line:
        // FileInputFormat.setInputPaths(job, new Path(args[0]));
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileInputFormat.setInputPaths(job, new Path("/hadooptemp/input/2"));
        FileOutputFormat.setOutputPath(job, new Path("/hadooptemp/output"));
        job.setMapperClass(NewMaxTemperatureMapper.class);
        job.setReducerClass(NewMaxTemperatureReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

class NewMaxTemperatureMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // The year occupies characters 16-19 of each fixed-width record
        String year = line.substring(15, 19);
        int airTemperature;
        // The signed temperature, in tenths of a degree Celsius, starts at character 88
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        // Emit (year, temperature) only for non-missing, quality-checked readings
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

class NewMaxTemperatureReduce extends
        Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        // In the new API the values arrive as an Iterable, not an Iterator;
        // a reduce method declared with Iterator would not override the base
        // method and the reducer would silently act as the identity
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}
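One version note: new Job() compiles against hadoop-core 1.0.4 but is deprecated in later Hadoop releases, where the Job.getInstance(Configuration) factory method is preferred instead. Run over temperature.txt, this version should produce the same output as the old-API version above.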
More details continue on the next page: http://www.linuxidc.com/Linux/2014-04/100241p2.htm