Every map task can produce a large amount of local output. The combiner's job is to pre-aggregate the map-side output before it leaves the node, reducing the volume of data transferred between map and reduce nodes and improving network I/O. It is one of MapReduce's optimization techniques. The sample input file (data.txt) used below is:

hello tom
hello kittty
hello jerry
hello cat
hello tom
hello tom
hello kittty
hello jerry
hello cat
hello tom
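
To see what the combiner buys us, consider the ten-line sample file above (a hand-worked illustration, not output from an actual run). Without a combiner, the mapper emits one (word, 1) pair per word, 20 records in total, all of which must be shuffled to the reducer:

(hello,1) x10   (tom,1) x4   (kittty,1) x2   (jerry,1) x2   (cat,1) x2

With the combiner running on the map side, those 20 records collapse to one record per distinct word before the shuffle:

(hello,10)   (tom,4)   (kittty,2)   (jerry,2)   (cat,2)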
The Mapper splits each line into words and emits a (word, 1) pair for every word:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line on spaces and emit (word, 1) for each word
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
The Reducer sums the partial counts for each word. Because its input and output types are identical (Text and LongWritable in both positions) and summation is commutative and associative, the same class can also serve as the combiner:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all partial counts for this word and emit the total
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}
Finally, the driver assembles and submits the job. Note the job.setCombinerClass(WordCountReducer.class) call, which is all it takes to enable the combiner:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MRClient {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // Class containing the job's main method, used to locate the jar
        job.setJarByClass(MRClient.class);
        // Map phase
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job, "c:/data.txt");
        // Reduce phase
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileOutputFormat.setOutputPath(job, new Path("c:/out"));
        // Combiner: reuse the reducer to pre-aggregate map output
        job.setCombinerClass(WordCountReducer.class);
        // Submit the job; true means progress is printed to the console
        job.waitForCompletion(true);
    }
}
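
For completeness, the same driver can be written with Hadoop's Tool/ToolRunner idiom, which picks up generic options (-D properties, -files, and so on) from the command line. The sketch below assumes the input and output paths are passed as args[0] and args[1] instead of being hard-coded; the class name WordCountTool is ours, not from the original post:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountTool extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(WordCountTool.class);
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setCombinerClass(WordCountReducer.class);
        // Paths come from the command line: args[0] = input, args[1] = output
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
    }
}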
The combiner feature, then, requires only one extra line when assembling the job:

// Set the combiner
job.setCombinerClass(WordCountReducer.class);
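
Reusing the reducer as the combiner is safe here only because summing counts is commutative and associative, and because the combiner's output types match the reducer's input types. When the combine logic needs to differ from the reduce logic, you would write a separate class. A minimal sketch of what such a class looks like (the name WordCountCombiner is hypothetical, not from the original post):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// A combiner is just a Reducer whose output types match the real
// reducer's input types (Text, LongWritable here). The framework may
// run it zero, one, or several times on a map task's output, so its
// logic must never change the final result.
public class WordCountCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}

It would then be wired in with job.setCombinerClass(WordCountCombiner.class) instead of reusing the reducer.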
The final counts are unchanged after adding the combiner, but its effect is visible in the counters the job prints when it finishes (look for Combine input records and Combine output records in the Map-Reduce Framework group).

Here we have a single input file smaller than 128 MB, so only one map task processes it. After the combiner runs, the record count drops from 3,519 to 6 (the sample used for that run contained only six distinct words). In this use case the combiner drastically reduces the amount of data that has to be transferred.
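
If you want to verify the effect programmatically rather than by scanning the console output, the combine counters can be read from the Job object after it completes. A minimal sketch, assuming it is appended to the end of main in MRClient after waitForCompletion returns (add these imports at the top of the file):

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskCounter;

// Read the built-in combine counters after the job has finished
Counters counters = job.getCounters();
long combineIn = counters.findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
long combineOut = counters.findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
System.out.println("Combine input records:  " + combineIn);
System.out.println("Combine output records: " + combineOut);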
Original post: https://www.cnblogs.com/TiePiHeTao/p/a709b1bf188ace34ae512fd3f01031c6.html