Hadoop version tested: 2.4
Use case for map-side aggregation: we only care about a small subset of all the data, and that subset fits in memory.
Benefit: it can greatly reduce the amount of data transferred over the network and improve efficiency.
General approach: in the Mapper's map function, read every record and accumulate it into an in-memory collection such as a List; then, in the cleanup function, process that collection and emit only the small amount of data we actually care about.
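The pattern can be summarized with the minimal sketch below (illustrative only, not the code from this post; the class name is made up, and it counts words in a HashMap and emits the aggregated counts once in cleanup):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Skeleton of in-mapper aggregation: accumulate in memory during map(),
// emit the (much smaller) aggregated result once in cleanup().
public class InMapAggSkeleton extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    protected void map(LongWritable key, Text value, Context cxt) {
        for (String word : value.toString().split(" ")) {
            Integer c = counts.get(word);
            counts.put(word, c == null ? 1 : c + 1); // aggregate in memory, write nothing yet
        }
    }

    @Override
    protected void cleanup(Context cxt) throws IOException, InterruptedException {
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            cxt.write(new Text(e.getKey()), new IntWritable(e.getValue()));
        }
    }
}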
Example:
In the map function, split each line on spaces and add every word to an in-memory list, increasing its count if it is already there; in the cleanup function, emit the most frequent words in that list together with their counts.
package fz.inmap.aggregation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.PriorityQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMapArrgegationDriver extends Configured implements Tool {

    public static Logger log = LoggerFactory.getLogger(InMapArrgegationDriver.class);

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new InMapArrgegationDriver(), args);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        if (arg0.length != 3) {
            System.err.println("Usage:\nfz.inmap.aggregation.InMapArrgegationDriver <in> <out> <maxNum>");
            return -1;
        }
        Configuration conf = getConf();
        Path in = new Path(arg0[0]);
        Path out = new Path(arg0[1]);
        out.getFileSystem(conf).delete(out, true);  // remove the output path if it already exists
        conf.set("maxResult", arg0[2]);             // how many top words each mapper should emit

        Job job = Job.getInstance(conf, "in map arrgegation job");
        job.setJarByClass(getClass());
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapperClass(InMapMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(0);                   // map-only job: mapper output is written directly

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    protected static class InMapMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private ArrayList<Word> words = new ArrayList<Word>();
        private PriorityQueue<Word> queue;
        private int maxResult;

        @Override
        protected void setup(Context cxt) {
            maxResult = cxt.getConfiguration().getInt("maxResult", 10);
        }

        @Override
        protected void map(LongWritable key, Text value, Context cxt) {
            String[] line = value.toString().split(" "); // use blank to split
            for (String word : line) {
                Word curr = new Word(word, 1);
                if (words.contains(curr)) {              // increase the existing word's frequency
                    for (Word w : words) {
                        if (w.equals(curr)) {
                            w.frequency++;
                            break;
                        }
                    }
                } else {
                    words.add(curr);
                }
            }
        }

        @Override
        protected void cleanup(Context cxt) throws InterruptedException, IOException {
            if (words.isEmpty()) {
                return;                                  // avoid creating a PriorityQueue with capacity 0
            }
            Text outputKey = new Text();
            IntWritable outputValue = new IntWritable();
            queue = new PriorityQueue<Word>(words.size());
            queue.addAll(words);
            for (int i = 0; i < maxResult; i++) {
                Word tail = queue.poll();
                if (tail != null) {
                    outputKey.set(tail.value);
                    outputValue.set(tail.frequency);
                    log.info("key is {},value is {}", outputKey, outputValue);
                    cxt.write(outputKey, outputValue);
                }
            }
        }
    }
}
package fz.inmap.aggregation;

public class Word implements Comparable<Word> {

    public String value;
    public int frequency;

    public Word(String value, int frequency) {
        this.value = value;
        this.frequency = frequency;
    }

    @Override
    public int compareTo(Word o) {
        return o.frequency - this.frequency;   // descending order: higher frequency first
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof Word) {
            return value.equalsIgnoreCase(((Word) obj).value);
        } else {
            return false;
        }
    }
}
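To run the job, package the two classes into a jar and submit it with three arguments: the input path, the output path, and the number of top words each mapper should emit. The jar name and paths below are only examples:

hadoop jar inmap-aggregation.jar fz.inmap.aggregation.InMapArrgegationDriver /user/fansy/input /user/fansy/output 10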
The top words and their counts can then be seen either in the mapper logs (via the log.info call in cleanup) or directly in the job's output files.
Summary: although map-side aggregation can greatly reduce the amount of data transferred over the network and improve efficiency, we still need to consider the actual application context. For example, if we used the code above as-is to compute the 10 most frequent words globally, there would be a problem: each mapper only sees its own split and emits its own local top-10 words, so when the results are combined on the reducer side, words that are frequent overall but not in any single mapper's top 10 can be dropped, making the final result wrong.
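One way to get a correct global top N while keeping map-side aggregation (a sketch, not code from the original post): have each mapper emit every locally aggregated (word, count) pair in cleanup instead of only its local top 10, and add a single reducer that merges the partial counts and picks the global top N. The class name below is illustrative; the driver would additionally need job.setReducerClass(...) and job.setNumReduceTasks(1).

package fz.inmap.aggregation;

import java.io.IOException;
import java.util.PriorityQueue;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Merges the partial counts emitted by all mappers and keeps the global top N.
// Requires a single reduce task so that one reducer sees every word's counts.
public class GlobalTopNReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private int maxResult;
    private PriorityQueue<Word> queue;

    @Override
    protected void setup(Context cxt) {
        maxResult = cxt.getConfiguration().getInt("maxResult", 10);
        queue = new PriorityQueue<Word>(); // Word.compareTo orders by descending frequency
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context cxt) {
        int sum = 0;
        for (IntWritable v : values) {     // sum the partial counts from all mappers
            sum += v.get();
        }
        queue.add(new Word(key.toString(), sum));
    }

    @Override
    protected void cleanup(Context cxt) throws IOException, InterruptedException {
        Text outputKey = new Text();
        IntWritable outputValue = new IntWritable();
        for (int i = 0; i < maxResult; i++) {
            Word w = queue.poll();
            if (w == null) {
                break;
            }
            outputKey.set(w.value);
            outputValue.set(w.frequency);
            cxt.write(outputKey, outputValue);
        }
    }
}

This sketch keeps all distinct words in the reducer's memory, which matches the assumption the mapper already makes, namely that the data of interest fits in memory.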
Share, grow, be happy.
If you repost this article, please credit the original blog: http://blog.csdn.net/fansy1990
Original post: http://blog.csdn.net/fansy1990/article/details/37880665