首页 > 其他 > 详细

hadoop之WordCount源代码分析

时间:2017-07-27 11:13:16      阅读:300      评论:0      收藏:0      [点我收藏+]
//近期在研究hadoop。第一个想要要開始研究的必然是wordcount程序了。看了《hadoop应用开发实战解说》结合自己的理解,对wordcount的源代码进行分析。
<pre name="code" class="java">

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class WordCount extends Configured implements Tool {
  
  /*
这个类实现mapper接口的map方法,输入的是文本总的每一行。

利用StringTokenizer将字符串拆分成单词。然后将输出结果(word, 1)写入到OutputCollector中去 OutputCollector有hadoop框架提供。负责收集mapper和reducer的输出数据,实现map函数和reduce函数时。仅仅须要将输出的<key,value>对向OutputCollector一丢就可以,其余的事情框架会自己处理。 */ public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); /*类中的LongWritable, Text, IntWritable是hadoop中实现的用于封装Java数据类型的类,这些类都可以被串行化从而便于在分布式系统中进行数据交换。可以将它们等同的视为long,string,int的替代品 */ public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one);//输出结果(word,1) } } } /* 此类实现的是Reducer接口中的reduce方法。函数中的參数key.value是由mapper输出的中间结果。values是一个iterator(迭代器) */ public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; /* 遍历这个迭代器。就行得到有同样的key的全部的value值。 此处的key是一个单词。而value则是词频 */ while (values.hasNext()) { sum += values.next().get(); } //遍历后得到这个单词出现的总次数。 output.collect(key, new IntWritable(sum)); } } static int printUsage() { System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");//输入输入路径 ToolRunner.printGenericCommandUsage(System.out); return -1; } /* Wordcount 中map/reduce项目的主要驱动程序,调用此方法提交的map / reduce任务。在hadoop中一次计算任务成为一个job。可以通过以一个JobConf对象设置怎样执行这个job。此处定义了输出的key 类型是text,而value的类型是IntWritable */ public int run(String[] args) throws Exception { JobConf conf = new JobConf(getConf(), WordCount.class); conf.setJobName("wordcount"); // key是text(words) conf.setOutputKeyClass(Text.class); // value是IntWritable (ints) conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); List<String> other_args = new ArrayList<String>(); for(int i=0; i < args.length; ++i) { try { if ("-m".equals(args[i])) { conf.setNumMapTasks(Integer.parseInt(args[++i])); } else if ("-r".equals(args[i])) { conf.setNumReduceTasks(Integer.parseInt(args[++i])); } else { other_args.add(args[i]); } } catch (NumberFormatException except) { System.out.println("ERROR: Integer expected instead of " + args[i]); return printUsage(); } catch (ArrayIndexOutOfBoundsException except) { System.out.println("ERROR: Required parameter missing from " + args[i-1]); return printUsage(); } } // Make sure there are exactly 2 parameters left. if (other_args.size() != 2) { System.out.println("ERROR: Wrong number of parameters: " + other_args.size() + " instead of 2."); return printUsage(); } FileInputFormat.setInputPaths(conf, other_args.get(0)); FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1))); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { /* ToolRunner的run方法開始,run方法有三个參数。

第一个是Configuration类的实例,第二个是wordcount的实例,args则是从控制台接收到的命令行数组 */ int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res); } }




   

hadoop之WordCount源代码分析

原文:http://www.cnblogs.com/tlnshuju/p/7243616.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!