我们在wordcount小案例中看到结果是1个part-r-000000的文件,那如果我想对统计结果,
按照不同的条件输出到不同的文件(分区),那该如何处理呢?
我们梳理一下这个过程先
一个文本文件,上传到hdfs后以block块存储,split到切片,一个切片对应一个maptask任务,
一个maptask任务会对数据进行分区、归并和排序等操作,输出成一个临时文件(外部无序,内部有序),
一个分区对应一个reducetask,按相同的key为一组进行处理,最后输出到一个文件中。
我们是不是就可以在maptask的时候就进行分区操作,后边reducetask就会处理相同分区的数据
那hadoop是怎么实现这个呢?我们在代码里可以看到有个Partitioner的抽象类,可以看到代码逻辑:
/**
* 默认使用的是hashpartitoner分区,
* 逻辑就是按照key的hashcode对numReduceTasks取模得到分区号
**/
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
那我们是不是可以继承Partitioner来实现自己的业务逻辑呢?
具体步骤如下:
public class MyPartitioner extends Partitioner<Text,FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
//要返回的分区号
int partition = 0;
// maptask输出的key,也就是上个序列化例子里的手机号码
String phoneStr = text.toString();
//截取手机号码前3位,来区分不同的分区
String phoneNum = phoneStr.substring(0, 3);
if("131".equals(phoneNum)){
partition = 0;
}else if("132".equals(phoneNum)){
partition = 1;
}else if("133".equals(phoneNum)){
partition = 2;
}else if("134".equals(phoneNum)){
partition = 3;
}else {
partition = 4;
}
return partition;
}
}
public class FlowBeanDriver {
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
Job job = Job.getInstance(config);
//设置启动类
job.setJarByClass(FlowBeanDriver.class);
//设置mapper和reducer类
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//设置mapper输出的key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
==//2.在job驱动中,设置自定义Partitioner==
job.setPartitionerClass(MyPartitioner.class);
==//3.自定义Partition后,要根据自定义Partitoner的逻辑设置相应数量的ReduceTask==
job.setNumReduceTasks(5);
//设置最后reducer输出的key和value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//输入输出路径信息
FileInputFormat.setInputPaths(job,new Path("/Users/waterair/Documents/shelltest/mr/demo4/input/demo1.txt"));
FileOutputFormat.setOutputPath(job,new Path("/Users/waterair/Documents/shelltest/mr/demo4/output1/"));
System.exit(job.waitForCompletion(true)?0:1);
}
}
然后运行,发现我们的输出目录下会有多个part-r-00000x的文件信息,说明我们的分区的操作生效了。
原文:https://www.cnblogs.com/tenic/p/14683522.html