I had been using Hadoop streaming for a long time, where the -file option is extremely handy: it ships files, such as configuration files, to the cluster at submit time. I then went looking for the equivalent of -file for Java MapReduce programs, and despite a lot of effort my tests kept failing.
Eventually I discovered that GenericOptionsParser parses a set of special command-line options and acts on them; for example, when it sees -files, it uploads the listed files to the mapper nodes. My tests showed that the -files option must come right after the jar name, before the application's own arguments. The convention is visible in the streaming usage text:
Usage: $HADOOP_HOME/bin/hadoop jar \
          $HADOOP_HOME/hadoop-streaming.jar [options]
Options:
  -input          <path>                DFS input file(s) for the Map step
  -output         <path>                DFS output directory for the Reduce step
  -mapper         <cmd|JavaClassName>   The streaming command to run
  -combiner       <cmd|JavaClassName>   The streaming command to run
  -reducer        <cmd|JavaClassName>   The streaming command to run
  -file           <file>                File/dir to be shipped in the Job jar file.
                                        Deprecated. Use generic option "-files" instead
  -inputformat    TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName   Optional.
  -outputformat   TextOutputFormat(default)|JavaClassName   Optional.
  -partitioner    JavaClassName         Optional.
  -numReduceTasks <num>                 Optional.
  -inputreader    <spec>                Optional.
  -cmdenv         <n>=<v>               Optional. Pass env.var to streaming commands
  -mapdebug       <path>                Optional. To run this script when a map task fails
  -reducedebug    <path>                Optional. To run this script when a reduce task fails
  -io             <identifier>          Optional.
  -lazyOutput                           Optional. Lazily create Output
  -verbose
Generic options supported are
  -conf <configuration file>                     specify an application configuration file
  -D <property=value>                            use value for given property
  -fs <local|namenode:port>                      specify a namenode
  -jt <local|jobtracker:port>                    specify a job tracker
  -files <comma separated list of files>         specify comma separated files to be copied to the map reduce cluster
  -libjars <comma separated list of jars>        specify comma separated jar files to include in the classpath.
  -archives <comma separated list of archives>   specify comma separated archives to be unarchived on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
Java programs run with hadoop must follow the same command-line convention, in particular for the -D, -libjars, and -files options: generic options first, then the application's own arguments.
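For example, reusing the jar and paths from the test below, only the first invocation works. In the second, -files comes after the application arguments, so GenericOptionsParser does not consume it and it is simply passed through in getRemainingArgs():

hadoop jar WordCount.jar -files test_upload_file /user/lmc/tmp/input /user/lmc/tmp/output
hadoop jar WordCount.jar /user/lmc/tmp/input /user/lmc/tmp/output -files test_upload_file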
Test code:
package wordcount.com.cn;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

@SuppressWarnings("deprecation")
public class WordCount {

    static class SimpleMapper extends Mapper<LongWritable, Text, Text, Text> {
        List<String> lines = new ArrayList<String>(); // simple test, no real business logic

        @Override
        public void setup(Context context) throws IOException {
            // The name must match the uploaded file name exactly; -files makes
            // the file available in the task's working directory.
            BufferedReader reader = new BufferedReader(new FileReader("test_upload_file"));
            String line;
            while ((line = reader.readLine()) != null)
                lines.add(line);
            reader.close();
            System.out.println(lines);
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String line : lines)
                context.write(new Text("key"), new Text(line));
        }
    }

    static class SimpleReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

    /**
     * @param args
     * @throws IOException
     * @throws InterruptedException
     * @throws ClassNotFoundException
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // GenericOptionsParser consumes generic options such as -files and
        // leaves only the application arguments (input/output paths) behind.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        for (String s : otherArgs)
            System.out.println(s);
        if (otherArgs.length != 2) {
            System.err.println("Usage: WordCount -files test_upload_file <input> <output>");
            System.exit(2);
        }
        Job job = new Job(conf);
        job.setJarByClass(WordCount.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        job.setNumReduceTasks(0); // map-only job, so SimpleReducer is never actually invoked
        job.setMapperClass(SimpleMapper.class);
        job.setReducerClass(SimpleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
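As an aside, the same generic-option handling can be obtained through ToolRunner, which runs GenericOptionsParser for you. A minimal sketch, assuming the Job setup above is moved into run(); the class name WordCountTool is mine, not part of the original test:

package wordcount.com.cn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch: ToolRunner strips -files/-libjars/-D before calling run(),
// so args here contains only the remaining application arguments.
public class WordCountTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // generic options already applied
        // ... build and submit the Job exactly as in main() above ...
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
    }
}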
Run the test:
hadoop jar WordCount.jar -files test_upload_file /user/lmc/tmp/input /user/lmc/tmp/output
The test passed. Success!