首页 > 其他 > 详细

从WordCount看MapReduce框架执行流程

时间:2014-10-30 12:11:31      阅读:388      评论:0      收藏:0      [点我收藏+]
代码如下:

import?java.io.IOException;

import?java.util.StringTokenizer;

?

import?org.apache.hadoop.conf.*;

import?org.apache.hadoop.mapreduce.*;

import?org.apache.hadoop.util.*;

import?org.apache.hadoop.io.*;

import?org.apache.hadoop.mapreduce.Reducer.*;

import?org.apache.hadoop.mapreduce.lib.input.*;

import?org.apache.hadoop.mapreduce.lib.output.*;

import?org.apache.hadoop.fs.Path;

import?org.apache.hadoop.mapreduce.lib.partition.*;

?

public?class?WordCount?{

public?static?class?MyMapper?extends?Mapper<Object,Text,?Text,IntWritable>{

private?final?static?IntWritable?one?=?new?IntWritable(1);

private?Text?word?=?new?Text();

?

public?void?map(Object?key,?Text?value,Context?context)?throws?IOException,InterruptedException{

StringTokenizer?st=new?StringTokenizer(value.toString());

while(st.hasMoreTokens()){

String?str=st.nextToken();

word.set(str);

context.write(word,?one);

System.out.println(str+"="+one.get());

}

}

}

?

public?static?class?MyReducer?extends?Reducer<Text,IntWritable,Text,IntWritable>{

private?IntWritable?result?=?new?IntWritable();

public?void?reduce(Text?key,Iterable<IntWritable>?values?,Context?context)?throws?IOException,InterruptedException{

int?sum=0;

for(IntWritable?val:values)

{

sum+=val.get();

}

System.out.println(key+"="+sum);

result.set(sum);

context.write(key,result);

}

?

}

?

?

public?static?void?main(String?[]?args)?throws?Exception

{

Configuration?conf=?new?Configuration();

?String[]?otherArgs?=?new?GenericOptionsParser(conf,?args).getRemainingArgs();

????if?(otherArgs.length?!=?2)?{

??????System.err.println("Usage:?wordcount?<in>?<out>");

??????System.exit(2);

????}

???Job?job=new?Job(conf,"wordcount"?);

???/**InputFormat类的作用是将输入的数据分割成splits,并将splits进一步拆成<K,V>

???*可以通过setInputFormatClass()方法进行设置

???*默认为TextInputFormat.class,默认情况可以不写

???**/

???job.setInputFormatClass(TextInputFormat.class);

???/**

????*Mapper类的作用是实现map函数,将splits作为输入生成一个结果

????*可以通过setMapperClass()方法进行设置

????*默认为Mapper.class,默认情况可以不写,此时输入即输出

????*/

???job.setMapperClass(MyMapper.class);

???/**

????*?设置Mapper输出的key的类型

????*/

???job.setMapOutputKeyClass(Text.class);

???/**

????*?设置Mapper输出的value的类型

????*/

???job.setMapOutputValueClass(IntWritable.class);

???/**

????*?Combiner类的作用是实现combine函数,将mapper的输出作为输入,合并具有形同key值得键值对

????*?可以通过setCombinerClass()方法进行设置

????*?默认为null,默认情况不写,此时输入即输出

????*/

???job.setCombinerClass(MyReducer.class);

???/**

????*?Partitioner类的作用是实现getPartition函数,用于在洗牌过程中将由Mapper输入的结果分成R份,每份交给一个Reducer

????*?可以通过setPartitionerClass()方法进行设置

????*?默认为HashPartitioner.class,默认情况可以不写,此时输入即输出

????*/

???job.setPartitionerClass(HashPartitioner.class);

???/**

????*?Reducer类的作用是实现reduce函数,将有combiner的输出作为输入,得到最终结果

????*?可以通过setReducerClass()方法进行设置

????*?默认为Reducer.class,默认情况可以不写,此时输入即输出

????*/

???job.setReducerClass(MyReducer.class);

???/**

????*?OutputFormat类,负责输出最终结果

????*?可以通过setOutputFormatClass()方法进行设置

????*?默认TextOutputFormat.class,默认情况可以不写,此时输入即输出

????*/

???//job.setOutputFormatClass(TextOutputFormat.class);

???/**

????*?设置reduce()输出的key的类型

????*/

???job.setOutputKeyClass(Text.class);

???/**

????*?设置reduce()输出的value的类型

????*/

???job.setOutputValueClass(IntWritable.class);

???

????FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));

????FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));

????System.out.println("运行啦");

????System.exit(job.waitForCompletion(true)???0?:?1);

?

}

}

我们先来看main函数,以便了解hadoopMapReduce的过程,假设输入为两个文件


bubuko.com,布布扣
?

?

?

?

InputFormat类将输入文件划分成多个splits,同时将这些splits转化为<K,V>的形式,如下,可以发现,当使用默认的TextInputFormat进行处理的时候,K为偏移量,V为每行的文本。



bubuko.com,布布扣
?
??

接着map()方法

?

对以上结果进行处理,根据MyMapper中的map方法,得到以下结果:



bubuko.com,布布扣
?

?

接下来Mapper框架对以上结果进行处理,根据key值进行排序,并合并成集合,得到以下结果:


bubuko.com,布布扣
?

?

?

接下来combine类对以上结果进行处理(实际上combine是一个本地的reducer,所以用MyReducer给他复制,见文章最后)得到结果如下:


bubuko.com,布布扣
?

?

?

接下来Reducer框架对其进行处理,结果如下:

bubuko.com,布布扣
?
?
?
?

?

?

?

?

?

接下来reduce()方法进行处理,结果如下:



bubuko.com,布布扣
?
?
?

?

?

?

?

?

以上是wordcount程序执行的全过程,通过wordcount的代码,我们了解了MapReduce框架的执行流程,如下

InputFormat>>map()方法>>Mapper框架>>Combiner>>Partitioner>>Reducer框架>>reduce()方法

以上的每个步骤,不设置具体的类时都会有个默认的类,除了InputFormat类以外,其他类的默认类都是输入即输出,但是InputFormat的默认类是输出为<K,V>

因此,对于wordcount程序来说,如果都采用默认类的话,输出应该为????



bubuko.com,布布扣
?
???????????????????????

?

?

?

?

下面来说说Combiner类,本质上是一个本地的Reducer类。其设计初衷是在本地将需要reduce操作的数据合并,以减少不必要的通信代价,以提高hadoop的运行性能。?

但值得注意的是,并不是所有的mapreduce程序都可以将reduce过程移植到本地进行combine,这需要在逻辑上考虑这种移植是否可行!要想?进行本地reducecombine),一个必要的条件是,reduce的输入输出格式必须一样!比如,对于wordcount程序,combine是?可行的,combine的输出(实际上是reduce的输出)与reduce的输入吻合。因此我们才可以有?job.setCombinerClass(MyReducer.class);

?

?

?

从WordCount看MapReduce框架执行流程

原文:http://yizhenn.iteye.com/blog/2149401

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!