歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

在Hadoop中重寫FileInputFormat類以處理二進制格式存儲的整數

最近開始使用MapReduce,發現網上大部分例子都是對文本數據進行處理的,也就是說在讀取輸入數據時直接使用默認的TextInputFormat進行處理即可。對於文本數據處理,這個類還是能滿足一部分應用場景。但是如果要處理以二進制形式結構化記錄存儲的文件時,這些類就不再適合了。

本文以一個簡單的應用場景為例:對按照二進制格式存儲的整數做頻數統計。當然,也可以在此基礎上實現排序之類的其他應用。實現該應用的主要難點就是如何處理輸入數據。參考《權威指南·第三版》得知需要繼承FileInputFormat這個類,並實現以下三個方法:

class MyInputFormat extends FileInputFormat<Type1, Type2> {
 /*
  * 查詢判斷當前文件是否可以分塊?"true"為可以分塊,"false"表示不進行分塊
  */
 protected boolean isSplitable(Configuration conf, Path path) {
 
 }
 
 /*
  * MapReduce的客戶端調用此方法得到所有的分塊,然後將分塊發送給MapReduce服務端。
  * 注意,分塊中不包含實際的信息,而只是對實際信息的分塊信息。具體的說,每個分塊中
  * 包含當前分塊對應的文件路徑,當前分塊在該文件中起始位置,當前分塊的長度以及對應的
  * 實際數據所在的機器列表。在實現這個函數時,將這些信息填上即可。
  * */
 public List<InputSplit> getSplits(Configuration conf) throws IOException {
 }

 /*
  * 類RecordReader是用來創建傳給map函數的Key-Value序列,傳給此類的參數有兩個:一個分塊(split)和作業的配置信息(context).
  * 在Mapper的run函數中可以看到MapReduce框架執行Map的邏輯:
  * public void run(Context context) throws IOException, InterruptedException {
  *   setup(context);
  *   調用RecordReader方法的nextKeyValue,生成新的鍵值對。如果當前分塊(Split)中已經處理完畢了,則nextKeyValue會返回false.退出run函數
  *  while (context.nextKeyValue()) { 
  *   map(context.getCurrentKey(), context.getCurrentValue(), context);
  *  }
  *  cleanup(context);
  * }
  **/
 public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
   throws IOException, InterruptedException {
 }
}

--------------------------------------分割線 --------------------------------------

Ubuntu 13.04上搭建Hadoop環境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu上搭建Hadoop環境(單機模式+偽分布模式) http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu下Hadoop環境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

單機版搭建Hadoop環境圖文教程詳解 http://www.linuxidc.com/Linux/2012-02/53927.htm

--------------------------------------分割線 --------------------------------------

在RecordReader函數中實現以下幾個接口

public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
 /*關閉文件流
  * */
 public void close() {}

 /*
  * 獲取處理進度
  **/
 public float getProgress() {}

 /*
  * 獲取當前的Key
  * */
 public LongWritable getCurrentKey() throws IOException,
 InterruptedException {}

 /* 獲取當前的Value
  * */
 public IntWritable getCurrentValue() throws IOException,InterruptedException {}

 /*
  * 進行初始化工作,打開文件流,根據分塊信息設置起始位置和長度等等
  * */
 public void initialize(InputSplit inputSplit, TaskAttemptContext context)
   throws IOException, InterruptedException {}

 /*生成下一個鍵值對
  **/
 public boolean nextKeyValue() throws IOException, InterruptedException {
 }
}

更多詳情見請繼續閱讀下一頁的精彩內容: http://www.linuxidc.com/Linux/2014-07/104417p2.htm

Copyright © Linux教程網 All Rights Reserved