首页 > 其他 > 详细

hadoop编程小技巧(9)---二次排序(值排序)

时间:2014-07-30 17:29:34      阅读:331      评论:0      收藏:0      [点我收藏+]

代码测试环境:Hadoop2.4

应用场景:在Reducer端一般是key排序,而没有value排序,如果想对value进行排序,则可以使用此技巧。

应用实例描述:

比如针对下面的数据:

a,5
b,7
c,2
c,9
a,3
a,1
b,10
b,3
c,1
如果使用一般的MR的话,其输出可能是这样的:

a	1
a	3
a	5
b	3
b	10
b	7
c	1
c	9
c	2
从数据中可以看到其键是排序的,但是其值不是。通过此篇介绍的技巧可以做到下面的输出:

a	1
a	3
a	5
b	3
b	7
b	10
c	1
c	2
c	9
这个数据就是键和值都是排序的了。

二次排序原理:
1)自定义键类型,把值放入键中;

2)利用键的排序特性,可以顺便把值也排序了;

3)这时会有两个问题:

a. 数据传输到不同的Reducer会有异常;

b. 数据在Reducer中的分组不同;

针对这两个问题,需要使用自定义Partitioner、使用自定义GroupComparator来定义相应的逻辑;

实例:

driver类:

package fz.secondarysort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SortDriver extends Configured implements Tool{

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		ToolRunner.run(new Configuration(), new SortDriver(),args);
	}

	@Override
	public int run(String[] args) throws Exception {
		Job job1 = Job.getInstance(getConf(), "secondary sort ");
		job1.setJarByClass(getClass());
		if(args.length!=5){
			System.err.println("Usage: <input> <output> <numReducers> <useSecondarySort> <useGroupComparator>");
			System.exit(-1);
		}
		Path out = new Path(args[1]);
		out.getFileSystem(getConf()).delete(out, true);
		FileInputFormat.setInputPaths(job1, new Path(args[0]));
		FileOutputFormat.setOutputPath(job1, out);
		if("true".equals(args[3])||"false".equals(args[3])){
			if("true".equals(args[3])){ // 使用二次排序
				job1.setMapperClass(Mapper1.class);
				job1.setReducerClass(Reducer1.class);
				job1.setMapOutputKeyClass(CustomKey.class);
				job1.setMapOutputValueClass(NullWritable.class);
				job1.setOutputKeyClass(CustomKey.class);
				job1.setOutputValueClass(NullWritable.class);
				if("true".equals(args[4])){
					job1.setGroupingComparatorClass(CustomGroupComparator.class);
				}else if("false".equals(args[4])){
					// do nothing 
				}else{
					System.err.println("Wrong Group Comparator argument!");
					System.exit(-1);
				}
				job1.setPartitionerClass(CustomPartitioner.class);
			}else{  // 不使用二次排序
				job1.setMapperClass(Mapper2.class);
				job1.setReducerClass(Reducer2.class);
				job1.setMapOutputKeyClass(Text.class);
				job1.setMapOutputValueClass(IntWritable.class);
				job1.setOutputKeyClass(Text.class);
				job1.setOutputValueClass(IntWritable.class);
			}
		}else{
			System.err.println("The fourth argument should be ‘true‘ or ‘false‘");
			System.exit(-1);
		}
		job1.setInputFormatClass(TextInputFormat.class);
		job1.setOutputFormatClass(TextOutputFormat.class);
		
		job1.setNumReduceTasks(Integer.parseInt(args[2]));
		
		boolean job1success = job1.waitForCompletion(true);
		if(!job1success) {
			System.out.println("The CreateBloomFilter job failed!");
			return -1;
		}
		return 0;
	}
	

}

mapper1(二次排序mapper):

package fz.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * 有二次排序mapper
 * @author fansy
 *
 */
public class Mapper1 extends
		Mapper<LongWritable, Text, CustomKey, NullWritable> {
	private String COMMA =",";
	private CustomKey newKey = new CustomKey();
	public void map(LongWritable key,Text value, Context cxt ) throws IOException,InterruptedException{
		String [] values = value.toString().split(COMMA);
		newKey.setSymbol(values[0]);
		newKey.setValue(Integer.parseInt(values[1]));
		cxt.write(newKey, NullWritable.get());
	}
}
Reducer1(二次排序Reducer)

package fz.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 二次排序Reducer
 * @author fansy
 *
 */
