3、流量监控汇总(使用LongWritable实现)
hdfs文件路径:/tmp/flow.txt 查看文件内容: 13770759991 50 100 25 400 13770759991 800 600 500 100 13770759992 400 300 250 1400 13770759992 800 1200 600 900
字符串含义:
号码 上行 下行 上传 下载
phoneNum uppackBytes downpackBytes uploadBytes downloadBytes
代码:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowTest {
public static void main(String[] args) {
// TODO Auto-generated method stub
Path fromPath = new Path(args[0]);
Path toPath = new Path(args[1]);
try {
Configuration conf = new Configuration();
Job job = Job.getInstance();;
job.setJarByClass(FlowTest.class);
FileInputFormat.addInputPath(job, fromPath);
FileOutputFormat.setOutputPath(job, toPath);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
try {
job.waitForCompletion(true);
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/*
号码 上行 下行 上传 下载
phoneNum uppackBytes downpackBytes uploadBytes downloadBytes
13770759991 50L 100L 25L 400L
13770759991 800L 600L 500L 100L
13770759992 400L 300L 250L 1400L
13770759992 800L 1200L 600L 900L
*/
class FlowMapper extends Mapper<LongWritable,Text,Text,Text>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String[] line = value.toString().split("\\W+");
String phoneNum = line[0];
long uppackBytes = Long.parseLong(line[1]);
long downpackBytes = Long.parseLong(line[2]);
long uploadBytes = Long.parseLong(line[3]);
long downloadBytes = Long.parseLong(line[4]);
context.write(new Text(phoneNum), new Text(uppackBytes+"-"+downpackBytes+"-"+uploadBytes+"-"+downloadBytes));
}
}
class FlowReducer extends Reducer<Text,Text,Text,Text>{
@Override
protected void reduce(Text phoneNum, Iterable<Text> text, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
long sumUppack = 0L;
long sumDownpack = 0L;
long sumUpload = 0L;
long sumDownload = 0L;
for(Text t : text){
String[] line = t.toString().split("-");
sumUppack += Long.parseLong(line[0].toString());
sumDownpack += Long.parseLong(line[1].toString());
sumUpload += Long.parseLong(line[2].toString());
sumDownload += Long.parseLong(line[3].toString());
}
context.write(phoneNum,new Text(sumUppack+"-"+sumDownpack+"-"+sumUpload+"-"+sumDownload) );
}
}
输出:
导出成flow.jar并上传至服务器的/opt目录 执行: hadoop jar flow.jar "FlowTest" "/tmp/flow.txt" "/tmp/flow/out" 再执行: hadoop fs -ls /tmp/flow/out/* 查看输出的文件:
4、流量监控汇总(使用自定义的writable类NetflowWritable实现)
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class NetflowTest { public static void main(String[] args) { // TODO Auto-generated method stub Path fromPath = new Path(args[0]); Path toPath = new Path(args[1]); try { Configuration conf = new Configuration(); Job job = Job.getInstance(); job.setJarByClass(NetflowTest.class); FileInputFormat.addInputPath(job, fromPath); FileOutputFormat.setOutputPath(job, toPath); job.setMapperClass(NetflowMapper.class); job.setReducerClass(NetflowReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NetflowWritable.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(NetflowWritable.class); try { job.waitForCompletion(true); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } class NetflowWritable implements Writable{ private long uppackBytes; private long downpackBytes; private long uploadBytes; private long downloadBytes; //创建一个无参的构造方法,不加的话会执行报错 public NetflowWritable(){} public NetflowWritable(long uppackBytes,long downpackBytes,long uploadBytes,long downloadBytes) { //this.phoneNum=phoneNum; this.uppackBytes = uppackBytes; this.downpackBytes = downpackBytes; this.uploadBytes = uploadBytes; this.downloadBytes = downloadBytes; } public long getUppackBytes() { return uppackBytes; } public long getDownpackBytes() { return downpackBytes; } public long getUploadBytes() { return uploadBytes; } public long getDownloadBytes() { return downloadBytes; } public void set( long uppackBytes,long downpackBytes,long uploadBytes,long downloadBytes) { this.uppackBytes = uppackBytes; this.downpackBytes = downpackBytes; this.uploadBytes = uploadBytes; this.downloadBytes = downloadBytes; } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub this.uppackBytes = in.readLong(); this.downpackBytes = in.readLong(); this.uploadBytes = in.readLong(); this.downloadBytes = in.readLong(); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeLong(uppackBytes); out.writeLong(downpackBytes); out.writeLong(uploadBytes); out.writeLong(downloadBytes); } @Override //重写toString方法 public String toString() { // TODO Auto-generated method stub return "NetflowWritable [uppackBytes="+uppackBytes+",downpackBytes="+downpackBytes+",uploadBytes="+uploadBytes+",downloadBytes="+downloadBytes+"]" ; } } class NetflowMapper extends Mapper<LongWritable,Text,Text,NetflowWritable>{ private String phoneNum; private long uppackBytes; private long downpackBytes; private long uploadBytes; private long downloadBytes; NetflowWritable nf = new NetflowWritable(); //Text text = new Text(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NetflowWritable>.Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub String[] line = value.toString().split("\\t"); phoneNum = line[0]; uppackBytes = Long.parseLong(line[1]); downpackBytes = Long.parseLong(line[2]); uploadBytes = Long.parseLong(line[3]); downloadBytes = Long.parseLong(line[4]); nf.set( uppackBytes, downpackBytes, uploadBytes, downloadBytes); context.write(new Text(phoneNum), nf); } } class NetflowReducer extends Reducer<Text,NetflowWritable,Text,NetflowWritable>{ private NetflowWritable nf; @Override protected void reduce(Text arg0, Iterable<NetflowWritable> arg1, Reducer<Text, NetflowWritable, Text, NetflowWritable>.Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub long uppackBytes = 0L; long downpackBytes = 0L; long uploadBytes = 0L; long downloadBytes = 0L; for(NetflowWritable nw : arg1){ uppackBytes += nw.getUppackBytes(); downpackBytes += nw.getDownpackBytes(); uploadBytes += nw.getUploadBytes(); downloadBytes += nw.getDownloadBytes(); } nf = new NetflowWritable(uppackBytes,downpackBytes,uploadBytes,downloadBytes); context.write(arg0, nf); } }
输出:
导出成netflow.jar并上传至服务器的/opt目录 执行: hadoop jar netflow.jar "NetflowTest" "/tmp/flow.txt" "/tmp/netflow/out" 再执行: hadoop fs -ls /tmp/netflow/out/* 查看输出的文件:
原文:http://www.cnblogs.com/cangos/p/6422144.html