MapReduce TotalOrderPartitioner: Global Sorting

As we know, the MapReduce framework sorts the map output keys before feeding data to the reducers, so each reducer's input is locally ordered. Hadoop's default partitioner is HashPartitioner, which relies on the hashcode of the output key: identical keys always go to the same reducer, but there is no global order across reducers. To obtain a globally sorted result (for example, to take the top N or bottom N records), you need TotalOrderPartitioner, which still routes identical keys to the same reducer while also guaranteeing a total order over all reducers. For comparison, here are the relevant excerpts from the two partitioners:

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

/**
 * Partitioner effecting a total order by reading split points from
 * an externally generated source.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
    extends Partitioner<K,V> implements Configurable {
  // by construction, we know if our keytype
  @SuppressWarnings("unchecked") // is memcmp-able and uses the trie
  public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(key);
  }
}

TotalOrderPartitioner relies on a partition file to distribute keys. The partition file is a precomputed SequenceFile: if the number of reducers is N, it contains N-1 split-point keys, sorted by the key comparator. TotalOrderPartitioner checks which split-point range a key falls into and dispatches it to the corresponding reducer.
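
To make the range lookup concrete, here is a minimal sketch (simplified, not Hadoop's exact code, which adds a trie-based fast path discussed below) of how N-1 sorted split points map a key to one of N partitions; a key equal to a split point goes to the higher partition:

import java.util.Arrays;
import java.util.Comparator;

public class SplitPointLookup {
  // Map a key to one of N partitions given N-1 sorted split points.
  public static <K> int findPartition(K key, K[] splitPoints, Comparator<K> comparator) {
    int pos = Arrays.binarySearch(splitPoints, key, comparator);
    // An exact match at index i means partition i + 1; otherwise binarySearch
    // returns -(insertionPoint) - 1, and the insertion point is the partition.
    return (pos >= 0) ? pos + 1 : -pos - 1;
  }
}

For example, with split points { "f", "p" } and three reducers, keys before "f" go to reducer 0, keys in ["f", "p") go to reducer 1, and the remaining keys go to reducer 2.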

The writePartitionFile method of the InputSampler class samples the input files and creates the partition file. Three sampling strategies are available (construction examples follow the list):

1. RandomSampler: selects keys at random with a given probability
2. IntervalSampler: selects keys at fixed intervals from s splits, usually suited to already-sorted input
3. SplitSampler: takes the first n records from each of s splits
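
As a reference sketch (the numeric arguments are arbitrary example values), the three samplers in org.apache.hadoop.mapreduce.lib.partition.InputSampler are constructed like this:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;

public class SamplerExamples {
  public static void main(String[] args) {
    // freq = 0.1: each key is selected with probability 0.1;
    // numSamples = 10000: keep at most 10000 samples in total;
    // maxSplitsSampled = 10: read at most 10 input splits.
    InputSampler.RandomSampler<Text, Text> random =
        new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);

    // Sample at a fixed rate (freq = 0.01) from at most 10 splits.
    InputSampler.IntervalSampler<Text, Text> interval =
        new InputSampler.IntervalSampler<Text, Text>(0.01, 10);

    // Take 10000 records in total from the front of at most 10 splits.
    InputSampler.SplitSampler<Text, Text> split =
        new InputSampler.SplitSampler<Text, Text>(10000, 10);
  }
}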

The partition file can be set via TotalOrderPartitioner.setPartitionFile(conf, partitionFile). When a TotalOrderPartitioner instance is created, its setConf method is called, which reads the keys from the partition file. If the key type is BinaryComparable (think of it as a string-like type such as Text), a trie is built and lookup costs O(n), where n is the depth of the trie. For non-BinaryComparable key types a BinarySearchNode is built instead, and lookup uses binary search at O(log(n)), where n is the number of reducers:

boolean natOrder = conf.getBoolean(NATURAL_ORDER, true);
if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
  partitions = buildTrie((BinaryComparable[])splitPoints, 0,
      splitPoints.length, new byte[0],
      // Now that blocks of identical splitless trie nodes are
      // represented reentrantly, and we develop a leaf for any trie
      // node with only one split point, the only reason for a depth
      // limit is to refute stack overflow or bloat in the pathological
      // case where the split points are long and mostly look like bytes
      // iii...iixii...iii  .  Therefore, we make the default depth
      // limit large but not huge.
      conf.getInt(MAX_TRIE_DEPTH, 200));
} else {
  partitions = new BinarySearchNode(splitPoints, comparator);
}
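
Both branches can be tuned through configuration. A sketch, assuming the public constants of Hadoop 2's TotalOrderPartitioner (NATURAL_ORDER, i.e. mapreduce.totalorderpartitioner.naturalorder, and MAX_TRIE_DEPTH, i.e. mapreduce.totalorderpartitioner.trie.maxdepth):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class PartitionerTuning {
  public static Configuration tuned() {
    Configuration conf = new Configuration();
    // Force the binary-search node even for BinaryComparable keys, e.g. when
    // a custom key comparator does not match byte-lexicographic order.
    conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
    // Raise the trie depth limit from its default of 200.
    conf.setInt(TotalOrderPartitioner.MAX_TRIE_DEPTH, 400);
    return conf;
  }
}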

Example program

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TotalSortMR {
 
 public static int runTotalSortJob(String[] args) throws Exception {
  Path inputPath = new Path(args[0]);
  Path outputPath = new Path(args[1]);
  Path partitionFile = new Path(args[2]);
  int reduceNumber = Integer.parseInt(args[3]);
 
  // RandomSampler's first argument is the probability with which a key is selected,
  // the second is the maximum number of samples, and the third is the maximum
  // number of input splits to read
  RandomSampler<Text, Text> sampler = new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
 
  Configuration conf = new Configuration();
  // Store the full path of the partition file in the conf
  TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
 
  Job job = Job.getInstance(conf);
  job.setJobName("Total-Sort");
  job.setJarByClass(TotalSortMR.class);
  job.setInputFormatClass(KeyValueTextInputFormat.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setNumReduceTasks(reduceNumber);
 
  // Set the partitioner class to TotalOrderPartitioner
  job.setPartitionerClass(TotalOrderPartitioner.class);
 
  FileInputFormat.setInputPaths(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);
  outputPath.getFileSystem(conf).delete(outputPath, true);
 
  // Sample the input and write the partition file to mapreduce.totalorderpartitioner.path
  InputSampler.writePartitionFile(job, sampler);
 
  return job.waitForCompletion(true)? 0 : 1;
 
 }
 
 public static void main(String[] args) throws Exception{
  System.exit(runTotalSortJob(args));
 }
}
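
A hypothetical invocation (jar name and paths are placeholders): the four arguments are the input path, the output path, the partition file path, and the number of reducers.

hadoop jar totalsort.jar TotalSortMR /user/hadoop/input /user/hadoop/sorted /user/hadoop/_partitions.lst 4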

The example above uses InputSampler to create the partition file, but you can also build it with a MapReduce job of your own: define a custom InputFormat that performs the sampling, and send the output keys to a single reducer.
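
As a rough sketch of that idea (hypothetical code, not from the original article): the mapper emits each sampled key with a NullWritable value, the job runs with a single reduce task and SequenceFileOutputFormat, and a reducer like the one below buffers the sorted sample and emits N-1 evenly spaced split points in cleanup. The configuration key splitpoints.num.partitions is made up for this example.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SplitPointReducer
    extends Reducer<Text, NullWritable, Text, NullWritable> {
  private final List<Text> sample = new ArrayList<Text>();
  private int numPartitions;

  @Override
  protected void setup(Context context) {
    // Hypothetical key carrying the reduce-task count of the real sorting job.
    numPartitions = context.getConfiguration().getInt("splitpoints.num.partitions", 1);
  }

  @Override
  protected void reduce(Text key, Iterable<NullWritable> values, Context context) {
    // Keys arrive sorted; keep one copy per distinct key.
    sample.add(new Text(key));
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    if (sample.isEmpty()) {
      return;
    }
    // Emit N-1 evenly spaced split points from the sorted in-memory sample,
    // mirroring what InputSampler.writePartitionFile does.
    float stepSize = sample.size() / (float) numPartitions;
    for (int i = 1; i < numPartitions; ++i) {
      int idx = Math.min(sample.size() - 1, Math.round(stepSize * i));
      context.write(sample.get(idx), NullWritable.get());
    }
  }
}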

PS: Hive 0.12 implements parallel ORDER BY (https://issues.apache.org/jira/browse/HIVE-1402), also based on TotalOrderPartitioner; a very welcome new feature.
