This article gives a broad overview of how a Hadoop Job program is started.
A Hadoop MapReduce program written in Java normally uses a main method as the single entry point for the whole program, like this:
public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new CalculateSumJob(), args);
    System.exit(res);
}
As the code shows, the MapReduce implementation class for this job is CalculateSumJob, declared as follows:
public class CalculateSumJob extends Configured implements Tool { ... }
The CalculateSumJob class extends the Configured class in the org.apache.hadoop.conf package. Configured in turn implements the Configurable interface from the same package; its main job is to provide the interface's setConf and getConf methods, i.e. to set and read the job's system configuration (Configuration). The job's Configuration will be covered in detail in the next article.
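Configured itself is very small. A simplified sketch based on the Hadoop source shows it is essentially just a holder for a Configuration object:

public class Configured implements Configurable {
    private Configuration conf;

    public Configured() {
        this(null);
    }

    public Configured(Configuration conf) {
        setConf(conf);
    }

    // store the job's Configuration
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    // read the stored Configuration back
    public Configuration getConf() {
        return conf;
    }
}

This is why a subclass such as CalculateSumJob can simply call getConf() later without managing a Configuration field of its own.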
At the same time, CalculateSumJob implements the Tool interface from the org.apache.hadoop.util package. Tool extends Configurable and adds a run() method of its own, and it is this run() method that the ToolRunner.run() call in main ultimately drives. So what exactly is ToolRunner? Looking it up in the org.apache.hadoop.util package, it is simply a utility class that Hadoop provides specifically for running Hadoop jobs. Its core code is as follows:
public static int run(Configuration conf, Tool tool, String[] args) throws Exception {
    if (conf == null) {
        conf = new Configuration();
    }
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    // set the configuration back, so that the Tool can configure itself
    tool.setConf(conf);
    // get the args w/o generic hadoop args
    String[] toolArgs = parser.getRemainingArgs();
    return tool.run(toolArgs);
}
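For reference, the Tool interface that the tool parameter must implement is tiny. Simplified from the Hadoop source, it is just:

public interface Tool extends Configurable {
    // run the job with the command-line arguments left over
    // after the generic Hadoop options have been stripped
    int run(String[] args) throws Exception;
}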
Reading this together with the ToolRunner.run(new Configuration(), new CalculateSumJob(), args) call in main: the run method hands the new Configuration() it was given to the CalculateSumJob instance via setConf, and then calls CalculateSumJob's run method. CalculateSumJob therefore has to provide its own implementation of the run method declared in the Tool interface, shown below:
public int run(String[] args) throws Exception {
    Job job = new Job(getConf(), "CalculateSumJob");
    job.setJarByClass(CalculateSumJob.class);

    // read job parameters from the Configuration
    Configuration conf = job.getConfiguration();
    String inputPath = conf.get("input");
    String outputPath = conf.get("output");
    int numReduceTasks = Integer.parseInt(conf.get("num_reduce_tasks"));
    job.setNumReduceTasks(numReduceTasks);

    FileInputFormat.addInputPath(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));

    job.setMapperClass(CalculateSumMapper.class);
    job.setCombinerClass(CalculateSumCombiner.class);
    job.setReducerClass(CalculateSumReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // return the job's status as the exit code so that
    // main's System.exit(res) reports success or failure
    return job.waitForCompletion(true) ? 0 : 1;
}
This run method defines the job's basic parameters: it fetches the configuration, sets the jar, sets the job's input and output paths, wires up the Mapper, Combiner, and Reducer classes, declares the output key/value types, and so on. Finally, job.waitForCompletion(true) submits the job, blocks until it completes, and its boolean result becomes the exit code that run returns.
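Because GenericOptionsParser has already applied generic options of the form -D key=value to the Configuration by the time run is called, the input, output, and num_reduce_tasks values read above can be supplied on the command line. A hypothetical invocation might look like this (the jar name and all paths here are made up for illustration):

hadoop jar calculate-sum.jar jixi.source.hadoop.job.CalculateSumJob \
    -D input=/data/in \
    -D output=/data/out \
    -D num_reduce_tasks=4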
These specific parameter settings will be covered in detail in the articles that follow.
Finally, here is the complete CalculateSumJob program from this article:
package jixi.source.hadoop.job;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CalculateSumJob extends Configured implements Tool {

    public static class CalculateSumMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            Text outputKey = new Text();
            Text outputValue = new Text();
            // input lines are tab-separated; emit column 0 as the key
            // and column 2 as the value
            String[] valueStr = value.toString().split("\t");
            outputKey.set(valueStr[0]);
            outputValue.set(valueStr[2]);
            context.write(outputKey, outputValue);
        }
    }

    public static class CalculateSumCombiner extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Text outputKey = new Text();
            Text outputValue = new Text();
            // pre-aggregate the values on the map side
            double sum = 0;
            for (Text value : values) {
                sum += Double.parseDouble(value.toString());
            }
            outputKey.set(key);
            outputValue.set(Double.toString(sum));
            context.write(outputKey, outputValue);
        }
    }

    public static class CalculateSumReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Text outputKey = new Text();
            Text outputValue = new Text();
            double sum = 0;
            for (Text value : values) {
                sum += Double.parseDouble(value.toString());
            }
            outputKey.set(key);
            outputValue.set(Double.toString(sum) + "\t" + "null");
            context.write(outputKey, outputValue);
        }
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new CalculateSumJob(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "CalculateSumJob");
        job.setJarByClass(CalculateSumJob.class);

        Configuration conf = job.getConfiguration();
        String inputPath = conf.get("input");
        String outputPath = conf.get("output");
        int numReduceTasks = Integer.parseInt(conf.get("num_reduce_tasks"));
        job.setNumReduceTasks(numReduceTasks);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        job.setMapperClass(CalculateSumMapper.class);
        job.setCombinerClass(CalculateSumCombiner.class);
        job.setReducerClass(CalculateSumReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // return the job's status so that main's System.exit(res)
        // can report success or failure to the shell
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
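One small note on the code above: in Hadoop 2.x the Job(Configuration, String) constructor is deprecated in favor of the static factory method, so on newer versions the job would typically be created like this instead:

// preferred in Hadoop 2.x; the rest of the setup is unchanged
Job job = Job.getInstance(getConf(), "CalculateSumJob");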
Original: http://www.cnblogs.com/jixi/p/3888772.html