歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

MapReduce的一對多連接操作

問題描述:
一個trade table表
product1"trade1
product2"trade2
product3"trade3
一個pay table表
product1"pay1
product2"pay2
product2"pay3
product1"pay4
product3"pay5
product3"pay6

建立兩個表之間的連接,該兩表是一對多關系的
如下:
trade1pay1
trade1pay4
trade2pay2
...

思路:

為了將兩個表整合到一起,由於有相同的第一列,且第一個表與第二個表是一對多關系的。
這裡依然采用分組,以及組內排序,只要保證一方最先到達reduce端,則就可以進行迭代處理了。
為了保證第一個表先到達reduce端,可以為定義一個組合鍵,包含兩個值,第一個值為product,第二個值為0或者1,來分別代表第一個表和第二個表,只要按照組內升序排列即可。

具體代碼:

自定義組合鍵策略

package whut.onetomany;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.Hadoop.io.WritableComparable;
public class TextIntPair implements WritableComparable{
    //product1 0/1
    private String firstKey;//product1
    private int secondKey;//0,1;0代表是trade表,1代表是pay表
    //只需要保證trade表在pay表前面就行,則只需要對組順序排列
                                                           
    public String getFirstKey() {
        return firstKey;
    }
    public void setFirstKey(String firstKey) {
        this.firstKey = firstKey;
    }
    public int getSecondKey() {
        return secondKey;
    }
    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(firstKey);
        out.writeInt(secondKey);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        firstKey=in.readUTF();
        secondKey=in.readInt();
    }
                                                           
    @Override
    public int compareTo(Object o) {
        // TODO Auto-generated method stub
        TextIntPair tip=(TextIntPair)o;
        return this.getFirstKey().compareTo(tip.getFirstKey());
    }
}

分組策略

package whut.onetomany;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class TextComparator extends WritableComparator{
    protected TextComparator() {
        super(TextIntPair.class,true);//注冊比較器
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // TODO Auto-generated method stub
        TextIntPair tip1=(TextIntPair)a;
        TextIntPair tip2=(TextIntPair)b;
        return tip1.getFirstKey().compareTo(tip2.getFirstKey());
    }
}

組內排序策略:目的是保證第一個表比第二個表先到達
package whut.onetomany;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//分組內部進行排序,按照第二個字段進行排序
public class TextIntComparator extends WritableComparator {
    public TextIntComparator()
    {
        super(TextIntPair.class,true);
    }
    //這裡可以進行排序的方式管理
    //必須保證是同一個分組的
    //a與b進行比較
    //如果a在前b在後,則會產生升序
    //如果a在後b在前,則會產生降序
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // TODO Auto-generated method stub
        TextIntPair ti1=(TextIntPair)a;
        TextIntPair ti2=(TextIntPair)b;
        //首先要保證是同一個組內,同一個組的標識就是第一個字段相同
        if(!ti1.getFirstKey().equals(ti2.getFirstKey()))
          return ti1.getFirstKey().compareTo(ti2.getFirstKey());
        else
          return ti1.getSecondKey()-ti2.getSecondKey();//0,-1,1
    }
                                     
}

分區策略:

package whut.onetomany;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PartitionByText extends Partitioner<TextIntPair, Text> {
    @Override
    public int getPartition(TextIntPair key, Text value, int numPartitions) {
        // TODO Auto-generated method stub
        return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
    }
}

Copyright © Linux教程網 All Rights Reserved