
Testing the -files option in a Hadoop Java program

I had previously been using Hadoop streaming, where the -file option is very handy: it ships files to the cluster at submission time, configuration files for example. I then went looking for the equivalent of -file in a Java program, and despite a lot of effort, none of my tests passed.

Eventually I discovered that GenericOptionsParser parses a set of generic command-line options and handles each one itself; when it encounters -files, for instance, it ships the listed files to the mapper nodes. Testing showed that the -files option must come directly after hadoop jar, before the application's own arguments. You can confirm this convention from the streaming usage message:

Usage: $HADOOP_HOME/bin/hadoop jar \
          $HADOOP_HOME/hadoop-streaming.jar [options]
Options:
  -input    <path>     DFS input file(s) for the Map step
  -output   <path>     DFS output directory for the Reduce step
  -mapper   <cmd|JavaClassName>  The streaming command to run
  -combiner <cmd|JavaClassName>  The streaming command to run
  -reducer  <cmd|JavaClassName>  The streaming command to run
  -file     <file>     File/dir to be shipped in the Job jar file.
                       Deprecated. Use generic option "-files" instead
  -inputformat  TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName  Optional.
  -outputformat TextOutputFormat(default)|JavaClassName  Optional.
  -partitioner  JavaClassName  Optional.
  -numReduceTasks <num>  Optional.
  -inputreader <spec>  Optional.
  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands
  -mapdebug <path>     Optional. To run this script when a map task fails
  -reducedebug <path>  Optional. To run this script when a reduce task fails
  -io <identifier>     Optional.
  -lazyOutput          Optional. Lazily create Output
  -verbose


Generic options supported are
-conf <configuration file>    specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|jobtracker:port>    specify a job tracker
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.

The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]

Java programs executed through hadoop must follow the same argument convention, in particular for the generic options -D, -libjars, and -files.
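
For example, here is a sketch of a complete invocation with the generic options in the required position (the -D property setting and mylib.jar are illustrative placeholders, not part of the original test):

hadoop jar WordCount.jar \
    -D mapred.reduce.tasks=0 \
    -files test_upload_file \
    -libjars mylib.jar \
    /user/lmc/tmp/input /user/lmc/tmp/output

Any of the generic options may be omitted; what matters is that all of them appear before the application arguments (here, the input and output paths).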

Test code:

package wordcount.com.cn; 
 
import java.io.BufferedReader; 
import java.io.FileReader; 
import java.io.IOException; 
import java.util.ArrayList; 
import java.util.List; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.GenericOptionsParser; 
 
@SuppressWarnings("deprecation") 
public class WordCount { 
     
     
     
    static class SimpleMapper extends Mapper<LongWritable,Text,Text,Text> 
    { 
        BufferedReader reader = null; 
        List<String> lines = new ArrayList<String>(); // simple test; no real business logic 
         
        public void setup(Context context) throws IOException 
        { 
            // -files places the file in the task's working directory (via a 
            // symlink), so this name must match the uploaded file name 
            FileReader fr = new FileReader("test_upload_file"); 
            reader = new BufferedReader(fr); 
             
            String line = null; 
            while((line = reader.readLine()) != null) 
                lines.add(line); 
            reader.close(); 
            System.out.println(lines); 
        } 
        @Override 
        public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException 
        { 
            for(String line:lines) 
                context.write(new Text("key"),new Text(line)); 
        } 
    } 
     
    static class SimpleReducer extends Reducer<Text,Text,Text,Text> 
    { 
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
        { 
            for(Text value: values) 
            { 
                context.write(key, value); 
            } 
        } 
    } 
     
 
    /**
    * @param args
    * @throws IOException 
    * @throws InterruptedException 
    * @throws ClassNotFoundException 
    */ 
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 
        Configuration conf = new Configuration(); 
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
        for (String s:otherArgs) 
            System.out.println(s); 
        if (otherArgs.length != 2) { 
          System.err.println("Usage: Wordcount -files test_upload_file input output"); 
          System.exit(2); 
        } 
         
        Job job = new Job(conf); 
        job.setJarByClass(WordCount.class); 
         
        FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 
 
        job.setNumReduceTasks(0);  // map-only job: the reducer below is never invoked 
        job.setMapperClass(SimpleMapper.class); 
        job.setReducerClass(SimpleReducer.class); 
         
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(Text.class); 
         
        System.exit(job.waitForCompletion(true)? 0: 1); 
       
    } 
} 

Run the test:

hadoop jar WordCount.jar -files test_upload_file  /user/lmc/tmp/input /user/lmc/tmp/output

The test passed. Success!
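
As a side note, the more idiomatic way to get this generic-option handling is to implement the Tool interface and launch the job through ToolRunner, which invokes GenericOptionsParser internally. Below is a minimal sketch under that assumption; the class name WordCountTool is mine, not from the original code, and it reuses SimpleMapper from the test class above:

package wordcount.com.cn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

@SuppressWarnings("deprecation")
public class WordCountTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // By the time run() is called, ToolRunner has already stripped
        // -files/-libjars/-D from args and applied them to getConf().
        Job job = new Job(getConf());
        job.setJarByClass(WordCountTool.class);
        job.setMapperClass(WordCount.SimpleMapper.class);
        job.setNumReduceTasks(0);  // map-only, as in the test above
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses the generic options, then hands the rest to run()
        System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
    }
}

Invocation is unchanged, apart from naming the class if it is not the jar's manifest main class:

hadoop jar WordCount.jar wordcount.com.cn.WordCountTool -files test_upload_file /user/lmc/tmp/input /user/lmc/tmp/output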