public class Reducer1 extends
		Reducer<CustomKey, NullWritable, CustomKey, NullWritable> {
	private Logger log = LoggerFactory.getLogger(Reducer1.class);
	public void setup(Context cxt){
		log.info("reducer1*********************in setup()");
	}
	public void reduce(CustomKey key ,Iterable<NullWritable> values,Context cxt)throws IOException,InterruptedException{
		log.info("reducer1******* in reduce()");
		for(NullWritable v:values){
			log.info("key:"+key+"-->value:"+v);
			cxt.write(key, v);
		}
		log.info("reducer1****** in reduce() *******end");
	}
}
无排序mapper2

package fz.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * 不是二次排序
 * @author fansy
 *
 */
public class Mapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {
	private String COMMA =",";
	public void map(LongWritable key,Text value, Context cxt ) throws IOException,InterruptedException{
		String [] values = value.toString().split(COMMA);
		Text newKey =  new Text(values[0]);
		cxt.write(newKey, new IntWritable(Integer.parseInt(values[1])));
	}
}

无排序Reducer2

package fz.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 无二次排序
 * @author fansy
 *
 */
public class Reducer2 extends Reducer<Text, IntWritable, Text, IntWritable> {
	private Logger log = LoggerFactory.getLogger(Reducer2.class);
	public void setup(Context cxt){
		log.info("reducer2*********************in setup()");
	}
	public void reduce(Text key ,Iterable<IntWritable> values,Context cxt)throws IOException,InterruptedException{
		log.info("reducer2******* in reduce()");
		for(IntWritable v:values){
			log.info("key:"+key+"-->value:"+v);
			cxt.write(key, v);
		}
		log.info("reducer2****** in reduce() *******end");
	}
}

自定义key

package fz.secondarysort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
/**
 * symbol 是原始的key
 * value是原始的值
 * @author fansy
 *
 */
public class CustomKey implements WritableComparable<CustomKey> {

	private int value;
	private String symbol;
	
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(this.value);
		out.writeUTF(this.symbol);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.value=in.readInt();
		this.symbol= in.readUTF();
	}

	@Override
	public int compareTo(CustomKey o) {
		int result = this.symbol.compareTo(o.symbol);
		return result!=0 ? result :this.value-o.value;
	}

	@Override
	public String toString(){
		return this.symbol+"\t"+this.value;
	}
	public int getValue() {
		return value;
	}

	public void setValue(int value) {
		this.value = value;
	}

	public String getSymbol() {
		return symbol;
	}

	public void setSymbol(String symbol) {
		this.symbol = symbol;
	}

}
自定义GroupComparator

package fz.secondarysort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * 只对比symbol,即原始的键
 * @author fansy
 *
 */
public class CustomGroupComparator extends WritableComparator {

	protected CustomGroupComparator(){
		super(CustomKey.class,true);
	}
	@SuppressWarnings("rawtypes")
	@Override
	public int compare(WritableComparable a,WritableComparable b){
		CustomKey ak = (CustomKey) a;
		CustomKey bk = (CustomKey) b;
		return ak.getSymbol().compareTo(bk.getSymbol());
	}
}

自定义Partitioner

package fz.secondarysort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * 保持原始分组条件
 * @author fansy
 *
 * @param <K1>
 * @param <V1>
 */
public class CustomPartitioner<K1, V1> extends Partitioner<K1, V1> {

	@Override
	public int getPartition(K1 key, V1 value, int numPartitions) {
		CustomKey keyK= (CustomKey) key;
		Text tmpValue =new  Text(keyK.getSymbol());
		return (tmpValue.hashCode() & Integer.MAX_VALUE)%numPartitions;
	}

}


结果查看:

不使用二次排序

bubuko.com,布布扣

使用二次排序,同时使用自定义组分类器:

bubuko.com,布布扣
可以看到不管二次排序和非二次排序,在Reducer端都只有三个分组;同时二次排序的其值也是排序的;

如果在二次排序中不使用组分类器,那么会得到下面的结果:

bubuko.com,布布扣

从这个结果可以看到有大于3个分组,这样的结果可能是有问题的(对于键是a的分组 整合不了数据);

同时上面使用了自定义的Partitioner,这里看不出区别是因为只有一个Reducer,如果有多个就可以看出差别。


总结:使用二次排序可以对不单单是键排序,同时可以对值进行排序,即在Reducer每个组中接收的value值是排序的,这样在某些操作中是可以增加效率的。


分享,成长,快乐

转载请注明blog地址:http://blog.csdn.net/fansy1990



hadoop编程小技巧(9)---二次排序(值排序),布布扣,bubuko.com

hadoop编程小技巧(9)---二次排序(值排序)

原文:http://blog.csdn.net/fansy1990/article/details/38302615

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!