1. Requirement
Output the statistics to different files according to the province that each phone number belongs to (partitioning). Expected output: phone numbers starting with 136, 137, 138, and 139 each go into their own file (four separate files), and numbers with any other prefix go into a fifth file.
The code is as follows:
package partiton;

import flow.FlowBean;
import flow.FlowMapper;
import flow.FlowReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class partitonDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get a Job instance
        Job job = Job.getInstance(new Configuration());

        // 2. Set the jar/classpath
        job.setJarByClass(partitonDriver.class);

        // 3. Set the Mapper, Reducer, the custom Partitioner,
        //    and 5 reduce tasks -- one per partition returned by MyPartitioner
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        job.setNumReduceTasks(5);
        job.setPartitionerClass(MyPartitioner.class);

        // 4. Set the map output and final output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 5. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6. Submit the job and wait for completion
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
package partiton;

import flow.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<Text, FlowBean> {

    // Return the partition number based on the phone-number prefix
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        String phone = text.toString();
        switch (phone.substring(0, 3)) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return 4;
        }
    }
}
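Note that the partition numbers returned here (0 through 4) have to line up with job.setNumReduceTasks(5) in the driver. If the reduce-task count were smaller than the largest partition number plus one, the map tasks would fail with an "Illegal partition" error; if it were larger, the extra reduce tasks would simply produce empty part-r-* files. As a special case, with exactly one reduce task the partitioner is not consulted at all.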
After the job runs successfully, the results are written out as separate files, one part-r file per reduce task, so the partitioning has clearly taken effect.
2. Sorting
Sorting is one of the most important operations in the MapReduce framework. Both MapTask and ReduceTask sort their data by key. This is default Hadoop behavior: the data in every application gets sorted whether or not the logic requires it. The default ordering is dictionary (lexicographic) order, and the sort is implemented with quicksort.
(1) Partial sort
MapReduce sorts the data set by the keys of the input records, guaranteeing that each output file is internally ordered.
(2) Total sort
The final output is a single file that is ordered internally. This is achieved by configuring only one ReduceTask (the one-line driver change is shown after this list). However, this approach is extremely inefficient for large files, because a single machine processes all of the data and the parallelism that MapReduce provides is lost entirely.
(3) Auxiliary sort (GroupingComparator grouping)
Keys are grouped on the Reduce side. This is used when the key is a bean object and you want records whose keys match on one or a few fields (but not on all fields) to be handled by the same reduce() call; see the sketch after this list.
(4) Secondary sort
In a custom sort, if compareTo compares on two conditions, it is a secondary sort (a two-condition sketch follows the FlowBean class below).
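For the total sort in (2), the only driver change is job.setNumReduceTasks(1), so that every key passes through a single reduce task and lands in one ordered output file. For the auxiliary sort in (3), a minimal sketch of a grouping comparator is shown below, assuming the FlowBean key defined in the next section; the class name FlowGroupingComparator and the choice to group on upFlow are made up purely for illustration:

package writablecomparable;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class FlowGroupingComparator extends WritableComparator {

    protected FlowGroupingComparator() {
        // true -> create FlowBean instances so the object-level compare() below is used
        super(FlowBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Group only by upFlow: beans that tie on this field are fed,
        // together with all of their values, to a single reduce() call
        FlowBean x = (FlowBean) a;
        FlowBean y = (FlowBean) b;
        return Long.compare(x.getUpFlow(), y.getUpFlow());
    }
}

The comparator would then be registered in the driver with job.setGroupingComparatorClass(FlowGroupingComparator.class).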
package writablecomparable;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // Empty constructor, needed for reflection during deserialization
    public FlowBean() {
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // Serialization method: writes the fields out
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    // Deserialization method: the framework supplies the data here;
    // the fields must be read in the same order they were written
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    // Sort by total flow in descending order
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.sumFlow, this.sumFlow);
    }
}
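The compareTo above compares on a single condition (sumFlow, descending), so it is a plain custom sort. If a second condition were added as a tie-breaker, it would become the secondary sort described in (4). A minimal sketch, using upFlow ascending as a hypothetical second condition:

    @Override
    public int compareTo(FlowBean o) {
        // First condition: total flow, descending
        int result = Long.compare(o.sumFlow, this.sumFlow);
        if (result == 0) {
            // Second condition: upstream flow, ascending, applied only on ties
            result = Long.compare(this.upFlow, o.upFlow);
        }
        return result;
    }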
package writablecomparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Keys arrive already sorted by the framework; swap key and value back
        // so the output is phone number -> flow bean
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
package writablecomparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(SortDriver.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\wev"));
        // The output directory must differ from the input directory and must not exist yet
        FileOutputFormat.setOutputPath(job, new Path("D:\\wev_out"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
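Because the input and output paths here are hard-coded local Windows directories, this driver is meant to be run in local mode from the IDE while testing; on a cluster the paths would normally come from args, as in partitonDriver above.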
package writablecomparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    private FlowBean flow = new FlowBean();
    private Text phone = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input lines are tab-separated: phone \t upFlow \t downFlow \t sumFlow
        String[] fields = value.toString().split("\t");
        phone.set(fields[0]);
        flow.setUpFlow(Long.parseLong(fields[1]));
        flow.setDownFlow(Long.parseLong(fields[2]));
        flow.setSumFlow(Long.parseLong(fields[3]));
        // Emit the bean as the key so the framework sorts records by flow
        context.write(flow, phone);
    }
}
The run results show that the records have been sorted by flow:

Original post: https://www.cnblogs.com/dazhi151/p/13526120.html