
How to Submit a YARN MapReduce Job from a Java Program

Because of project requirements, I needed to submit YARN MapReduce jobs from a Java program. Unlike the usual route of packaging the job as a jar and submitting it from the command line, submitting from a program requires a few small changes, shown in the code below.
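A note before the code: a Java client that submits to YARN must know where HDFS and the ResourceManager live. The runJob method below obtains these settings by loading the cluster's XML files with conf.addResource(); a minimal sketch of the equivalent programmatic configuration (the ResourceManager host here is an assumption, reused from the NameNode address that appears in main()) would look like this:

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.168.101:9000");      // NameNode address, as used in main() below
conf.set("mapreduce.framework.name", "yarn");                 // run on YARN instead of the local job runner
conf.set("yarn.resourcemanager.hostname", "192.168.168.101"); // assumed ResourceManager host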

Below is the main MapReduce program. A few points are worth noting:

1. The input format is set to WholeFileInputFormat, so input files are not split.

2. To control the reduce phase, the map output key is a composite key: instead of the usual <key, value>, the map emits <TextPair, value>, where TextPair has the form <key1, key2>.

3. To accommodate the composite key, the grouping function (GroupComparator) is redefined. The grouping rule: as long as the key1 of two TextPairs is the same (key2 may differ), the records are assigned to the same reduce group. Once records sharing key1 reach the reducer, key2 serves as a per-record identifier.

package web.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;


public class GEMIMain {
 
 public GEMIMain(){
  job = null;
 }
 
 public Job job;
 public static class NamePartitioner extends
   Partitioner<TextPair, BytesWritable> {
  @Override
  public int getPartition(TextPair key, BytesWritable value,
    int numPartitions) {
   // mask the sign bit so the result is always non-negative;
   // Math.abs(hashCode() * 127) can overflow and return a negative value
   return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
 }

 /**
  * Grouping comparator: two TextPairs belong to the same group as long as
  * their first keys are equal. Their values are then collected into a single
  * value iterator and passed to one call of the Reducer's reduce method.
  *
  * @author hduser
  */
 public static class GroupComparator extends WritableComparator {
  public GroupComparator() {
   super(TextPair.class, true);
  }

  @Override
  public int compare(WritableComparable a, WritableComparable b) {
   TextPair t1 = (TextPair) a;
   TextPair t2 = (TextPair) b;
   // returns 0 when the first keys are equal, non-zero otherwise
   return t1.getFirst().compareTo(t2.getFirst()); // group records solely by the first key
  }
 }
 
 
 public boolean runJob(String[] args) throws IOException,
   ClassNotFoundException, InterruptedException {
 
  Configuration conf = new Configuration();
  // store outputPath in conf so that the reduce function can read it
  conf.set("outputPath", args[args.length - 1]);
  // HDFS folder that holds the quality files produced by each run;
  // the second-to-last element of args names that folder
  conf.set("qualityFolder", args[args.length - 2]);
  // when running inside a web server, obtain the web project's root path;
  // when debugging as a plain Java application, read the configuration
  // files under /opt/hadoop-2.5.0/etc/hadoop/
  // MapReduceProgress mprogress = new MapReduceProgress();
  // String rootPath = mprogress.rootPath;
  String rootPath = "/opt/hadoop-2.5.0/etc/hadoop/";
  conf.addResource(new Path(rootPath+"yarn-site.xml"));
  conf.addResource(new Path(rootPath+"core-site.xml"));
  conf.addResource(new Path(rootPath+"hdfs-site.xml"));
  conf.addResource(new Path(rootPath+"mapred-site.xml"));
  // Job.getInstance(conf) replaces the constructor deprecated in Hadoop 2.x
  this.job = Job.getInstance(conf);
 
  job.setJobName("Job name:" + args[0]);
  job.setJarByClass(GEMIMain.class);

  job.setMapperClass(GEMIMapper.class);
  job.setMapOutputKeyClass(TextPair.class);
  job.setMapOutputValueClass(BytesWritable.class);
  // set the partitioner
  job.setPartitionerClass(NamePartitioner.class);
  // after partitioning, group keys according to the rule defined above
  job.setGroupingComparatorClass(GroupComparator.class);

  job.setReducerClass(GEMIReducer.class);

  job.setInputFormatClass(WholeFileInputFormat.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  // job.setOutputKeyClass(NullWritable.class);
  // job.setOutputValueClass(Text.class);
  job.setNumReduceTasks(8);
 
 
  // set the input paths for the computation
  for (int i = 1; i < args.length - 2; i++) {
   FileInputFormat.addInputPath(job, new Path(args[i]));
  }
  // the last element of args is the output path
  FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));
  boolean flag = job.waitForCompletion(true);
  return flag;
 }
 
 @SuppressWarnings("static-access")
 public static void main(String[] args) throws ClassNotFoundException,
   IOException, InterruptedException { 
 
  String[] inputPaths = new String[] { "normalizeJob",
    "hdfs://192.168.168.101:9000/user/hduser/red1/",
    "hdfs://192.168.168.101:9000/user/hduser/nir1/","quality11111",
    "hdfs://192.168.168.101:9000/user/hduser/test" };
  GEMIMain test = new GEMIMain();
  boolean result = test.runJob(inputPaths);
  System.out.println("Job completed successfully: " + result);
 }
}
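The GEMIMapper and GEMIReducer classes referenced above are not shown in the original article. As an illustration of how the composite key is consumed, here is a minimal reducer sketch (the class name and processing logic are placeholders, not the article's actual implementation). Within one reduce call, every value shares the same first key; because the grouping comparator ignores the second key, key.getSecond() changes as the iterator advances and identifies where each value came from:

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class GEMIReducerSketch extends
  Reducer<TextPair, BytesWritable, NullWritable, NullWritable> {

 @Override
 protected void reduce(TextPair key, Iterable<BytesWritable> values,
   Context context) throws IOException, InterruptedException {
  for (BytesWritable value : values) {
   // key.getFirst() is constant within this group; key.getSecond()
   // is updated per value and acts as the data identifier
   String recordId = key.getSecond().toString();
   // ... process value according to recordId ...
  }
 }
}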

Below is the TextPair class:

package web.hadoop;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair> {
 private Text first;
 private Text second;

 public TextPair() {
  set(new Text(), new Text());
 }

 public TextPair(String first, String second) {
  set(new Text(first), new Text(second));
 }

 public TextPair(Text first, Text second) {
  set(first, second);
 }

 public void set(Text first, Text second) {
  this.first = first;
  this.second = second;
 }

 public Text getFirst() {
  return first;
 }

 public Text getSecond() {
  return second;
 }

 @Override
 public void write(DataOutput out) throws IOException {
  first.write(out);
  second.write(out);
 }

 @Override
 public void readFields(DataInput in) throws IOException {
  first.readFields(in);
  second.readFields(in);
 }

 @Override
 public int hashCode() {
  return first.hashCode() * 163 + second.hashCode();
 }

 @Override
 public boolean equals(Object o) {
  if (o instanceof TextPair) {
   TextPair tp = (TextPair) o;
   return first.equals(tp.first) && second.equals(tp.second);
  }
  return false;
 }

 @Override
 public String toString() {
  return first + "\t" + second;
 }
 
 /**
  * A.compareTo(B) returns 0 if A equals B, a positive value if A is greater
  * than B, and a negative value if A is less than B.
  */
 @Override
 public int compareTo(TextPair tp) {
  int cmp = first.compareTo(tp.first);
  if (cmp != 0) {
   return cmp;
  }
  // ties on the first key fall back to ascending order on the second key
  return second.compareTo(tp.second);
 }
}
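To see why both comparators are needed: compareTo above sorts by the first and then the second key, while the GroupComparator in GEMIMain compares only the first key. A small standalone check (a sketch; the key strings are illustrative) shows the difference:

import org.apache.hadoop.io.WritableComparator;

public class TextPairDemo {
 public static void main(String[] args) {
  TextPair a = new TextPair("file1", "red");
  TextPair b = new TextPair("file1", "nir");

  // full comparison, used for sorting: non-zero because the second keys differ
  System.out.println(a.compareTo(b)); // positive, since "red" > "nir"

  // grouping comparison: zero because only the first keys are compared,
  // so both records would reach the same reduce() call
  WritableComparator group = new GEMIMain.GroupComparator();
  System.out.println(group.compare(a, b)); // 0
 }
}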

Below is WholeFileInputFormat, which ensures that input files are not split during the MapReduce job:

package web.hadoop;

import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        // one record per file: the file name as the key, its bytes as the value
        return new WholeFileRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // never split: each file is read in full by a single mapper
        return false;
    }
}

Below is the WholeFileRecordReader class:

package web.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {

 private FileSplit fileSplit;
 private FSDataInputStream fis;

 private Text key = null;
 private BytesWritable value = null;

 private boolean processed = false;

 @Override
 public void close() throws IOException {
  // nothing to close here: the stream is closed in nextKeyValue()'s finally block
 }

 @Override
 public Text getCurrentKey() throws IOException, InterruptedException {
  return this.key;
 }

 @Override
 public BytesWritable getCurrentValue() throws IOException,
   InterruptedException {
  return this.value;
 }

 @Override
 public void initialize(InputSplit inputSplit, TaskAttemptContext tacontext)
   throws IOException, InterruptedException {

  fileSplit = (FileSplit) inputSplit;
  Configuration job = tacontext.getConfiguration();
  Path file = fileSplit.getPath();
  FileSystem fs = file.getFileSystem(job);
  fis = fs.open(file);
 }

 @Override
 public boolean nextKeyValue() {

  if (key == null) {
   key = new Text();
  }

  if (value == null) {
   value = new BytesWritable();
  }

  if (!processed) {
   byte[] content = new byte[(int) fileSplit.getLength()];

   Path file = fileSplit.getPath();

   System.out.println(file.getName());
   key.set(file.getName());

   try {
    IOUtils.readFully(fis, content, 0, content.length);
    // value.set(content, 0, content.length);
    value.set(new BytesWritable(content));
   } catch (IOException e) {
    e.printStackTrace();
   } finally {
    IOUtils.closeStream(fis);
   }

   processed = true;
   return true;
  }

  return false;
 }

 @Override
 public float getProgress() throws IOException, InterruptedException {
  // a whole file is a single record, so progress is either 0 or 1
  return processed ? 1.0f : 0.0f;
 }

}
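One last note: since the motivation here is submitting jobs from a web application, blocking the calling thread with job.waitForCompletion(true) may be undesirable. A sketch of an alternative, using only standard methods of org.apache.hadoop.mapreduce.Job, submits asynchronously and polls for progress; it could replace the last two lines of runJob():

// sketch: non-blocking replacement for job.waitForCompletion(true)
job.submit();                 // returns as soon as the job is handed to YARN
while (!job.isComplete()) {   // poll the cluster for status
 System.out.printf("map %.0f%%  reduce %.0f%%%n",
   job.mapProgress() * 100, job.reduceProgress() * 100);
 Thread.sleep(5000);          // runJob() already declares InterruptedException
}
return job.isSuccessful();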

