首页 > 编程语言 > 详细

MapReduce 学习4 ---- 自定义分区、自定义排序、自定义组分

时间:2016-10-10 23:12:29      阅读:209      评论:0      收藏:0      [点我收藏+]

1. map任务处理

1.3 对输出的key、value进行分区。

分区的目的指的是把相同分类的<k,v>交给同一个reducer任务处理。

 

public static class MyPartitioner<Text, LongWritable> extends Partitioner<Text, LongWritable>{

		static HashMap<String,Integer> map = null;
		static{
			map = new HashMap<String,Integer>();
			map.put("gz1", 0);
			map.put("gz2", 0);
			map.put("sz1", 1);
			map.put("sz2", 1);
		}
		/**
		 * 这里是对mapper任务输出的<k2,v2>进行操作
		 * getPartition函数返回多少的值,就会有多少个reducer任务
		 * 
		 * “gz1”与“gz2”的返回的都是0,所以与分发到同一个reducer任务上,但是k2的值不一样
		 * 所以分组就是
		 * <gz1,123>
		 * <gz2,234>
		 * 然后出现在不同reduce函数上
		 */
		@Override
		public int getPartition(Text key, LongWritable value, int numPartitions) {
			
			return (Integer)map.get(key.toString()).intValue();
		}
		
}


//设置分区
        wcjob.setPartitionerClass(MyPartitioner.class);

 

自定义排序,排序是根据k2来进行排序的,k2就需要自己进行自定义类型

 private static class MyNewKey implements WritableComparable<MyNewKey> {
        long firstNum;
        long secondNum;

        public MyNewKey() {
        }

        public MyNewKey(long first, long second) {
            firstNum = first;
            secondNum = second;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(firstNum);
            out.writeLong(secondNum);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            firstNum = in.readLong();
            secondNum = in.readLong();
        }

        /*
         * 当key进行排序时会调用以下这个compreTo方法
         */
        @Override
        public int compareTo(MyNewKey anotherKey) {
            long min = firstNum - anotherKey.firstNum;
            if (min != 0) {
                // 说明第一列不相等,则返回两数之间小的数
                return (int) min;
            } else {
                return (int) (secondNum - anotherKey.secondNum);
            }
        }
    }

 

 

自定义分组

为了针对新的key类型作分组,我们也需要自定义一下分组规则:

(1)编写一个新的分组比较类型用于我们的分组:

 private static class MyGroupingComparator implements
            RawComparator<MyNewKey> {

        /*
         * 基本分组规则:按第一列firstNum进行分组
         */
        @Override
        public int compare(MyNewKey key1, MyNewKey key2) {
            return (int) (key1.firstNum - key2.firstNum);
        }

        /*
         * @param b1 表示第一个参与比较的字节数组
         * 
         * @param s1 表示第一个参与比较的字节数组的起始位置
         * 
         * @param l1 表示第一个参与比较的字节数组的偏移量
         * 
         * @param b2 表示第二个参与比较的字节数组
         * 
         * @param s2 表示第二个参与比较的字节数组的起始位置
         * 
         * @param l2 表示第二个参与比较的字节数组的偏移量
         */
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
        }

    }

 从代码中我们可以知道,我们自定义了一个分组比较器MyGroupingComparator,该类实现了RawComparator接口,而RawComparator接口又实现了Comparator接口,下面看看这两个接口的定义:

  首先是RawComparator接口的定义:

public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}

 其次是Comparator接口的定义:

public interface Comparator<T> {
    int compare(T o1, T o2);
    boolean equals(Object obj);
}

 

MyGroupingComparator中分别对这两个接口中的定义进行了实现,RawComparator中的compare()方法是基于字节的比较,Comparator中的compare()方法是基于对象的比较。

  在基于字节的比较方法中,有六个参数,一下子眼花了:

Params:

* @param arg0 表示第一个参与比较的字节数组
* @param arg1 表示第一个参与比较的字节数组的起始位置
* @param arg2 表示第一个参与比较的字节数组的偏移量

* @param arg3 表示第二个参与比较的字节数组
* @param arg4 表示第二个参与比较的字节数组的起始位置
* @param arg5 表示第二个参与比较的字节数组的偏移量

 

由于在MyNewKey中有两个long类型,每个long类型又占8个字节。这里因为比较的是第一列数字,所以读取的偏移量为8字节。

  (2)添加对分组规则的设置:

// 设置自定义分组规则
   job.setGroupingComparatorClass(MyGroupingComparator.class);

 

MapReduce 学习4 ---- 自定义分区、自定义排序、自定义组分

原文:http://www.cnblogs.com/feihaoItboy/p/5947596.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!