当写mr程序来处理文本时,经常会将处理后的信息封装到我们自定义的bean中,并将bean作为map输出的key来传输
而mr程序会在处理数据的过程中(传输到reduce之前)对数据排序(如:map端生成的文件中的内容分区且区内有序)。
操作:
自定义bean来封装处理后的信息,可以自定义排序规则用bean中的某几个属性来作为排序的依据
代码节段:
自定义的bean实现WritableComparable接口,并对compareTo的实现
//先按照年龄排序,再按性别(年龄小,sex大的在前)
@Override
public int compareTo(Person o) {
if(o.age==this.age){
if(o.sex==this.sex){
return 0;
}else{
return o.sex-this.sex;
}
}else{
return this.age-o.age;
}
}
注意: 1.hadoop开发了一套自己的序列化和反序列化策略(Writable),因为map端的文件要下载到reduce端的话如果不在同一台节点上是会走网络进行传输(hadoop-rpc),所以对象需要序列化。
2.如果空构造函数被覆盖,一定要显示的定义一下,否则反序列化时会抛异常。
Mapreduce中会将maptask输出的kv对,默认(HashPartitioner)根据key的hashcode%reducetask数来分区。如果要按照我们自己的需求进行分组,则需要改写数据分发组件Partitioner继承抽象类:Partitioner。
操作:
在job对象中,设job.setPartitionerClass(自定义分区类.class)
节选代码:
public class CustomPartitioner extends Partitioner<Text,Text>{
/*
* numPartitions其实我们可以设置,在job.setNumReduceTasks(n)设置。
* 假如job.setNumReduceTasks(5),那么这里的numPartitions=5,那么默认的HashPartitioner的机制就是用key的hashcode%numPartitions来决定分区属于哪个分区,所以分区数量就等于我们设置的reduce数量5个。
*/
@Override
public int getPartition(Text key, Text value, int numPartitions) {
Integer hash = numMap.get(key.toString().substring(0, 1));
//将没有匹配到的数据放入3号分区
return hash==null?3:hash;
}
}
假设把bean作为key发送给reduce,而在reduce端我们希望将年龄相同的kv聚合成组,那么就可以如下方式实现。
1.自定义分组要继承WritableComparator,然后重写compare方法。
2.定义完成后要设置job.setGroupingComparatorClass(CustomGroupingComparator.class);
代码节选
public class CustomGroupingComparator extends WritableComparator{
protected CustomGroupingComparator() {
super(Person.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Bean abean = (Bean) a;
Bean bbean = (Bean) b;
//将item_id相同的bean都视为相同,从而聚合为一组
return abean.getAge()-bbean.getAge();
}
}
每个分区内都会调用job.setSortComparatorClass()设置的key比较函数类排序;
如果没有通过job.setSortComparatorClass()设置key比较函数类,则使用key的实现的compareTo方法
原文:https://www.cnblogs.com/zyanrong/p/10884239.html