
Hadoop Source Code Analysis: The Entry Point of a Job Program

Posted: 2014-08-03 20:30:45

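This article gives an overview of how a Hadoop Job program is started.

A Hadoop MapReduce program written in Java normally uses a main method as the entry point of the whole program, for example: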

public static void main(String[] args) throws Exception {
     int res = ToolRunner.run(new Configuration(), new CalculateSumJob(),args);
     System.exit(res);
}

Here the class that implements this Job's MapReduce logic is CalculateSumJob, declared as follows:

public class CalculateSumJob extends Configured implements Tool {
...
}

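CalculateSumJob extends the Configured class from the org.apache.hadoop.conf package, and Configured in turn implements the Configurable interface from the same package. What Configured mainly provides is an implementation of that interface's setConf and getConf methods, that is, setting and reading the Job's system configuration, a Configuration object. The Job's Configuration will be covered in detail in the next article.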
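To make the relationship concrete, here is a minimal sketch of the Configurable/Configured pattern in plain Java. It does not use Hadoop at all: `SimpleConf`, the nested `Configurable` and `Configured` types, and `MyJob` are simplified, hypothetical stand-ins written for illustration, not Hadoop's source, and the real `Configuration` class does far more than a string map. The only point is that a subclass of `Configured` inherits `setConf`/`getConf` and reads its settings through `getConf()`.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical stand-ins for org.apache.hadoop.conf.Configurable
// and Configured; this is NOT Hadoop's source code.
public class ConfiguredSketch {

    // Hypothetical minimal configuration: a string key/value map.
    static class SimpleConf {
        private final Map<String, String> props = new HashMap<>();
        void set(String key, String value) { props.put(key, value); }
        String get(String key) { return props.get(key); }
    }

    // Mirrors the two methods the article attributes to Configurable.
    interface Configurable {
        void setConf(SimpleConf conf);
        SimpleConf getConf();
    }

    // Base class that just stores the configuration it is given,
    // so subclasses inherit setConf/getConf for free.
    static class Configured implements Configurable {
        private SimpleConf conf;
        @Override public void setConf(SimpleConf conf) { this.conf = conf; }
        @Override public SimpleConf getConf() { return conf; }
    }

    // Plays the role of CalculateSumJob: it only reads its settings.
    static class MyJob extends Configured {
        String describeInput() {
            return "input=" + getConf().get("input");
        }
    }

    public static void main(String[] args) {
        SimpleConf conf = new SimpleConf();
        conf.set("input", "/data/in");            // hypothetical path
        MyJob job = new MyJob();
        job.setConf(conf);                        // the step ToolRunner performs for a real Tool
        System.out.println(job.describeInput());  // prints "input=/data/in"
    }
}
```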

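CalculateSumJob also implements the Tool interface from the org.apache.hadoop.util package. Tool extends the Configurable interface and adds a method of its own, run(); this run() method is what the ToolRunner.run() statement in main ultimately calls. So what is ToolRunner? It is a class in the org.apache.hadoop.util package, a utility that Hadoop provides specifically for running Hadoop Jobs. Its core code is as follows: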

  public static int run(Configuration conf, Tool tool, String[] args) 
    throws Exception{
    if(conf == null) {
      conf = new Configuration();
    }
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    //set the configuration back, so that Tool can configure itself
    tool.setConf(conf);
    
    //get the args w/o generic hadoop args
    String[] toolArgs = parser.getRemainingArgs();
    return tool.run(toolArgs);
  }
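The three steps of `ToolRunner.run` (parse the generic options into the configuration, hand the configuration to the tool via `setConf`, then call the tool's `run` with the leftover arguments) can be modelled in plain Java as below. This is a hypothetical, simplified sketch: the configuration is just a `Map`, `Tool` is a stand-in interface, and `parseGenericOptions` only recognizes the `-D key=value` form, whereas Hadoop's `GenericOptionsParser` supports more generic options than that.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of ToolRunner.run(conf, tool, args).
// Not Hadoop code: only "-D key=value" is handled here.
public class ToolRunnerSketch {

    // Stand-in for org.apache.hadoop.util.Tool.
    interface Tool {
        void setConf(Map<String, String> conf);
        int run(String[] args) throws Exception;
    }

    // Moves "-D key=value" pairs into conf and returns all other arguments.
    static String[] parseGenericOptions(Map<String, String> conf, String[] args) {
        List<String> remaining = new ArrayList<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].equals("-D") && i + 1 < args.length) {
                String[] kv = args[++i].split("=", 2);
                conf.put(kv[0], kv.length > 1 ? kv[1] : "");
            } else {
                remaining.add(args[i]);
            }
        }
        return remaining.toArray(new String[0]);
    }

    // Same three steps as the ToolRunner.run listing above.
    static int run(Map<String, String> conf, Tool tool, String[] args) throws Exception {
        if (conf == null) {
            conf = new HashMap<>();
        }
        String[] toolArgs = parseGenericOptions(conf, args); // 1. apply generic options
        tool.setConf(conf);                                  // 2. hand conf to the tool
        return tool.run(toolArgs);                           // 3. run with leftover args
    }

    public static void main(String[] args) throws Exception {
        final Map<String, String> seen = new HashMap<>();
        Tool demo = new Tool() {
            private Map<String, String> conf;
            public void setConf(Map<String, String> c) { conf = c; }
            public int run(String[] rest) {
                seen.putAll(conf);
                return rest.length;   // number of non-generic arguments
            }
        };
        int res = run(null, demo, new String[] {"-D", "input=/in", "extra"});
        System.out.println(res + " " + seen.get("input")); // prints "1 /in"
    }
}
```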

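Read this together with ToolRunner.run(new Configuration(), new CalculateSumJob(), args) in main: ToolRunner first uses GenericOptionsParser to apply Hadoop's generic command-line options (such as -D key=value) to the new Configuration, then passes that Configuration to CalculateSumJob through setConf, and finally calls CalculateSumJob's run method with the remaining arguments. CalculateSumJob has to implement the run method of the Tool interface itself; the code is as follows: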

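    @Override
    public int run(String[] args) throws Exception {
        // In Hadoop 2.x this constructor is deprecated in favor of
        // Job.getInstance(getConf(), "CalculateSumJob")
        Job job = new Job(getConf(), "CalculateSumJob");
        job.setJarByClass(CalculateSumJob.class);

        // input, output and num_reduce_tasks are custom keys read from the
        // Configuration (e.g. supplied as -D key=value generic options)
        Configuration conf = job.getConfiguration();
        String inputPath = conf.get("input");
        String outputPath = conf.get("output");
        if (inputPath == null || outputPath == null) {
            System.err.println("Usage: -D input=<path> -D output=<path> [-D num_reduce_tasks=<n>]");
            return 2;
        }
        // Falls back to 1 reducer when num_reduce_tasks is not set
        job.setNumReduceTasks(conf.getInt("num_reduce_tasks", 1));

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        job.setMapperClass(CalculateSumMapper.class);
        job.setCombinerClass(CalculateSumCombiner.class);
        job.setReducerClass(CalculateSumReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Return the exit code instead of calling System.exit here,
        // so that ToolRunner.run can hand it back to main
        return job.waitForCompletion(true) ? 0 : 1;
    }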

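This run method defines the Job's basic parameters: it reads the configuration and sets the jar, the Job's input and output paths, the Mapper, Combiner and Reducer classes, the output key/value types, and so on. Finally, job.waitForCompletion(true) submits the Job and waits for it to finish (the true argument makes it report progress). Note that input, output and num_reduce_tasks are custom keys rather than built-in Hadoop properties; because they are read from the Configuration, they can be supplied on the command line as generic options such as -D input=<path>, which GenericOptionsParser copies into the Configuration before run is called.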
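To see what the configured Mapper, Combiner and Reducer compute, the sketch below replays the same data flow locally in plain Java, without Hadoop: each tab-separated line is mapped to (column 0, column 2), the values are grouped by key as the shuffle would do, and each group is summed. The class name, method names and sample lines are hypothetical, and the sketch leaves out the extra `null` column that the reducer appends.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Hadoop) of the map -> shuffle -> reduce flow
// configured above; names and data are illustrative only.
public class SumFlowSketch {

    // Mapper logic: key = first column, value = third column.
    static String[] map(String line) {
        String[] fields = line.split("\t");
        return new String[] {fields[0], fields[2]};
    }

    // Shared combiner/reducer logic: sum the numeric values of one key.
    static double sum(List<String> values) {
        double total = 0;
        for (String v : values) {
            total += Double.parseDouble(v);
        }
        return total;
    }

    // Groups mapper output by key (like the shuffle) and sums each group.
    static Map<String, Double> runLocally(List<String> lines) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] kv = map(line);
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            result.put(e.getKey(), sum(e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a\tx\t1.5", "b\ty\t2", "a\tz\t3");
        System.out.println(runLocally(lines)); // prints {a=4.5, b=2.0}
    }
}
```

Because the combiner only produces partial sums of the same values, the reducer's totals come out the same (up to floating-point rounding) whether the combiner runs or not, which matters because Hadoop may apply a combiner zero, one or several times.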

 

These parameter settings will be covered in detail in later articles.

Finally, here is the complete CalculateSumJob program for this article:

package jixi.source.hadoop.job;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CalculateSumJob extends Configured implements Tool {

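    public static class CalculateSumMapper extends
            Mapper<LongWritable, Text, Text, Text> {
        // Reuse the output objects instead of allocating them per record
        private final Text outputKey = new Text();
        private final Text outputValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is tab-separated: column 0 is the key,
            // column 2 is the numeric value to be summed
            String[] valueStr = value.toString().split("\t");
            if (valueStr.length < 3) {
                return; // skip malformed lines instead of failing the task
            }
            outputKey.set(valueStr[0]);
            outputValue.set(valueStr[2]);
            context.write(outputKey, outputValue);
        }
    }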

    public static class CalculateSumCombiner extends
            Reducer<Text, Text, Text, Text> {
        private final Text outputValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Map-side pre-aggregation: emit a partial sum per key
            double sum = 0;
            for (Text value : values) {
                sum += Double.parseDouble(value.toString());
            }
            outputValue.set(String.valueOf(sum));
            context.write(key, outputValue);
        }
    }

    public static class CalculateSumReducer extends
            Reducer<Text, Text, Text, Text> {
        private final Text outputValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Add up the (partial) sums produced by the combiners
            double sum = 0;
            for (Text value : values) {
                sum += Double.parseDouble(value.toString());
            }
            // Output line: key <TAB> sum <TAB> null (format kept from the original)
            outputValue.set(sum + "\t" + "null");
            context.write(key, outputValue);
        }
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new CalculateSumJob(),args);
        System.exit(res);
    }

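    @Override
    public int run(String[] args) throws Exception {
        // In Hadoop 2.x this constructor is deprecated in favor of
        // Job.getInstance(getConf(), "CalculateSumJob")
        Job job = new Job(getConf(), "CalculateSumJob");
        job.setJarByClass(CalculateSumJob.class);

        // input, output and num_reduce_tasks are custom keys read from the
        // Configuration (e.g. supplied as -D key=value generic options)
        Configuration conf = job.getConfiguration();
        String inputPath = conf.get("input");
        String outputPath = conf.get("output");
        if (inputPath == null || outputPath == null) {
            System.err.println("Usage: -D input=<path> -D output=<path> [-D num_reduce_tasks=<n>]");
            return 2;
        }
        // Falls back to 1 reducer when num_reduce_tasks is not set
        job.setNumReduceTasks(conf.getInt("num_reduce_tasks", 1));

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        job.setMapperClass(CalculateSumMapper.class);
        job.setCombinerClass(CalculateSumCombiner.class);
        job.setReducerClass(CalculateSumReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Return the exit code instead of calling System.exit here,
        // so that ToolRunner.run can hand it back to main
        return job.waitForCompletion(true) ? 0 : 1;
    }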
}

Original article: http://www.cnblogs.com/jixi/p/3888772.html
