一、概述
MapReduce框架對處理結果的輸出會根據key值進行默認的排序,這個默認排序可以滿足一部分需求,但是也是十分有限的。在我們實際的需求當中,往往有要對reduce輸出結果進行二次排序的需求。對於二次排序的實現,網絡上已經有很多人分享過了,但是對二次排序的實現的原理以及整個MapReduce框架的處理流程的分析還是有非常大的出入,而且部分分析是沒有經過驗證的。本文將通過一個實際的MapReduce二次排序例子,講述二次排序的實現和其MapReduce的整個處理流程,並且通過結果和map、reduce端的日志來驗證所描述的處理流程的正確性。
二、需求描述
1、輸入數據:
sort1 1
sort2 3
sort2 77
sort2 54
sort1 2
sort6 22
sort6 221
sort6 20
2、目標輸出
sort1 1,2
sort2 3,54,77
sort6 20,22,221
三、解決思路
1、首先,在思考解決問題思路時,我們先應該深刻的理解MapReduce處理數據的整個流程,這是最基礎的,不然的話是不可能找到解決問題的思路的。我描述一下MapReduce處理數據的大概簡單流程:首先,MapReduce框架通過getSplit方法實現對原始文件的切片之後,每一個切片對應著一個map task,inputSplit輸入到Map函數進行處理,中間結果經過環形緩沖區的排序,然後分區、自定義二次排序(如果有的話)和合並,再通過shuffle操作將數據傳輸到reduce task端,reduce端也存在著緩沖區,數據也會在緩沖區和磁盤中進行合並排序等操作,然後對數據按照Key值進行分組,然後沒處理完一個分組之後就會去調用一次reduce函數,最終輸出結果。大概流程我畫了一下,如下圖:
2、具體解決思路
(1)Map端處理:
根據上面的需求,我們有一個非常明確的目標就是要對第一列相同的記錄合並,並且對合並後的數字進行排序。我們都知道MapReduce框架不管是默認排序或者是自定義排序都只是對Key值進行排序,現在的情況是這些數據不是key值,怎麼辦?其實我們可以將原始數據的Key值和其對應的數據組合成一個新的Key值,然後新的Key值對應的還是之前的數字。那麼我們就可以將原始數據的map輸出變成類似下面的數據結構:
{[sort1,1],1}
{[sort2,3],3}
{[sort2,77],77}
{[sort2,54],54}
{[sort1,2],2}
{[sort6,22],22}
{[sort6,221],221}
{[sort6,20],20}
那麼我們只需要對[]裡面的新key值進行排序就ok了。然後我們需要自定義一個分區處理器,因為我的目標不是想將新key相同的傳到同一個reduce中,而是想將新key中的第一個字段相同的才放到同一個reduce中進行分組合並,所以我們需要根據新key值中的第一個字段來自定義一個分區處理器。通過分區操作後,得到的數據流如下:
Partition1:{[sort1,1],1}、{[sort1,2],2}
Partition2:{[sort2,3],3}、{[sort2,77],77}、{[sort2,54],54}
Partition3:{[sort6,22],22}、{[sort6,221],221}、{[sort6,20],20}
分區操作完成之後,我調用自己的自定義排序器對新的Key值進行排序。
{[sort1,1],1}
{[sort1,2],2}
{[sort2,3],3}
{[sort2,54],54}
{[sort2,77],77}
{[sort6,20],20}
{[sort6,22],22}
{[sort6,221],221}
(2)Reduce端處理:
經過Shuffle處理之後,數據傳輸到Reducer端了。在Reducer端對按照組合鍵的第一個字段來進行分組,並且沒處理完一次分組之後就會調用一次reduce函數來對這個分組進行處理輸出。最終的各個分組的數據結構變成類似下面的數據結構:
{sort1,[1,2]}
{sort2,[3,54,77]}
{sort6,[20,22,221]}
四、具體實現
1、自定義組合鍵
package com.mr;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.Hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 自定義組合鍵
* @author zenghzhaozheng
*/
public class CombinationKey implements WritableComparable<CombinationKey>{
private static final Logger logger = LoggerFactory.getLogger(CombinationKey.class);
private Text firstKey;
private IntWritable secondKey;
public CombinationKey() {
this.firstKey = new Text();
this.secondKey = new IntWritable();
}
public Text getFirstKey() {
return this.firstKey;
}
public void setFirstKey(Text firstKey) {
this.firstKey = firstKey;
}
public IntWritable getSecondKey() {
return this.secondKey;
}
public void setSecondKey(IntWritable secondKey) {
this.secondKey = secondKey;
}
@Override
public void readFields(DataInput dateInput) throws IOException {
// TODO Auto-generated method stub
this.firstKey.readFields(dateInput);
this.secondKey.readFields(dateInput);
}
@Override
public void write(DataOutput outPut) throws IOException {
this.firstKey.write(outPut);
this.secondKey.write(outPut);
}
/**
* 自定義比較策略
* 注意:該比較策略用於mapreduce的第一次默認排序,也就是發生在map階段的sort小階段,
* 發生地點為環形緩沖區(可以通過io.sort.mb進行大小調整)
*/
@Override
public int compareTo(CombinationKey combinationKey) {
logger.info("-------CombinationKey flag-------");
return this.firstKey.compareTo(combinationKey.getFirstKey());
}
}
說明:在自定義組合鍵的時候,我們需要特別注意,一定要實現WritableComparable接口,並且實現compareTo方法的比較策略。這個用於mapreduce的第一次默認排序,也就是發生在map階段的sort小階段,發生地點為環形緩沖區(可以通過io.sort.mb進行大小調整),但是其對我們最終的二次排序結果是沒有影響的。我們二次排序的最終結果是由我們的自定義比較器決定的。
2、自定義分區器
package com.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 自定義分區
* @author zengzhaozheng
*/
public class DefinedPartition extends Partitioner<CombinationKey,IntWritable>{
private static final Logger logger = LoggerFactory.getLogger(DefinedPartition.class);
/**
* 數據輸入來源:map輸出
* @author zengzhaozheng
* @param key map輸出鍵值
* @param value map輸出value值
* @param numPartitions 分區總數,即reduce task個數
*/
@Override
public int getPartition(CombinationKey key, IntWritable value,int numPartitions) {
logger.info("--------enter DefinedPartition flag--------");
/**
* 注意:這裡采用默認的hash分區實現方法
* 根據組合鍵的第一個值作為分區
* 這裡需要說明一下,如果不自定義分區的話,mapreduce框架會根據默認的hash分區方法,
* 將整個組合將相等的分到一個分區中,這樣的話顯然不是我們要的效果
*/
logger.info("--------out DefinedPartition flag--------");
return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
}
}
說明:具體說明看代碼注釋。
3、自定義比較器
package com.mr;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 自定義二次排序策略
* @author zengzhaoheng
*/
public class DefinedComparator extends WritableComparator {
private static final Logger logger = LoggerFactory.getLogger(DefinedComparator.class);
public DefinedComparator() {
super(CombinationKey.class,true);
}
@Override
public int compare(WritableComparable combinationKeyOne,
WritableComparable CombinationKeyOther) {
logger.info("---------enter DefinedComparator flag---------");
CombinationKey c1 = (CombinationKey) combinationKeyOne;
CombinationKey c2 = (CombinationKey) CombinationKeyOther;
/**
* 確保進行排序的數據在同一個區內,如果不在同一個區則按照組合鍵中第一個鍵排序
* 另外,這個判斷是可以調整最終輸出的組合鍵第一個值的排序
* 下面這種比較對第一個字段的排序是升序的,如果想降序這將c1和c2倒過來(假設1)
*/
if(!c1.getFirstKey().equals(c2.getFirstKey())){
logger.info("---------out DefinedComparator flag---------");
return c1.getFirstKey().compareTo(c2.getFirstKey());
}
else{//按照組合鍵的第二個鍵的升序排序,將c1和c2倒過來則是按照數字的降序排序(假設2)
logger.info("---------out DefinedComparator flag---------");
return c1.getSecondKey().get()-c2.getSecondKey().get();//0,負數,正數
}
/**
* (1)按照上面的這種實現最終的二次排序結果為:
* sort1 1,2
* sort2 3,54,77
* sort6 20,22,221
* (2)如果實現假設1,則最終的二次排序結果為:
* sort6 20,22,221
* sort2 3,54,77
* sort1 1,2
* (3)如果實現假設2,則最終的二次排序結果為:
* sort1 2,1
* sort2 77,54,3
* sort6 221,22,20
*/
}
}
說明:自定義比較器決定了我們二次排序的結果。自定義比較器需要繼承WritableComparator類,並且重寫compare方法實現自己的比較策略。具體的排序問題請看注釋。