
HBase: Writing to Two Tables in a Single MapReduce Job

Back when I was learning HBase, I wondered whether a single MapReduce job could write to two tables at once, but at the time I couldn't find any relevant answers online. Searching again recently, I did find a similar implementation, wrote my own version based on it, and it basically runs (reference: http://www.wildnove.com/2011/07/19/tutorial-Hadoop-and-hbase-multitableoutputformat/). Here is the approach in detail.

The raw input data is as follows:

fansy,22,blog.csdu.net/fansy1990
tom,25,blog.csdu.net/tom1987
kate,23,blog.csdu.net/kate1989
jake,20,blog.csdu.net/jake1992
john,35,blog.csdu.net/john1977
ben,30,blog.csdu.net/ben1982

The first column is name, the second is age, and the third is webPage. The goal is to store name and age in table 1, and name and webPage in table 2. The code is below:

ImportToHB.java:

package org.fansy.multipletables;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver: write to multiple tables from one MapReduce job.
 * @author fansy
 */
public class ImportToHB extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new ImportToHB(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 7) {
            System.err.println("wrong args length: " + args.length);
            System.out.println("Usage: <input> <table1> <table1-fam> <table1-qua> "
                    + "<table2> <table2-fam> <table2-qua>");
            System.exit(-1);
        }
        // HBaseConfiguration.create() also loads hbase-site.xml, so the job
        // can find the cluster's ZooKeeper quorum (a plain new Configuration()
        // would not pick up the HBase settings)
        Configuration conf = HBaseConfiguration.create();
        // pass the table/family/qualifier names through to the mapper
        conf.set("TABLE1", args[1]);
        conf.set("T1-FAM", args[2]);
        conf.set("T1-QUA", args[3]);
        conf.set("TABLE2", args[4]);
        conf.set("T2-FAM", args[5]);
        conf.set("T2-QUA", args[6]);

        Job job = new Job(conf);
        job.setJarByClass(ImportToHB.class);
        job.setMapperClass(MapperHB.class);
        // the key names the target table, the value is the Put itself
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Writable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setOutputFormatClass(MultiTableOutputFormat.class);
        // map-only job: the Puts go straight to the output format
        job.setNumReduceTasks(0);

        if (job.waitForCompletion(true)) {
            return 0;
        }
        return -1;
    }
}
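
One caveat: running from Eclipse works because the HBase jars are already on the local classpath. If you later submit the job to a real cluster, the map tasks also need those jars. A minimal sketch of the usual fix, assuming the standard TableMapReduceUtil helper from org.apache.hadoop.hbase.mapreduce, is one extra line in run() before waitForCompletion():

// ship the HBase jars (and their dependencies) with the job
TableMapReduceUtil.addDependencyJars(job);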

MapperHB.java:

package org.fansy.multipletables;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperHB extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {
    private byte[] table1;
    private byte[] table2;
    private byte[] t1_fam;
    private byte[] t1_qua;
    private byte[] t2_fam;
    private byte[] t2_qua;

    @Override
    public void setup(Context context) {
        // read the table/family/qualifier names set by the driver
        table1 = Bytes.toBytes(context.getConfiguration().get("TABLE1"));
        table2 = Bytes.toBytes(context.getConfiguration().get("TABLE2"));
        t1_fam = Bytes.toBytes(context.getConfiguration().get("T1-FAM"));
        t1_qua = Bytes.toBytes(context.getConfiguration().get("T1-QUA"));
        t2_fam = Bytes.toBytes(context.getConfiguration().get("T2-FAM"));
        t2_qua = Bytes.toBytes(context.getConfiguration().get("T2-QUA"));
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] info = value.toString().split(",");
        if (info.length != 3) {
            return; // skip malformed lines
        }
        String name = info[0];
        String age = info[1];
        String webPage = info[2];

        // write to the first table: row key = name + "," + age, value = age
        ImmutableBytesWritable putTable = new ImmutableBytesWritable(table1);
        Put put = new Put(Bytes.toBytes(name + "," + age));
        put.add(t1_fam, t1_qua, Bytes.toBytes(age));
        context.write(putTable, put);

        // write to the second table: row key = name + "," + webPage, value = webPage
        putTable = new ImmutableBytesWritable(table2);
        put = new Put(Bytes.toBytes(name + "," + webPage));
        put.add(t2_fam, t2_qua, Bytes.toBytes(webPage));
        context.write(putTable, put);
    }
}

The code above uses a single Mapper to write to two HBase tables at the same time. The key points are the Mapper's output key and value types, which here are ImmutableBytesWritable (naming the target table) and Writable (the Put itself), and setting the output format when configuring the job: job.setOutputFormatClass(MultiTableOutputFormat.class);
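
Incidentally, MultiTableOutputFormat accepts Delete values as well as Puts, so the same mechanism can also remove rows. A minimal sketch inside the same map() method, assuming an extra import of org.apache.hadoop.hbase.client.Delete (the row key here is only illustrative):

// a Delete is emitted exactly like a Put: the ImmutableBytesWritable key
// still selects the target table
Delete delete = new Delete(Bytes.toBytes("fansy,22")); // hypothetical row key
context.write(new ImmutableBytesWritable(table1), delete);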

How do you run the program above?

(1) Create the two tables in HBase:
create 'table1','info'
create 'table2','info'
(2) Pass ImportToHB the following arguments:
hdfs://master:9000/user/fansy/input/info.dat table1 info age table2 info webPage
(3) Run it directly in Eclipse.

After the job finishes, you can check the output in the HBase shell with scan 'table1' and scan 'table2', or verify it programmatically.
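
If you prefer checking in code, here is a minimal sketch of a standalone verifier (VerifyImport is a hypothetical helper, not part of the original post; the row key matches the first input line):

package org.fansy.multipletables;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyImport {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "table1");
        // row keys were built as name + "," + age in MapperHB
        Get get = new Get(Bytes.toBytes("fansy,22"));
        Result result = table.get(get);
        byte[] value = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"));
        System.out.println("age = " + Bytes.toString(value)); // expect 22
        table.close();
    }
}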
