最近開始使用MapReduce,發現網上大部分例子都是對文本數據進行處理的,也就是說在讀取輸入數據時直接使用默認的TextInputFormat進行處理即可。對於文本數據處理,這個類還是能滿足一部分應用場景。但是如果要處理以二進制形式結構化記錄存儲的文件時,這些類就不再適合了。
本文以一個簡單的應用場景為例:對按照二進制格式存儲的整數做頻數統計。當然,也可以在此基礎上實現排序之類的其他應用。實現該應用的主要難點就是如何處理輸入數據。參考《權威指南·第三版》得知需要繼承FileInputFormat這個類,並實現以下三個方法:
class MyInputFormat extends FileInputFormat<Type1, Type2> {
/*
* 查詢判斷當前文件是否可以分塊?"true"為可以分塊,"false"表示不進行分塊
*/
protected boolean isSplitable(Configuration conf, Path path) {
}
/*
* MapReduce的客戶端調用此方法得到所有的分塊,然後將分塊發送給MapReduce服務端。
* 注意,分塊中不包含實際的信息,而只是對實際信息的分塊信息。具體的說,每個分塊中
* 包含當前分塊對應的文件路徑,當前分塊在該文件中起始位置,當前分塊的長度以及對應的
* 實際數據所在的機器列表。在實現這個函數時,將這些信息填上即可。
* */
public List<InputSplit> getSplits(Configuration conf) throws IOException {
}
/*
* 類RecordReader是用來創建傳給map函數的Key-Value序列,傳給此類的參數有兩個:一個分塊(split)和作業的配置信息(context).
* 在Mapper的run函數中可以看到MapReduce框架執行Map的邏輯:
* public void run(Context context) throws IOException, InterruptedException {
* setup(context);
* 調用RecordReader方法的nextKeyValue,生成新的鍵值對。如果當前分塊(Split)中已經處理完畢了,則nextKeyValue會返回false.退出run函數
* while (context.nextKeyValue()) {
* map(context.getCurrentKey(), context.getCurrentValue(), context);
* }
* cleanup(context);
* }
**/
public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
}
}
--------------------------------------分割線 --------------------------------------
Ubuntu 13.04上搭建Hadoop環境 http://www.linuxidc.com/Linux/2013-06/86106.htm
Ubuntu 12.10 +Hadoop 1.2.1版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm
Ubuntu上搭建Hadoop環境(單機模式+偽分布模式) http://www.linuxidc.com/Linux/2013-01/77681.htm
Ubuntu下Hadoop環境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm
單機版搭建Hadoop環境圖文教程詳解 http://www.linuxidc.com/Linux/2012-02/53927.htm
--------------------------------------分割線 --------------------------------------
在RecordReader函數中實現以下幾個接口:
public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
/*關閉文件流
* */
public void close() {}
/*
* 獲取處理進度
**/
public float getProgress() {}
/*
* 獲取當前的Key
* */
public LongWritable getCurrentKey() throws IOException,
InterruptedException {}
/* 獲取當前的Value
* */
public IntWritable getCurrentValue() throws IOException,InterruptedException {}
/*
* 進行初始化工作,打開文件流,根據分塊信息設置起始位置和長度等等
* */
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {}
/*生成下一個鍵值對
**/
public boolean nextKeyValue() throws IOException, InterruptedException {
}
}
更多詳情見請繼續閱讀下一頁的精彩內容: http://www.linuxidc.com/Linux/2014-07/104417p2.htm