二次排序原理
在map階段,使用job.setInputFormatClass定義的InputFormat將輸入的數據集分割成小數據塊splites,同時InputFormat提供一個RecordReder的實現。
本例子中使用的是TextInputFormat,他提供的RecordReader會將文本的字節偏移量作為key,這一行的文本作為value。
這就是自定義Map的輸入是<LongWritable, Text>的原因。然後調用自定義Map的map方法,將一個個<LongWritable, Text>對輸入給Map的map方法。
注意輸出應該符合自定義Map中定義的輸出<IntPair, IntWritable>。最終是生成一個List<IntPair, IntWritable>。
在map階段的最後,會先調用job.setPartitionerClass對這個List進行分區,每個分區映射到一個reducer。
每個分區內又調用job.setSortComparatorClass設置的key比較函數類排序。可以看到,這本身就是一個二次排序。
如果沒有通過job.setSortComparatorClass設置key比較函數類,則使用key的實現的compareTo方法。
在第一個例子中,使用了IntPair實現的compareTo方法,而在下一個例子中,專門定義了key比較函數類。
在reduce階段,reducer接收到所有映射到這個reducer的map輸出後,也是會調用job.setSortComparatorClass設置的key比較函數類對所有數據對排序。
然後開始構造一個key對應的value迭代器。這時就要用到分組,使用job.setGroupingComparatorClass設置的分組函數類。
只要這個比較器比較的兩個key相同,他們就屬於同一個組,它們的value放在一個value迭代器,而這個迭代器的key使用屬於同一個組的所有key的第一個key。
最後就是進入Reducer的reduce方法,reduce方法的輸入是所有的(key和它的value迭代器)。同樣注意輸入與輸出的類型必須與自定義的Reducer中聲明的一致。
核心總結:
1、map最後階段進行partition分區,一般使用job.setPartitionerClass設置的類,如果沒有自定義Key的hashCode()方法進行排序。
2、每個分區內部調用job.setSortComparatorClass設置的key的比較函數類進行排序,如果沒有則使用Key的實現的compareTo方法。
3、當reduce接收到所有map傳輸過來的數據之後,調用job.setSortComparatorClass設置的key比較函數類對所有數據對排序,如果沒有則使用Key的實現的compareTo方法。
4、緊接著使用job.setGroupingComparatorClass設置的分組函數類,進行分組,同一個Key的value放在一個迭代器裡面。
2、如何自定義Key
所有自定義的key應該實現接口WritableComparable,因為是可序列的並且可比較的。並重載方法
//反序列化,從流中的二進制轉換成IntPair
public void readFields(DataInput in) throws IOException
//序列化,將IntPair轉化成使用流傳送的二進制
public void write(DataOutput out)
//key的比較
public int compareTo(IntPair o)
另外新定義的類應該重寫的兩個方法
//The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)
public int hashCode()
public boolean equals(Object right)
3、如何自定義分區函數類。這是key的第一次比較。
public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>
在job中設置使用setPartitionerClasss
4、如何自定義key比較函數類。這是key的第二次比較。這是一個比較器,需要繼承WritableComparator。
public static class KeyComparator extends WritableComparator
必須有一個構造函數,並且重載 public int compare(WritableComparable w1, WritableComparable w2)
另一種方法是 實現接口RawComparator。
在job中設置使用setSortComparatorClass。
5、如何自定義分組函數類。
在reduce階段,構造一個key對應的value迭代器的時候,只要first相同就屬於同一個組,放在一個value迭代器。這是一個比較器,需要繼承WritableComparator。
public static class GroupingComparator extends WritableComparator
同key比較函數類,必須有一個構造函數,並且重載 public int compare(WritableComparable w1, WritableComparable w2)
同key比較函數類,分組函數類另一種方法是實現接口RawComparator。
在job中設置使用setGroupingComparatorClass。
6、案例分析
數據准備:
假如我們現在的需求是先按 cookieId 排序,然後按 time 排序,以便按 session 切分日志
cookieId time url
2 12:12:34 2_hao123
3 09:10:34 3_baidu
1 15:02:41 1_google
3 22:11:34 3_sougou
1 19:10:34 1_baidu
2 15:02:41 2_google
1 12:12:34 1_hao123
3 23:10:34 3_soso
2 05:02:41 2_google
結果:
------------------------------------------------
1 12:12:34 1_hao123
1 15:02:41 1_google
1 19:10:34 1_baidu
------------------------------------------------
2 05:02:41 2_google
2 12:12:34 2_hao123
2 15:02:41 2_google
------------------------------------------------
3 09:10:34 3_baidu
3 22:11:34 3_sougou
3 23:10:34 3_soso
案例2:對兩列數據進行排序,第一列相同的比較第二列
20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8
7、代碼實現
package SecondarySort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.Hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class SecondarySort
{
//自己定義的key類應該實現WritableComparable接口
public static class IntPair implements WritableComparable<IntPair>
{
String first;
String second;
/**
* Set the left and right values.
*/
public void set(String left, String right)
{
first = left;
second = right;
}
public String getFirst()
{
return first;
}
public String getSecond()
{
return second;
}
//反序列化,從流中的二進制轉換成IntPair
public void readFields(DataInput in) throws IOException
{
first = in.readUTF();
second = in.readUTF();
}
//序列化,將IntPair轉化成使用流傳送的二進制
public void write(DataOutput out) throws IOException
{
out.writeUTF(first);
out.writeUTF(second);
}
//重載 compareTo 方法,進行組合鍵 key 的比較,該過程是默認行為。
//分組後的二次排序會隱式調用該方法。
public int compareTo(IntPair o)
{
if (!first.equals(o.first) )
{
return first.compareTo(o.first);
}
else if (!second.equals(o.second))
{
return second.compareTo(o.second);
}
else
{
return 0;
}
}
//新定義類應該重寫的兩個方法
//The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)
public int hashCode()
{
return first.hashCode() * 157 + second.hashCode();
}
public boolean equals(Object right)
{
if (right == null)
return false;
if (this == right)
return true;
if (right instanceof IntPair)
{
IntPair r = (IntPair) right;
return r.first.equals(first) && r.second.equals(second) ;
}
else
{
return false;
}
}
}
/**
* 分區函數類。根據first確定Partition。
*/
public static class FirstPartitioner extends Partitioner<IntPair, Text>
{
public int getPartition(IntPair key, Text value,int numPartitions)
{
return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
}
}
/**
* 分組函數類。只要first相同就屬於同一個組。
*/
/*//第一種方法,實現接口RawComparator
public static class GroupingComparator implements RawComparator<IntPair> {
public int compare(IntPair o1, IntPair o2) {
int l = o1.getFirst();
int r = o2.getFirst();
return l == r ? 0 : (l < r ? -1 : 1);
}
//一個字節一個字節的比,直到找到一個不相同的字節,然後比這個字節的大小作為兩個字節流的大小比較結果。
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8,
b2, s2, Integer.SIZE/8);
}
}*/
//第二種方法,繼承WritableComparator
public static class GroupingComparator extends WritableComparator
{
protected GroupingComparator()
{
super(IntPair.class, true);
}
//Compare two WritableComparables.
// 重載 compare:對組合鍵按第一個自然鍵排序分組
public int compare(WritableComparable w1, WritableComparable w2)
{
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
String l = ip1.getFirst();
String r = ip2.getFirst();
return l.compareTo(r);
}
}
// 自定義map
public static class Map extends Mapper<LongWritable, Text, IntPair, Text>
{
private final IntPair keyPair = new IntPair();
String[] lineArr = null;
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String line = value.toString();
lineArr = line.split("\t", -1);
keyPair.set(lineArr[0], lineArr[1]);
context.write(keyPair, value);
}
}
// 自定義reduce
//
public static class Reduce extends Reducer<IntPair, Text, Text, Text>
{
private static final Text SEPARATOR = new Text("------------------------------------------------");
public void reduce(IntPair key, Iterable<Text> values,Context context) throws IOException, InterruptedException
{
context.write(SEPARATOR, null);
for (Text val : values)
{
context.write(null, val);
}
}
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException
{
// 讀取hadoop配置
Configuration conf = new Configuration();
// 實例化一道作業
Job job = new Job(conf, "secondarysort");
job.setJarByClass(SecondarySort.class);
// Mapper類型
job.setMapperClass(Map.class);
// 不再需要Combiner類型,因為Combiner的輸出類型<Text, IntWritable>對Reduce的輸入類型<IntPair, IntWritable>不適用
//job.setCombinerClass(Reduce.class);
// Reducer類型
job.setReducerClass(Reduce.class);
// 分區函數
job.setPartitionerClass(FirstPartitioner.class);
// 分組函數
job.setGroupingComparatorClass(GroupingComparator.class);
// map 輸出Key的類型
job.setMapOutputKeyClass(IntPair.class);
// map輸出Value的類型
job.setMapOutputValueClass(Text.class);
// rduce輸出Key的類型,是Text,因為使用的OutputFormatClass是TextOutputFormat
job.setOutputKeyClass(Text.class);
// rduce輸出Value的類型
job.setOutputValueClass(Text.class);
// 將輸入的數據集分割成小數據塊splites,同時提供一個RecordReder的實現。
job.setInputFormatClass(TextInputFormat.class);
// 提供一個RecordWriter的實現,負責數據輸出。
job.setOutputFormatClass(TextOutputFormat.class);
// 輸入hdfs路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 輸出hdfs路徑
FileSystem.get(conf).delete(new Path(args[1]), true);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交job
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Hadoop2.5.2 新特性 http://www.linuxidc.com/Linux/2014-11/109814.htm
CentOS安裝和配置Hadoop2.2.0 http://www.linuxidc.com/Linux/2014-01/94685.htm
Ubuntu 13.04上搭建Hadoop環境 http://www.linuxidc.com/Linux/2013-06/86106.htm
Ubuntu 12.10 +Hadoop 1.2.1版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm
Ubuntu上搭建Hadoop環境(單機模式+偽分布模式) http://www.linuxidc.com/Linux/2013-01/77681.htm
Ubuntu下Hadoop環境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm
單機版搭建Hadoop環境圖文教程詳解 http://www.linuxidc.com/Linux/2012-02/53927.htm
搭建Hadoop環境(在Winodws環境下用虛擬機虛擬兩個Ubuntu系統進行搭建) http://www.linuxidc.com/Linux/2011-12/48894.htm
更多Hadoop相關信息見Hadoop 專題頁面 http://www.linuxidc.com/topicnews.aspx?tid=13