(Notes from studying the NetEase Cloud Classroom course "Hadoop Big Data in Practice")

This example sums mobile traffic per phone number with MapReduce. A custom value type, TrafficWritable, bundles four counters (upstream/downstream packet counts and payload sizes) into one value; the mapper keys each log line by phone number, and the reducer totals the counters for each key.

// TrafficWritable.java: a custom value type implementing Hadoop's Writable interface
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class TrafficWritable implements Writable {
    long upPackNum, downPackNum, upPayLoad, downPayLoad;

    // The no-argument constructor must not be omitted: Hadoop instantiates the
    // class via reflection during deserialization and the job fails without it.
    public TrafficWritable() {
        super();
    }

    public TrafficWritable(String upPackNum, String downPackNum, String upPayLoad,
            String downPayLoad) {
        super();
        this.upPackNum = Long.parseLong(upPackNum);
        this.downPackNum = Long.parseLong(downPackNum);
        this.upPayLoad = Long.parseLong(upPayLoad);
        this.downPayLoad = Long.parseLong(downPayLoad);
    }

    @Override
    public void write(DataOutput out) throws IOException { // serialization
        out.writeLong(upPackNum);
        out.writeLong(downPackNum);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
    }

    @Override
    public void readFields(DataInput in) throws IOException { // deserialization: read fields in the same order they were written
        this.upPackNum = in.readLong();
        this.downPackNum = in.readLong();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
    }

    @Override
    public String toString() { // without toString(), the job output would show object addresses instead of values
        return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
    }
}
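
A quick way to convince yourself that write() and readFields() mirror each other is a round trip through plain Java streams, outside Hadoop entirely. A minimal sketch; the test class name and the sample values below are mine, not from the course:

// TrafficWritableRoundTrip.java: hypothetical sanity check, not part of the original notes
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class TrafficWritableRoundTrip {
    public static void main(String[] args) throws Exception {
        TrafficWritable original = new TrafficWritable("3", "5", "1116", "954");

        // Serialize into an in-memory buffer, as Hadoop would onto the wire.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh instance created via the no-arg constructor.
        TrafficWritable copy = new TrafficWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(original); // 3	5	1116	954
        System.out.println(copy);     // identical line if the round trip is faithful
    }
}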
// TrafficCount.java: driver, mapper, and reducer for the traffic-summing job
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author nwpulisz
 * @date 2016.3.31
 */
public class TrafficCount {
    static final String INPUT_PATH = "hdfs://192.168.255.132:9000/input";
    static final String OUTPUT_PATH = "hdfs://192.168.255.132:9000/output";

    public static void main(String[] args) throws Throwable {
        Configuration conf = new Configuration();
        Path outputPath = new Path(OUTPUT_PATH);
        Job job = new Job(conf, "TrafficCount");
        job.setJarByClass(TrafficCount.class); // lets a cluster locate the jar containing these classes

        // If the output path already exists, delete it up front; otherwise the job fails at submission.
        FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        FileInputFormat.setInputPaths(job, INPUT_PATH);
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TrafficWritable.class);
        job.waitForCompletion(true);
    }

    static class MyMapper extends Mapper<LongWritable, Text, Text, TrafficWritable> {
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // Fields are tab-separated: field 1 holds the phone number,
            // fields 6-9 hold the four traffic counters.
            String[] splits = v1.toString().split("\t");
            Text k2 = new Text(splits[1]);
            TrafficWritable v2 = new TrafficWritable(splits[6], splits[7],
                    splits[8], splits[9]);
            context.write(k2, v2);
        }
    }

    static class MyReducer extends Reducer<Text, TrafficWritable, Text, TrafficWritable> {
        @Override
        protected void reduce(Text k2, Iterable<TrafficWritable> v2s, Context context)
                throws IOException, InterruptedException {
            // Sum the four counters over all records that share this phone number.
            long upPackNum = 0L, downPackNum = 0L, upPayLoad = 0L, downPayLoad = 0L;
            for (TrafficWritable traffic : v2s) {
                upPackNum += traffic.upPackNum;
                downPackNum += traffic.downPackNum;
                upPayLoad += traffic.upPayLoad;
                downPayLoad += traffic.downPayLoad;
            }
            context.write(k2, new TrafficWritable(upPackNum + "", downPackNum + "",
                    upPayLoad + "", downPayLoad + ""));
        }
    }
}
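
For context on what the mapper's indexes expect: each input line should be a tab-separated traffic record with the phone number in field 1 and the four counters in fields 6 through 9 (zero-based). A purely invented line of that shape, not taken from the course data set:

1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	example.com	video	24	27	2481	24681	200

From this line the mapper emits the key 13726230503 with the tab-separated value 24	27	2481	24681; the reducer then adds those four columns across every record sharing that phone number, and the summed lines land in the part-r-00000 file under OUTPUT_PATH.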

Source: http://www.cnblogs.com/nwpulisz/p/5346127.html