去除日志中字段长度小于等于11的日志。
(1)编写LogMapper
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
Text k = new Text();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取1行数据 String line = value.toString(); // 2 解析日志 boolean result = parseLog(line,context); // 3 日志不合法退出 if (!result) { return; } // 4 设置key k.set(line); // 5 写出数据 context.write(k, NullWritable.get()); } // 2 解析日志 private boolean parseLog(String line, Context context) { // 1 截取 String[] fields = line.split(" ");
// 2 日志长度大于11的为合法 if (fields.length > 11) { // 系统计数器 context.getCounter("map", "true").increment(1); return true; }else { context.getCounter("map", "false").increment(1); return false; } } } |
(2)编写LogDriver
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LogDriver {
public static void main(String[] args) throws Exception { args = new String[] { "e:/inputlog", "e:/output1" }; // 1 获取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 加载jar包 job.setJarByClass(LogDriver.class); // 3 关联map job.setMapperClass(LogMapper.class); // 4 设置最终输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 5 设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 6 提交 job.waitForCompletion(true); } } |
对web访问日志中的各字段识别切分
去除日志中不合法的记录
根据统计需求,生成各类访问请求过滤数据
(1)定义一个bean,用来记录日志数据中的各数据字段
public class LogBean {
private String remote_addr;// 记录客户端的ip地址
private String remote_user;// 记录客户端用户名称,忽略属性"-"
private String time_local;// 记录访问时间与时区
private String request;// 记录请求的url与http协议
private String status;// 记录请求状态;成功是200
private String body_bytes_sent;// 记录发送给客户端文件主体内容大小
private String http_referer;// 用来记录从那个页面链接访问过来的
private String http_user_agent;// 记录客户浏览器的相关信息
private boolean valid = true;// 判断数据是否合法
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getRemote_user() {
return remote_user;
}
public void setRemote_user(String remote_user) {
this.remote_user = remote_user;
}
public String getTime_local() {
return time_local;
}
public void setTime_local(String time_local) {
this.time_local = time_local;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getBody_bytes_sent() {
return body_bytes_sent;
}
public void setBody_bytes_sent(String body_bytes_sent) {
this.body_bytes_sent = body_bytes_sent;
}
public String getHttp_referer() {
return http_referer;
}
public void setHttp_referer(String http_referer) {
this.http_referer = http_referer;
}
public String getHttp_user_agent() {
return http_user_agent;
}
public void setHttp_user_agent(String http_user_agent) {
this.http_user_agent = http_user_agent;
}
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.valid);
sb.append("\001").append(this.remote_addr);
sb.append("\001").append(this.remote_user);
sb.append("\001").append(this.time_local);
sb.append("\001").append(this.request);
sb.append("\001").append(this.status);
sb.append("\001").append(this.body_bytes_sent);
sb.append("\001").append(this.http_referer);
sb.append("\001").append(this.http_user_agent);
return sb.toString();
}
}
(2)编写LogMapper程序
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 1 获取1行
String line = value.toString();
// 2 解析日志是否合法
LogBean bean = pressLog(line);
if (!bean.isValid()) {
return;
}
k.set(bean.toString());
// 3 输出
context.write(k, NullWritable.get());
}
// 解析日志
private LogBean pressLog(String line) {
LogBean logBean = new LogBean();
// 1 截取
String[] fields = line.split(" ");
if (fields.length > 11) {
// 2封装数据
logBean.setRemote_addr(fields[0]);
logBean.setRemote_user(fields[1]);
logBean.setTime_local(fields[3].substring(1));
logBean.setRequest(fields[6]);
logBean.setStatus(fields[8]);
logBean.setBody_bytes_sent(fields[9]);
logBean.setHttp_referer(fields[10]);
if (fields.length > 12) {
logBean.setHttp_user_agent(fields[11] + " "+ fields[12]);
}else {
logBean.setHttp_user_agent(fields[11]);
}
// 大于400,HTTP错误
if (Integer.parseInt(logBean.getStatus()) >= 400) {
logBean.setValid(false);
}
}else {
logBean.setValid(false);
}
return logBean;
}
}
(3)编写LogDriver程序
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LogDriver { public static void main(String[] args) throws Exception { // 1 获取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 加载jar包 job.setJarByClass(LogDriver.class); // 3 关联map job.setMapperClass(LogMapper.class); // 4 设置最终输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 5 设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 6 提交 job.waitForCompletion(true); } } |
CompressionCodec有两个方法可以用于轻松地压缩或解压缩数据。要想对正在被写入一个输出流的数据进行压缩,我们可以使用createOutputStream(OutputStreamout)方法创建一个CompressionOutputStream,将其以压缩格式写入底层的流。相反,要想对从输入流读取而来的数据进行解压缩,则调用createInputStream(InputStreamin)函数,从而获得一个CompressionInputStream,从而从底层的流读取未压缩的数据。
测试一下如下压缩方式:
DEFLATE |
org.apache.hadoop.io.compress.DefaultCodec |
gzip |
org.apache.hadoop.io.compress.GzipCodec |
bzip2 |
org.apache.hadoop.io.compress.BZip2Codec |
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;
public class TestCompress {
public static void main(String[] args) throws Exception, IOException {
// compress("e:/test.txt","org.apache.hadoop.io.compress.BZip2Codec");
decompress("e:/test.txt.bz2");
}
/*
* 压缩
* filername:要压缩文件的路径
* method:欲使用的压缩的方法(org.apache.hadoop.io.compress.BZip2Codec)
*/
public static void compress(String filername, String method) throws ClassNotFoundException, IOException {
// 1 创建压缩文件路径的输入流
File fileIn = new File(filername);
InputStream in = new FileInputStream(fileIn);
// 2 获取压缩的方式的类
Class codecClass = Class.forName(method);
Configuration conf = new Configuration();
// 3 通过名称找到对应的编码/解码器
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
// 4 该压缩方法对应的文件扩展名
File fileOut = new File(filername + codec.getDefaultExtension());
OutputStream out = new FileOutputStream(fileOut);
CompressionOutputStream cout = codec.createOutputStream(out);
// 5 流对接
IOUtils.copyBytes(in, cout, 1024 * 1024 * 5, false); // 缓冲区设为5MB
// 6 关闭资源
in.close();
cout.close();
out.close();
}
/*
* 解压缩
* filename:希望解压的文件路径
*/
public static void decompres(String filename) throws FileNotFoundException, IOException {
Configuration conf = new Configuration();
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
// 1 获取文件的压缩方法
CompressionCodec codec = factory.getCodec(new Path(filename));
// 2 判断该压缩方法是否存在
if (null == codec) {
System.out.println("Cannot find codec for file " + filename);
return;
}
// 3 创建压缩文件的输入流
InputStream cin = codec.createInputStream(new FileInputStream(filename));
// 4 创建解压缩文件的输出流
File fout = new File(filename + ".decoded");
OutputStream out = new FileOutputStream(fout);
// 5 流对接
IOUtils.copyBytes(cin, out, 1024 * 1024 * 5, false);
// 6 关闭资源
cin.close();
out.close();
}
}
即使你的MapReduce的输入输出文件都是未压缩的文件,你仍然可以对map任务的中间结果输出做压缩,因为它要写在硬盘并且通过网络传输到reduce节点,对其压缩可以提高很多性能,这些工作只要设置两个属性即可,我们来看下代码怎么设置:
给大家提供的hadoop源码支持的压缩格式有:BZip2Codec 、DefaultCodec
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
// 开启map端输出压缩
configuration.setBoolean("mapreduce.map.output.compress", true);
// 设置map端输出压缩方式
configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
Job job = Job.getInstance(configuration);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 1 : 0);
}
}
2)Mapper保持不变
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for(String word:words){
context.write(new Text(word), new IntWritable(1));
}
}
}
3)Reducer保持不变
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int count = 0;
for(IntWritable value:values){
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
基于workcount案例处理
1)修改驱动
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 设置reduce端输出压缩开启
FileOutputFormat.setCompressOutput(job, true);
// 设置压缩的方式
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
// FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
boolean result = job.waitForCompletion(true);
System.exit(result?1:0);
}
}
日志比对
压缩技术能够有效减少底层存储系统(HDFS)读写字节数。压缩提高了网络带宽和磁盘空间的效率。在Hadoop下,尤其是数据规模很大和工作负载密集的情况下,使用数据压缩显得非常重要。在这种情况下,I/O操作和网络数据传输要花大量的时间。还有,Shuffle与Merge过程同样也面临着巨大的I/O压力。
鉴于磁盘I/O和网络带宽是Hadoop的宝贵资源,数据压缩对于节省资源、最小化磁盘I/O和网络传输非常有帮助。不过,尽管压缩与解压操作的CPU开销不高,其性能的提升和资源的节省并非没有代价。
如果磁盘I/O和网络带宽影响了MapReduce作业性能,在任意MapReduce阶段启用压缩都可以改善端到端处理时间并减少I/O和网络流量。
压缩Mapreduce的一种优化策略:通过压缩编码对Mapper或者Reducer的输出进行压缩,以减少磁盘IO,提高MR程序运行速度(但相应增加了cpu运算负担)。
注意:压缩特性运用得当能提高性能,但运用不当也可能降低性能。
基本原则:
(1)运算密集型的job,少用压缩
(2)IO密集型的job,多用压缩
https://blog.csdn.net/linuxnc/article/details/44499231
压缩格式 |
hadoop自带? |
算法 |
文件扩展名 |
是否可切分 |
换成压缩格式后,原来的程序是否需要修改 |
DEFAULT |
是,直接使用 |
DEFAULT |
.deflate |
否 |
和文本处理一样,不需要修改 |
Gzip |
是,直接使用 |
DEFAULT |
.gz |
否 |
和文本处理一样,不需要修改 |
bzip2 |
是,直接使用 |
bzip2 |
.bz2 |
是 |
和文本处理一样,不需要修改 |
LZO |
否,需要安装 |
LZO |
.lzo |
是 |
需要建索引,还需要指定输入格式 |
Snappy |
否,需要安装 |
Snappy |
.snappy |
否 |
和文本处理一样,不需要修改 |
为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示
压缩格式 |
对应的编码/解码器 |
DEFLATE |
org.apache.hadoop.io.compress.DefaultCodec |
gzip |
org.apache.hadoop.io.compress.GzipCodec |
bzip2 |
org.apache.hadoop.io.compress.BZip2Codec |
LZO |
com.hadoop.compression.lzo.LzopCodec |
Snappy |
org.apache.hadoop.io.compress.SnappyCodec |
压缩性能的比较
压缩算法 |
原始文件大小 |
压缩文件大小 |
压缩速度 |
解压速度 |
gzip |
8.3GB |
1.8GB |
17.5MB/s |
58MB/s |
bzip2 |
8.3GB |
1.1GB |
2.4MB/s |
9.5MB/s |
LZO |
8.3GB |
2.9GB |
49.3MB/s |
74.6MB/s |
http://google.github.io/snappy/
On a single core of a Core i7 processor in 64-bit mode, Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.
在64位模式的核心i7处理器的单个核心上,Snappy以大约250MB/sec或更高的速度压缩,以大约500MB/sec或更高的速度解压缩。
优点:压缩率比较高,而且压缩/解压速度也比较快;hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;大部分linux系统都自带gzip命令,使用方便。
缺点:不支持split。
应用场景:当每个文件压缩之后在130M以内的(1个块大小内),都可以考虑用gzip压缩格式。例如说一天或者一个小时的日志压缩成一个gzip文件,运行mapreduce程序的时候通过多个gzip文件达到并发。hive程序,streaming程序,和java写的mapreduce程序完全和文本处理一样,压缩之后原来的程序不需要做任何修改。
优点:支持split;具有很高的压缩率,比gzip压缩率都高;hadoop本身支持,但不支持native;在linux系统下自带bzip2命令,使用方便。
缺点:压缩/解压速度慢;不支持native。
应用场景:适合对速度要求不高,但需要较高的压缩率的时候,可以作为mapreduce作业的输出格式;或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况;或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持split,而且兼容之前的应用程序(即应用程序不需要修改)的情况。
优点:压缩/解压速度也比较快,合理的压缩率;支持split,是hadoop中最流行的压缩格式;可以在linux系统下安装lzop命令,使用方便。
缺点:压缩率比gzip要低一些;hadoop本身不支持,需要安装;在应用中对lzo格式的文件需要做一些特殊处理(为了支持split需要建索引,还需要指定inputformat为lzo格式)。
应用场景:一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,lzo优点越越明显。
优点:高速压缩速度和合理的压缩率。
缺点:不支持split;压缩率比gzip要低;hadoop本身不支持,需要安装;
应用场景:当Mapreduce作业的Map输出的数据比较大的时候,作为Map到Reduce的中间数据的压缩格式;或者作为一个Mapreduce作业的输出和另外一个Mapreduce作业的输入。
压缩可以在MapReduce作用的任意阶段启用。
要在Hadoop中启用压缩,可以配置如下参数:
参数 |
默认值 |
阶段 |
建议 |
io.compression.codecs (在core-site.xml中配置) |
org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec
|
输入压缩 |
Hadoop使用文件扩展名判断是否支持某种编解码器 |
mapreduce.map.output.compress(在mapred-site.xml中配置) |
false |
mapper输出 |
这个参数设为true启用压缩 |
mapreduce.map.output.compress.codec(在mapred-site.xml中配置) |
org.apache.hadoop.io.compress.DefaultCodec |
mapper输出 |
使用LZO或snappy编解码器在此阶段压缩数据 |
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置) |
false |
reducer输出 |
这个参数设为true启用压缩 |
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置) |
org.apache.hadoop.io.compress. DefaultCodec |
reducer输出 |
使用标准工具或者编解码器,如gzip和bzip2 |
mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置) |
RECORD |
reducer输出 |
SequenceFile输出使用的压缩类型:NONE和BLOCK |
基本原则: (1)运算密集型的job,少用压缩 (2)IO密集型的job,多用压缩
|
// 开启map端输出压缩 configuration.setBoolean("mapreduce.map.output.compress", true); // 设置map端输出压缩方式 configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
|
// 设置reduce端输出压缩开启 FileOutputFormat.setCompressOutput(job, true);
// 设置压缩的方式 FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); |
以下是博客的好友列表数据,冒号前是一个用户,冒号后是该用户的所有好友(数据中的好友关系是单向的)
求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?
先求出A、B、C、….等是谁的好友
第一次输出结果
(1)第一次Mapper
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
public class OneShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{
@Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { // 1 获取一行 A:B,C,D,F,E,O String line = value.toString();
// 2 切割 String[] fileds = line.split(":");
// 3 获取person和好友 String person = fileds[0]; String[] friends = fileds[1].split(",");
// 4写出去 for(String friend: friends){ // 输出 <好友,人> context.write(new Text(friend), new Text(person)); } } } |
(2)第一次Reducer
import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
public class OneShareFriendsReducer extends Reducer<Text, Text, Text, Text>{
@Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer(); //1 拼接 for(Text person: values){ sb.append(person).append(","); }
//2 写出 context.write(key, new Text(sb.toString())); } } |
(3)第一次Driver
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class OneShareFriendsDriver {
public static void main(String[] args) throws Exception { // 1 获取job对象 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration);
// 2 指定jar包运行的路径 job.setJarByClass(OneShareFriendsDriver.class);
// 3 指定map/reduce使用的类 job.setMapperClass(OneShareFriendsMapper.class); job.setReducerClass(OneShareFriendsReducer.class);
// 4 指定map输出的数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class);
// 5 指定最终输出的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);
// 6 指定job的输入原始所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交 boolean result = job.waitForCompletion(true);
System.exit(result?1:0); } } |
(4)第二次Mapper
import java.io.IOException; import java.util.Arrays; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
public class TwoShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // A I,K,C,B,G,F,H,O,D, // 友 人,人,人 String line = value.toString(); String[] friend_persons = line.split("\t");
String friend = friend_persons[0]; String[] persons = friend_persons[1].split(",");
Arrays.sort(persons);
for (int i = 0; i < persons.length - 1; i++) {
for (int j = i + 1; j < persons.length; j++) { // 发出 <人-人,好友> ,这样,相同的“人-人”对的所有好友就会到同1个reduce中去 context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend)); } } } } |
(5)第二次Reducer
import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
public class TwoShareFriendsReducer extends Reducer<Text, Text, Text, Text>{
@Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for (Text friend : values) { sb.append(friend).append(" "); }
context.write(key, new Text(sb.toString())); } } |
(5)第二次Driver
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TwoShareFriendsDriver {
public static void main(String[] args) throws Exception { // 1 获取job对象 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration);
// 2 指定jar包运行的路径 job.setJarByClass(TwoShareFriendsDriver.class);
// 3 指定map/reduce使用的类 job.setMapperClass(TwoShareFriendsMapper.class); job.setReducerClass(TwoShareFriendsReducer.class);
// 4 指定map输出的数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class);
// 5 指定最终输出的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);
// 6 指定job的输入原始所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交 boolean result = job.waitForCompletion(true);
System.exit(result?1:0); } } |
两次Job串联:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob; import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class AllShareFriendsReducer { public static void main(String[] args) throws IOException { args = new String[]{"F:\\date\\A\\friends.txt","F:\\date\\A\\FA1","F:\\date\\A\\FA11"};
Configuration conf = new Configuration(); Job job1 = Job.getInstance(conf);
job1.setMapperClass(OneShareFriendsMapper.class); job1.setReducerClass(OneShareFriendsReducer.class);
job1.setMapOutputKeyClass(Text.class); job1.setMapOutputValueClass(Text.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job1, new Path(args[0])); FileOutputFormat.setOutputPath(job1, new Path(args[1]));
Job job2 = Job.getInstance(conf);
job2.setMapperClass(TwoShareFriendsMapper.class); job2.setReducerClass(TwoShareFriendsReducer.class);
job2.setMapOutputKeyClass(Text.class); job2.setMapOutputValueClass(Text.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job2, new Path(args[1])); FileOutputFormat.setOutputPath(job2, new Path(args[2]));
JobControl control = new JobControl("Andy"); ControlledJob ajob = new ControlledJob(job1.getConfiguration()); ControlledJob bjob = new ControlledJob(job2.getConfiguration()); bjob.addDependingJob(ajob); control.addJob(ajob); control.addJob(bjob); Thread thread = new Thread(control); thread.start(); } } |
有大量的文本(文档、网页),需要建立搜索索引
(1)第一次预期输出结果
xyg--a.txt 3
xyg--b.txt 2
xyg--c.txt 2
pingping--a.txt 1
pingping--b.txt 3
pingping--c.txt 1
ss--a.txt 2
ss--b.txt 1
ss--c.txt 1
(2)第二次预期输出结果
xyg c.txt-->2 b.txt-->2 a.txt-->3
pingping c.txt-->1 b.txt-->3 a.txt-->1
ss c.txt-->1 b.txt-->1 a.txt-->2
(1)第一次处理,编写OneIndexMapper
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class OneIndexMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text k = new Text();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取切片名称 FileSplit inputSplit = (FileSplit) context.getInputSplit(); String name = inputSplit.getPath().getName();
// 2 获取1行 String line = value.toString();
// 3 截取 String[] words = line.split(" ");
// 4 把每个单词和切片名称关联起来 for (String word : words) { k.set(word + "--" + name);
context.write(k, new IntWritable(1)); } } } |
(2)第一次处理,编写OneIndexReducer
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
public class OneIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0; // 累加和 for(IntWritable value: values){ count +=value.get(); }
// 写出 context.write(key, new IntWritable(count)); } }
|
(3)第一次处理,编写OneIndexDriver
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class OneIndexDriver {
public static void main(String[] args) throws Exception {
args = new String[] { "e:/inputoneindex", "e:/output5" };
Configuration conf = new Configuration();
Job job = Job.getInstance(conf); job.setJarByClass(OneIndexDriver.class);
job.setMapperClass(OneIndexMapper.class); job.setReducerClass(OneIndexReducer.class);
job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true); } } |
(4)查看第一次输出结果
xyg--a.txt 3
xyg--b.txt 2
xyg--c.txt 2
pingping--a.txt 1
pingping--b.txt 3
pingping--c.txt 1
ss--a.txt 2
ss--b.txt 1
ss--c.txt 1
(1)第二次处理,编写TwoIndexMapper
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
public class TwoIndexMapper extends Mapper<LongWritable, Text, Text, Text>{ Text k = new Text(); Text v = new Text();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取1行数据 String line = value.toString();
// 2用“--”切割 String[] fields = line.split("--");
k.set(fields[0]); v.set(fields[1]);
// 3 输出数据 context.write(k, v); } } |
(2)第二次处理,编写TwoIndexReducer
import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class TwoIndexReducer extends Reducer<Text, Text, Text, Text> {
@Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // xyg a.txt 3 // xyg b.txt 2 // xyg c.txt 2
// xyg c.txt-->2 b.txt-->2 a.txt-->3
StringBuilder sb = new StringBuilder();
for (Text value : values) { sb.append(value.toString().replace("\t", "-->") + "\t"); }
context.write(key, new Text(sb.toString())); } } |
(3)第二次处理,编写TwoIndexDriver
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TwoIndexDriver {
public static void main(String[] args) throws Exception {
args = new String[] { "e:/inputtwoindex", "e:/output6" };
Configuration config = new Configuration(); Job job = Job.getInstance(config);
job.setMapperClass(TwoIndexMapper.class); job.setReducerClass(TwoIndexReducer.class);
job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1); } } |
(4)第二次查看最终结果
xyg c.txt-->2 b.txt-->2 a.txt-->3
pingping c.txt-->1 b.txt-->3 a.txt-->1
ss c.txt-->1 b.txt-->1 a.txt-->2
一个稍复杂点的处理逻辑往往需要多个 MapReduce 程序串联处理,多 job 的串联可以借助 MapReduce 框架的 JobControl 实现
以下有两个 MapReduce 任务,分别是 Flow 的 SumMR 和 SortMR,其中有依赖关系:SumMR 的输出是 SortMR 的输入,所以 SortMR 的启动得在 SumMR 完成之后
Configuration conf1 = new Configuration();
Configuration conf2 = new Configuration();
Job job1 = Job.getInstance(conf1);
Job job2 = Job.getInstance(conf2);
job1.setJarByClass(MRScore3.class);
job1.setMapperClass(MRMapper3_1.class);
//job.setReducerClass(ScoreReducer3.class);
job1.setMapOutputKeyClass(IntWritable.class);
job1.setMapOutputValueClass(StudentBean.class);
job1.setOutputKeyClass(IntWritable.class);
job1.setOutputValueClass(StudentBean.class);
job1.setPartitionerClass(CoursePartitioner2.class);
job1.setNumReduceTasks(4);
Path inputPath = new Path("D:\\MR\\hw\\work3\\input");
Path outputPath = new Path("D:\\MR\\hw\\work3\\output_hw3_1");
FileInputFormat.setInputPaths(job1, inputPath);
FileOutputFormat.setOutputPath(job1, outputPath);
job2.setMapperClass(MRMapper3_2.class);
job2.setReducerClass(MRReducer3_2.class);
job2.setMapOutputKeyClass(IntWritable.class);
job2.setMapOutputValueClass(StudentBean.class);
job2.setOutputKeyClass(StudentBean.class);
job2.setOutputValueClass(NullWritable.class);
Path inputPath2 = new Path("D:\\MR\\hw\\work3\\output_hw3_1");
Path outputPath2 = new Path("D:\\MR\\hw\\work3\\output_hw3_end");
FileInputFormat.setInputPaths(job2, inputPath2);
FileOutputFormat.setOutputPath(job2, outputPath2);
JobControl control = new JobControl("Score3");
ControlledJob aJob = new ControlledJob(job1.getConfiguration());
ControlledJob bJob = new ControlledJob(job2.getConfiguration());
// 设置作业依赖关系
bJob.addDependingJob(aJob);
control.addJob(aJob);
control.addJob(bJob);
Thread thread = new Thread(control);
thread.start();
while(!control.allFinished()) {
thread.sleep(1000);
}
System.exit(0);
计数器是用来记录job的执行进度和状态的。它的作用可以理解为日志。我们可以在程序的某个位置插入计数器,记录数据或者进度的变化情况。
MapReduce 计数器(Counter)为我们提供一个窗口,用于观察 MapReduce Job 运行期的各种细节数据。对MapReduce性能调优很有帮助,MapReduce性能优化的评估大部分都是基于这些 Counter 的数值表现出来的。
MapReduce 自带了许多默认Counter,现在我们来分析这些默认 Counter 的含义,方便大家观察 Job 结果,如输入的字节数、输出的字节数、Map端输入/输出的字节数和条数、Reduce端的输入/输出的字节数和条数等。下面我们只需了解这些内置计数器,知道计数器组名称(groupName)和计数器名称(counterName),以后使用计数器会查找groupName和counterName即可。
在任务执行过程中,任务计数器采集任务的相关信息,每个作业的所有任务的结果会被聚集起来。例如,MAP_INPUT_RECORDS 计数器统计每个map任务输入记录的总数,并在一个作业的所有map任务上进行聚集,使得最终数字是整个作业的所有输入记录的总数。任务计数器由其关联任务维护,并定期发送给TaskTracker,再由TaskTracker发送给 JobTracker。因此,计数器能够被全局地聚集。下面我们分别了解各种任务计数器。
1)MapReduce 任务计数器
MapReduce 任务计数器的 groupName为org.apache.hadoop.mapreduce.TaskCounter,它包含的计数器如下表所示
计数器名称 |
说明 |
map 输入的记录数(MAP_INPUT_RECORDS) |
作业中所有 map 已处理的输入记录数。每次 RecorderReader 读到一条记录并将其传给 map 的 map() 函数时,该计数器的值增加。 |
map 跳过的记录数(MAP_SKIPPED_RECORDS) |
作业中所有 map 跳过的输入记录数。 |
map 输入的字节数(MAP_INPUT_BYTES) |
作业中所有 map 已处理的未经压缩的输入数据的字节数。每次 RecorderReader 读到一条记录并 将其传给 map 的 map() 函数时,该计数器的值增加 |
分片split的原始字节数(SPLIT_RAW_BYTES) |
由 map 读取的输入-分片对象的字节数。这些对象描述分片元数据(文件的位移和长度),而不是分片的数据自身,因此总规模是小的 |
map 输出的记录数(MAP_OUTPUT_RECORDS) |
作业中所有 map 产生的 map 输出记录数。每次某一个 map 的Context 调用 write() 方法时,该计数器的值增加 |
map 输出的字节数(MAP_OUTPUT_BYTES) |
作业中所有 map 产生的 未经压缩的输出数据的字节数。每次某一个 map 的 Context 调用 write() 方法时,该计数器的值增加。 |
map 输出的物化字节数(MAP_OUTPUT_MATERIALIZED_BYTES) |
map 输出后确实写到磁盘上的字节数;若 map 输出压缩功能被启用,则会在计数器值上反映出来 |
combine 输入的记录数(COMBINE_INPUT_RECORDS) |
作业中所有 Combiner(如果有)已处理的输入记录数。Combiner 的迭代器每次读一个值,该计数器的值增加。 |
combine 输出的记录数(COMBINE_OUTPUT_RECORDS) |
作业中所有 Combiner(如果有)已产生的输出记录数。每当一个 Combiner 的 Context 调用 write() 方法时,该计数器的值增加。 |
reduce 输入的组(REDUCE_INPUT_GROUPS) |
作业中所有 reducer 已经处理的不同的码分组的个数。每当某一个 reducer 的 reduce() 被调用时,该计数器的值增加。 |
reduce 输入的记录数(REDUCE_INPUT_RECORDS) |
作业中所有 reducer 已经处理的输入记录的个数。每当某个 reducer 的迭代器读一个值时,该计数器的值增加。如果所有 reducer 已经处理完所有输入, 则该计数器的值与计数器 “map 输出的记录” 的值相同 |
reduce 输出的记录数(REDUCE_OUTPUT_RECORDS) |
作业中所有 map 已经产生的 reduce 输出记录数。每当某一个 reducer 的 Context 调用 write() 方法时,该计数器的值增加。 |
reduce 跳过的组数(REDUCE_SKIPPED_GROUPS) |
作业中所有 reducer 已经跳过的不同的码分组的个数。 |
reduce 跳过的记录数(REDUCE_SKIPPED_RECORDS) |
作业中所有 reducer 已经跳过输入记录数。 |
reduce 经过 shuffle 的字节数(REDUCE_SHUFFLE_BYTES) |
shuffle 将 map 的输出数据复制到 reducer 中的字节数。 |
溢出的记录数(SPILLED_RECORDS) |
作业中所有 map和reduce 任务溢出到磁盘的记录数 |
CPU 毫秒(CPU_MILLISECONDS) |
总计的 CPU 时间,以毫秒为单位,由/proc/cpuinfo获取 |
物理内存字节数(PHYSICAL_MEMORY_BYTES) |
一个任务所用物理内存的字节数,由/proc/cpuinfo获取 |
虚拟内存字节数(VIRTUAL_MEMORY_BYTES) |
一个任务所用虚拟内存的字节数,由/proc/cpuinfo获取 |
有效的堆字节数(COMMITTED_HEAP_BYTES) |
在 JVM 中的总有效内存量(以字节为单位),可由Runtime().getRuntime().totaoMemory()获取。 |
GC 运行时间毫秒数(GC_TIME_MILLIS) |
在任务执行过程中,垃圾收集器(garbage collection)花费的时间(以毫秒为单位), 可由 GarbageCollector MXBean.getCollectionTime()获取;该计数器并未出现在1.x版本中。 |
由 shuffle 传输的 map 输出数(SHUFFLED_MAPS) |
有 shuffle 传输到 reducer 的 map 输出文件数。 |
失败的 shuffle 数(SHUFFLE_MAPS) |
在 shuffle 过程中,发生拷贝错误的 map 输出文件数,该计数器并没有包含在 1.x 版本中。 |
被合并的 map 输出数 |
在 shuffle 过程中,在 reduce 端被合并的 map 输出文件数,该计数器没有包含在 1.x 版本中。 |
2)文件系统计数器
文件系统计数器的 groupName为org.apache.hadoop.mapreduce.FileSystemCounter,它包含的计数器如下表所示
计数器名称 |
说明 |
文件系统的读字节数(BYTES_READ) |
由 map 和 reduce 等任务在各个文件系统中读取的字节数,各个文件系统分别对应一个计数器,可以是 Local、HDFS、S3和KFS等。 |
文件系统的写字节数(BYTES_WRITTEN) |
由 map 和 reduce 等任务在各个文件系统中写的字节数。 |
3)FileInputFormat 计数器
FileInputFormat 计数器的 groupName为org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter,它包含的计数器如下表所示,计数器名称列的括号()内容即为counterName
计数器名称 |
说明 |
读取的字节数(BYTES_READ) |
由 map 任务通过 FileInputFormat 读取的字节数。 |
4)FileOutputFormat 计数器
FileOutputFormat 计数器的 groupName为org.apache.hadoop.mapreduce.lib.input.FileOutputFormatCounter,它包含的计数器如下表所示
计数器名称 |
说明 |
写的字节数(BYTES_WRITTEN) |
由 map 任务(针对仅含 map 的作业)或者 reduce 任务通过 FileOutputFormat 写的字节数。 |
作业计数器由 JobTracker(或者 YARN)维护,因此无需在网络间传输数据,这一点与包括 “用户定义的计数器” 在内的其它计数器不同。这些计数器都是作业级别的统计量,其值不会随着任务运行而改变。 作业计数器计数器的 groupName为org.apache.hadoop.mapreduce.JobCounter,它包含的计数器如下表所示
计数器名称 |
说明 |
启用的map任务数(TOTAL_LAUNCHED_MAPS) |
启动的map任务数,包括以“推测执行” 方式启动的任务。 |
启用的 reduce 任务数(TOTAL_LAUNCHED_REDUCES) |
启动的reduce任务数,包括以“推测执行” 方式启动的任务。 |
失败的map任务数(NUM_FAILED_MAPS) |
失败的map任务数。 |
失败的 reduce 任务数(NUM_FAILED_REDUCES) |
失败的reduce任务数。 |
数据本地化的 map 任务数(DATA_LOCAL_MAPS) |
与输入数据在同一节点的 map 任务数。 |
机架本地化的 map 任务数(RACK_LOCAL_MAPS) |
与输入数据在同一机架范围内、但不在同一节点上的 map 任务数。 |
其它本地化的 map 任务数(OTHER_LOCAL_MAPS) |
与输入数据不在同一机架范围内的 map 任务数。由于机架之间的宽带资源相对较少,Hadoop 会尽量让 map 任务靠近输入数据执行,因此该计数器值一般比较小。 |
map 任务的总运行时间(SLOTS_MILLIS_MAPS) |
map 任务的总运行时间,单位毫秒。该计数器包括以推测执行方式启动的任务。 |
reduce 任务的总运行时间(SLOTS_MILLIS_REDUCES) |
reduce任务的总运行时间,单位毫秒。该值包括以推测执行方式启动的任务。 |
在保留槽之后,map任务等待的总时间(FALLOW_SLOTS_MILLIS_MAPS) |
在为 map 任务保留槽之后所花费的总等待时间,单位是毫秒。 |
在保留槽之后,reduce 任务等待的总时间(FALLOW_SLOTS_MILLIS_REDUCES) |
在为 reduce 任务保留槽之后,花在等待上的总时间,单位为毫秒。 |
下面我们来介绍如何使用计数器。
1)枚举声明计数器
// 自定义枚举变量Enum
Counter counter = context.getCounter(Enum enum)
2)自定义计数器
/ 自己命名groupName和counterName
Counter counter = context.getCounter(String groupName,String counterName)
1)初始化计数器
counter.setValue(long value);// 设置初始值
2)计数器自增
counter.increment(long incr);// 增加计数
1) 获取枚举计数器的值
Configuration conf = new Configuration();
Job job = new Job(conf, "MyCounter");
job.waitForCompletion(true);
Counters counters=job.getCounters();
Counter counter=counters.findCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_LONG);// 查找枚举计数器,假如Enum的变量为BAD_RECORDS_LONG
long value=counter.getValue();//获取计数值
2) 获取自定义计数器的值
Configuration conf = new Configuration();
Job job = new Job(conf, "MyCounter");
job.waitForCompletion(true);
Counters counters = job.getCounters();
Counter counter=counters.findCounter("ErrorCounter","toolong");// 假如groupName为ErrorCounter,counterName为toolong
long value = counter.getValue();// 获取计数值
3) 获取内置计数器的值
Configuration conf = new Configuration();
Job job = new Job(conf, "MyCounter");
job.waitForCompletion(true);
Counters counters=job.getCounters();
// 查找作业运行启动的reduce个数的计数器,groupName和counterName可以从内置计数器表格查询(前面已经列举有)
Counter counter=counters.findCounter("org.apache.hadoop.mapreduce.JobCounter","TOTAL_LAUNCHED_REDUCES");// 假如groupName为org.apache.hadoop.mapreduce.JobCounter,counterName为TOTAL_LAUNCHED_REDUCES
long value=counter.getValue();// 获取计数值
4) 获取所有计数器的值
Configuration conf = new Configuration();
Job job = new Job(conf, "MyCounter");
Counters counters = job.getCounters();
for (CounterGroup group : counters) {
for (Counter counter : group) {
System.out.println(counter.getDisplayName() + ": " + counter.getName() + ": "+ counter.getValue());
}
}
过滤输入的log日志中是否包含itstar
(1)包含xyg的网站输出到XXX
(2)不包含xyg的网站输出到XXXX
输出预期:
(1)自定义一个outputformat
import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{
@Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
// 创建一个RecordWriter return new FilterRecordWriter(job); } } |
(2)具体的写数据RecordWriter
import java.io.IOException; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class FilterRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream itstarOut = null; FSDataOutputStream otherOut = null;
public FilterRecordWriter(TaskAttemptContext job) { // 1 获取文件系统 FileSystem fs;
try { fs = FileSystem.get(job.getConfiguration());
// 2 创建输出文件路径 Path itstarPath = new Path("F:\\date\\A\\itstarlog\\b\\itstar.log"); Path otherPath = new Path("F:\\date\\A\\itstarlog\\b\\other.log");
// 3 创建输出流 itstarOut = fs.create(itstarPath); otherOut = fs.create(otherPath); } catch (IOException e) { e.printStackTrace(); } } |
@Override public void write(Text key, NullWritable value) throws IOException, InterruptedException {
// 判断是否包含“xyg”输出到不同文件 if (key.toString().contains("itstar")) { itstarOut.write(key.toString().getBytes()); } else { otherOut.write(key.toString().getBytes()); } }
@Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭资源 if (itstarOut != null) { itstarOut.close(); }
if (otherOut != null) { otherOut.close(); } } }
|
(3)编写FilterMapper
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
Text k = new Text();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString();
k.set(line);
// 3 写出 context.write(k, NullWritable.get()); } } |
(4)编写FilterReducer
import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
String k = key.toString(); k = k + "\r\n";
context.write(new Text(k), NullWritable.get()); } } |
(6)编写FilterDriver
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FilterDriver { public static void main(String[] args) throws Exception {
args = new String[] { "F:\date\A\itstarlog\A", "F:\date\A\itstarlog\b" };
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FilterDriver.class); job.setMapperClass(FilterMapper.class); job.setReducerClass(FilterReducer.class);
job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class);
// 要将自定义的输出格式组件设置到job中 job.setOutputFormatClass(FilterOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } } |
1)需求
无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。将多个小文件合并成一个文件SequenceFile,SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。
2)输入数据
最终预期文件格式:
3)分析
小文件的优化无非以下几种方式:
(1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
(2)在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并
(3)在mapreduce处理时,可采用CombineTextInputFormat提高效率
4)具体实现
本节采用自定义InputFormat的方式,处理输入小文件的问题。
(1)自定义一个类继承FileInputFormat
(2)改写RecordReader,实现一次读取一个完整文件封装为KV
(3)在输出时使用SequenceFileOutPutFormat输出合并文件
5)程序实现:
(1)自定义InputFromat
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
// 定义类继承FileInputFormat public class WholeFileInputformat extends FileInputFormat<NullWritable, BytesWritable>{
@Override protected boolean isSplitable(JobContext context, Path filename) { return false; }
@Override public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
WholeRecordReader recordReader = new WholeRecordReader(); recordReader.initialize(split, context);
return recordReader; } } |
(2)自定义RecordReader
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WholeRecordReader extends RecordReader<NullWritable, BytesWritable>{
private Configuration configuration; private FileSplit split;
private boolean processed = false; private BytesWritable value = new BytesWritable();
@Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.split = (FileSplit)split; configuration = context.getConfiguration(); }
@Override public boolean nextKeyValue() throws IOException, InterruptedException {
if (!processed) { // 1 定义缓存区 byte[] contents = new byte[(int)split.getLength()];
FileSystem fs = null; FSDataInputStream fis = null;
try { // 2 获取文件系统 Path path = split.getPath(); fs = path.getFileSystem(configuration);
// 3 读取数据 fis = fs.open(path);
// 4 读取文件内容 IOUtils.readFully(fis, contents, 0, contents.length);
// 5 输出文件内容 value.set(contents, 0, contents.length); } catch (Exception e) {
}finally { IOUtils.closeStream(fis); }
processed = true;
return true; }
return false; }
@Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); }
@Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; }
@Override public float getProgress() throws IOException, InterruptedException { return processed? 1:0; }
@Override public void close() throws IOException { } } |
(3)SequenceFileMapper处理流程
import java.io.IOException; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable>{
Text k = new Text();
@Override protected void setup(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException { // 1 获取文件切片信息 FileSplit inputSplit = (FileSplit) context.getInputSplit(); // 2 获取切片名称 String name = inputSplit.getPath().toString(); // 3 设置key的输出 k.set(name); }
@Override protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
context.write(k, value); } } |
(4)SequenceFileReducer处理流程
import java.io.IOException; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
@Override protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, values.iterator().next()); } } |
(5)SequenceFileDriver处理流程
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
public class SequenceFileDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[] { "e:/input/inputinputformat", "e:/output1" }; Configuration conf = new Configuration();
Job job = Job.getInstance(conf); job.setJarByClass(SequenceFileDriver.class); job.setMapperClass(SequenceFileMapper.class); job.setReducerClass(SequenceFileReducer.class);
// 设置输入的inputFormat job.setInputFormatClass(WholeFileInputformat.class); // 设置输出的outputFormat job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true); } } |
订单数据表t_order:
id |
pid |
amount |
1001 |
01 |
1 |
1002 |
02 |
2 |
1003 |
03 |
3 |
订单数据order.txt
1001 01 1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6
商品信息表t_product
pid |
pname |
01 |
小米 |
02 |
华为 |
03 |
格力 |
商品数据pd.txt
01 小米
02 华为
03 格力
将商品信息表中数据根据商品pid合并到订单数据表中。
最终数据形式:
id |
pname |
amount |
1001 |
小米 |
1 |
1004 |
小米 |
4 |
1002 |
华为 |
2 |
1005 |
华为 |
5 |
1003 |
格力 |
3 |
1006 |
格力 |
6 |
通过将关联条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联。
1)创建商品和订合并后的bean类
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class TableBean implements Writable {
private String order_id; // 订单id
private String p_id; // 产品id
private int amount; // 产品数量
private String pname; // 产品名称
private String flag;// 表的标记
public TableBean() {
super();
}
public TableBean(String order_id, String p_id, int amount, String pname, String flag) {
super();
this.order_id = order_id;
this.p_id = p_id;
this.amount = amount;
this.pname = pname;
this.flag = flag;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
public String getOrder_id() {
return order_id;
}
public void setOrder_id(String order_id) {
this.order_id = order_id;
}
public String getP_id() {
return p_id;
}
public void setP_id(String p_id) {
this.p_id = p_id;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(order_id);
out.writeUTF(p_id);
out.writeInt(amount);
out.writeUTF(pname);
out.writeUTF(flag);
}
@Override
public void readFields(DataInput in) throws IOException {
this.order_id = in.readUTF();
this.p_id = in.readUTF();
this.amount = in.readInt();
this.pname = in.readUTF();
this.flag = in.readUTF();
}
@Override
public String toString() {
return order_id + "\t" + pname + "\t" + amount + "\t" ;
}
}
2)编写TableMapper程序
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean>{
TableBean bean = new TableBean();
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 1 获取输入文件类型
FileSplit split = (FileSplit) context.getInputSplit();
String name = split.getPath().getName();
// 2 获取输入数据
String line = value.toString();
// 3 不同文件分别处理
if (name.startsWith("order")) {// 订单表处理
// 3.1 切割
String[] fields = line.split(",");
// 3.2 封装bean对象
bean.setOrder_id(fields[0]);
bean.setP_id(fields[1]);
bean.setAmount(Integer.parseInt(fields[2]));
bean.setPname("");
bean.setFlag("0");
k.set(fields[1]);
}else {// 产品表处理
// 3.3 切割
String[] fields = line.split(",");
// 3.4 封装bean对象
bean.setP_id(fields[0]);
bean.setPname(fields[1]);
bean.setFlag("1");
bean.setAmount(0);
bean.setOrder_id("");
k.set(fields[0]);
}
// 4 写出
context.write(k, bean);
}
}
3)编写TableReducer程序
package com.xyg.mapreduce.table;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
// 1准备存储订单的集合
ArrayList<TableBean> orderBeans = new ArrayList<>();
// 2 准备bean对象
TableBean pdBean = new TableBean();
for (TableBean bean : values) {
if ("0".equals(bean.getFlag())) {// 订单表
// 拷贝传递过来的每条订单数据到集合中
TableBean orderBean = new TableBean();`
try {
BeanUtils.copyProperties(orderBean, bean);
} catch (Exception e) {
e.printStackTrace();
}
orderBeans.add(orderBean);
} else {// 产品表
try {
// 拷贝传递过来的产品表到内存中
BeanUtils.copyProperties(pdBean, bean);
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 3 表的拼接
for(TableBean bean:orderBeans){
bean.getPname(pdBean.getPname());
// 4 数据写出去
context.write(bean, NullWritable.get());
}
}
}
4)编写TableDriver程序
package com.xyg.mapreduce.table;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TableDriver {
public static void main(String[] args) throws Exception {
// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 指定本程序的jar包所在的本地路径
job.setJarByClass(TableDriver.class);
// 3 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);
// 4 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);
// 5 指定最终输出的数据的kv类型
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);
// 6 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
3)运行程序查看结果
1001 小米 1
1001 小米 1
1002 华为 2
1002 华为 2
1003 格力 3
1003 格力 3
缺点:这种方式中,合并的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜
解决方案: map端实现数据合并
适用于关联表中有小表的情形;
可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行合并并输出最终结果,可以大大提高合并操作的并发度,加快处理速度。
(1)先在驱动模块中添加缓存文件
package test;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DistributedCacheDriver {
public static void main(String[] args) throws Exception {
// 1 获取job信息
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 设置加载jar包路径
job.setJarByClass(DistributedCacheDriver.class);
// 3 关联map
job.setMapperClass(DistributedCacheMapper.class);
// 4 设置最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 5 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 加载缓存数据
job.addCacheFile(new URI("file:///e:/inputcache/pd.txt"));
// 7 map端join的逻辑不需要reduce阶段,设置reducetask数量为0
job.setNumReduceTasks(0);
// 8 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
(2)读取缓存的文件数据
package test;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
Map<String, String> pdMap = new HashMap<>();
@Override
protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
// 1 获取缓存的文件
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"),"UTF-8"));
String line;
while(StringUtils.isNotEmpty(line = reader.readLine())){
// 2 切割
String[] fields = line.split("\t");
// 3 缓存数据到集合
pdMap.put(fields[0], fields[1]);
}
// 4 关流
reader.close();
}
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 截取
String[] fields = line.split("\t");
// 3 获取产品id
String pId = fields[1];
// 4 获取商品名称
String pdName = pdMap.get(pId);
// 5 拼接
k.set(line + "\t"+ pdName);
// 6 写出
context.write(k, NullWritable.get());
}
}
有如下订单数据
订单id |
商品id |
成交金额 |
0000001 |
Pdt_01 |
222.8 |
0000001 |
Pdt_05 |
25.8 |
0000002 |
Pdt_03 |
522.8 |
0000002 |
Pdt_04 |
122.4 |
0000002 |
Pdt_05 |
722.4 |
0000003 |
Pdt_01 |
222.8 |
0000003 |
Pdt_02 |
33.8 |
现在需要求出每一个订单中最贵的商品。
GroupingComparator.txt
Pdt_01 222.8
Pdt_05 722.4
Pdt_05 25.8
Pdt_01 222.8
Pdt_01 33.8
Pdt_03 522.8
Pdt_04 122.4
输出数据预期:
part-r-00000.txt
part-r-00001.txt
part-r-00002.txt
(1)利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce。
(2)在reduce端利用groupingcomparator将订单id相同的kv聚合成组,然后取第一个即是最大值。
定义订单信息OrderBean
package com.xyg.mapreduce.order;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class OrderBean implements WritableComparable<OrderBean> {
private int order_id; // 订单id号
private double price; // 价格
public OrderBean() {
super();
}
public OrderBean(int order_id, double price) {
super();
this.order_id = order_id;
this.price = price;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(order_id);
out.writeDouble(price);
}
@Override
public void readFields(DataInput in) throws IOException {
order_id = in.readInt();
price = in.readDouble();
}
@Override
public String toString() {
return order_id + "\t" + price;
}
public int getOrder_id() {
return order_id;
}
public void setOrder_id(int order_id) {
this.order_id = order_id;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
// 二次排序
@Override
public int compareTo(OrderBean o) {
int result = order_id > o.getOrder_id() ? 1 : -1;
if (order_id > o.getOrder_id()) {
result = 1;
} else if (order_id < o.getOrder_id()) {
result = -1;
} else {
// 价格倒序排序
result = price > o.getPrice() ? -1 : 1;
}
return result;
}
}
编写OrderSortMapper处理流程
package com.xyg.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
OrderBean k = new OrderBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 截取
String[] fields = line.split("\t");
// 3 封装对象
k.setOrder_id(Integer.parseInt(fields[0]));
k.setPrice(Double.parseDouble(fields[2]));
// 4 写出
context.write(k, NullWritable.get());
}
}
编写OrderSortReducer处理流程
package com.xyg.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
编写OrderSortDriver处理流程
package com.xyg.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class OrderDriver {
public static void main(String[] args) throws Exception, IOException {
// 1 获取配置信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置jar包加载路径
job.setJarByClass(OrderDriver.class);
// 3 加载map/reduce类
job.setMapperClass(OrderMapper.class);
job.setReducerClass(OrderReducer.class);
// 4 设置map输出数据key和value类型
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
// 5 设置最终输出数据的key和value类型
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
// 6 设置输入数据和输出数据路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 10 设置reduce端的分组
job.setGroupingComparatorClass(OrderGroupingComparator.class);
// 7 设置分区
job.setPartitionerClass(OrderPartitioner.class);
// 8 设置reduce个数
job.setNumReduceTasks(3);
// 9 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
OrderSortDriver
编写OrderSortPartitioner处理流程
package com.xyg.mapreduce.order;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
@Override
public int getPartition(OrderBean key, NullWritable value, int numReduceTasks) {
return (key.getOrder_id() & Integer.MAX_VALUE) % numReduceTasks;
}
}
编写OrderSortGroupingComparator处理流程
package com.xyg.mapreduce.order;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class OrderGroupingComparator extends WritableComparator {
protected OrderGroupingComparator() {
super(OrderBean.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean aBean = (OrderBean) a;
OrderBean bBean = (OrderBean) b;
int result;
if (aBean.getOrder_id() > bBean.getOrder_id()) {
result = 1;
} else if (aBean.getOrder_id() < bBean.getOrder_id()) {
result = -1;
} else {
result = 0;
}
return result;
}
}
1)需求: 统计每一个手机号耗费的总上行流量、下行流量、总流量
2)数据准备 phone_date.txt
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
13560436666 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
输入数据格式:
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
手机号码 上行流量 下行流量
输出数据格式
13560436666 1116 954 2070
手机号码 上行流量 下行流量 总流量
3)分析
基本思路:
Map阶段:
(1)读取一行数据,切分字段
(2)抽取手机号、上行流量、下行流量
(3)以手机号为key,bean对象为value输出,即context.write(手机号,bean);
Reduce阶段:
(1)累加上行流量和下行流量得到总流量。
(2)实现自定义的bean来封装流量信息,并将bean作为map输出的key来传输
(3)MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key
所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable。
然后重写key的compareTo方法。
4)编写mapreduce程序
(1)编写流量统计的bean对象
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
// bean对象要实例化
public class FlowBean implements Writable {
private long upFlow;
private long downFlow;
private long sumFlow;
// 反序列化时,需要反射调用空参构造函数,所以必须有
public FlowBean() {
super();
}
public FlowBean(long upFlow, long downFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
/**
* 序列化方法
*
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
/**
* 反序列化方法
注意反序列化的顺序和序列化的顺序完全一致
*
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
(2)编写mapreduce主程序
package com.xyg.mr.flowsum;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowCount {
static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 将一行内容转成string
String ling = value.toString();
// 2 切分字段
String[] fields = ling.split("\t");
// 3 取出手机号码
String phoneNum = fields[1];
// 4 取出上行流量和下行流量
long upFlow = Long.parseLong(fields[fields.length - 3]);
long downFlow = Long.parseLong(fields[fields.length - 2]);
// 5 写出数据
context.write(new Text(phoneNum), new FlowBean(upFlow, downFlow));
}
}
static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context)
throws IOException, InterruptedException {
long sum_upFlow = 0;
long sum_downFlow = 0;
// 1 遍历所用bean,将其中的上行流量,下行流量分别累加
for (FlowBean bean : values) {
sum_upFlow += bean.getUpFlow();
sum_downFlow += bean.getDownFlow();
}
// 2 封装对象
FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
context.write(key, resultBean);
}
}
public static void main(String[] args) throws Exception {
// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 6 指定本程序的jar包所在的本地路径
job.setJarByClass(FlowCount.class);
// 2 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
// 3 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 4 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 5 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
(3)将程序打成jar包,然后拷贝到hadoop集群中。
(4)启动hadoop集群(3)将程序打成jar包,然后拷贝到hadoop集群中。
(5)执行flowcount程序
[root@node21 ~]$ hadoop jar flowcount.jar com.xyg.mr.flowsum.FlowCount /user/root/flowcount/input/ /user/root/flowcount/output
(6)查看结果
[root@node21 ~]$ hadoop fs -cat /user/root/flowcount/output/part-r-00000
13480253104 FlowBean [upFlow=180, downFlow=180, sumFlow=360]
13502468823 FlowBean [upFlow=7335, downFlow=110349, sumFlow=117684]
13560436666 FlowBean [upFlow=1116, downFlow=954, sumFlow=2070]
13560439658 FlowBean [upFlow=2034, downFlow=5892, sumFlow=7926]
13602846565 FlowBean [upFlow=1938, downFlow=2910, sumFlow=4848]
。。。
0)需求:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
1)数据准备 phone_date.txt
2)分析
(1)Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask。默认的分发规则为:根据key的hashcode%reducetask数来分发
(2)如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner
自定义一个CustomPartitioner继承抽象类:Partitioner
(3)在job驱动中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class)
3)在需求1的基础上,增加一个分区类
import java.util.HashMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* K2 V2 对应的是map输出kv类型
* @author Administrator
*/
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 1 获取电话号码的前三位
String preNum = key.toString().substring(0, 3);
int partition = 4;
// 2 判断是哪个省
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}
2)在驱动函数中增加自定义数据分区设置和reduce task设置
public static void main(String[] args) throws Exception {
// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 6 指定本程序的jar包所在的本地路径
job.setJarByClass(FlowCount.class);
// 8 指定自定义数据分区
job.setPartitionerClass(ProvincePartitioner.class);
// 9 同时指定相应数量的reduce task
job.setNumReduceTasks(5);
// 2 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
// 3 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 4 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 5 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
3)将程序打成jar包,然后拷贝到hadoop集群中。
4)启动hadoop集群
5)执行flowcountPartitionser程序
[root@node21 ~]$ hadoop jar flowcountPartitionser.jar com.xyg.mr.partitioner.FlowCount /user/root/flowcount/input /user/root/flowcount/output
6)查看结果
[root@node21 ~]]$ hadoop fs -lsr /
/user/root/flowcount/output/part-r-00000
/user/root/flowcount/output/part-r-00001
/user/root/flowcount/output/part-r-00002
/user/root/flowcount/output/part-r-00003
/user/root/flowcount/output/part-r-00004
0)需求 根据需求1产生的结果再次对总流量进行排序。
1)数据准备 phone_date.txt
2)分析
(1)把程序分两步走,第一步正常统计总流量,第二步再把结果进行排序
(2)context.write(总流量,手机号)
(3)FlowBean实现WritableComparable接口重写compareTo方法
@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
package com.xyg.mr.sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow;
private long downFlow;
private long sumFlow;
// 反序列化时,需要反射调用空参构造函数,所以必须有
public FlowBean() {
super();
}
public FlowBean(long upFlow, long downFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
public void set(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
/**
* 序列化方法
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
/**
* 反序列化方法 注意反序列化的顺序和序列化的顺序完全一致
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
}
4)Map方法优化为一个对象,reduce方法则直接输出结果即可,驱动函数根据输入输出重写配置即可。3)FlowBean对象在在需求1基础上增加了比较功能
package com.xyg.mr.sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowCountSort {
static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
FlowBean bean = new FlowBean();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 1 拿到的是上一个统计程序输出的结果,已经是各手机号的总流量信息
String line = value.toString();
// 2 截取字符串并获取电话号、上行流量、下行流量
String[] fields = line.split("\t");
String phoneNbr = fields[0];
long upFlow = Long.parseLong(fields[1]);
long downFlow = Long.parseLong(fields[2]);
// 3 封装对象
bean.set(upFlow, downFlow);
v.set(phoneNbr);
// 4 输出
context.write(bean, v);
}
}
static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
@Override
protected void reduce(FlowBean bean, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
context.write(values.iterator().next(), bean);
}
}
public static void main(String[] args) throws Exception {
// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 6 指定本程序的jar包所在的本地路径
job.setJarByClass(FlowCountSort.class);
// 2 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountSortMapper.class);
job.setReducerClass(FlowCountSortReducer.class);
// 3 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// 4 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 5 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
Path outPath = new Path(args[1]);
// FileSystem fs = FileSystem.get(configuration);
// if (fs.exists(outPath)) {
// fs.delete(outPath, true);
// }
FileOutputFormat.setOutputPath(job, outPath);
// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
5)将程序打成jar包,然后拷贝到hadoop集群中。
6)启动hadoop集群5)将程序打成jar包,然后拷贝到hadoop集群中。
7)执行flowcountsort程序
[root@node21 module]$ hadoop jar flowcountsort.jar com.xyg.mr.sort.FlowCountSort /user/root/flowcount/output /user/root/flowcount/output_sort
8)查看结果
[root@node21 module]$ hadoop fs -cat /user/flowcount/output_sort/part-r-00000
13502468823 7335 110349 117684
13925057413 11058 48243 59301
13726238888 2481 24681 27162
13726230503 2481 24681 27162
18320173382 9531 2412 11943
1)需求 要求每个省份手机号输出的文件中按照总流量内部排序。
2)分析 基于需求3,增加自定义分区类即可。
3)案例实操
(1)增加自定义分区类
package com.xyg.reduce.flowsort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class FlowSortPartitioner extends Partitioner<FlowBean, Text> {
@Override
public int getPartition(FlowBean key, Text value, int numPartitions) {
int partition = 0;
String preNum = value.toString().substring(0, 3);
if (" ".equals(preNum)) {
partition = 5;
} else {
if ("136".equals(preNum)) {
partition = 1;
} else if ("137".equals(preNum)) {
partition = 2;
} else if ("138".equals(preNum)) {
partition = 3;
} else if ("139".equals(preNum)) {
partition = 4;
}
}
return partition;
}
}
(2)在驱动类中添加分区类
job.setPartitionerClass(FlowSortPartitioner.class);
job.setNumReduceTasks(5);
0)需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数
1)数据准备:Hello.txt
hello world
dog fish
hadoop
spark
hello world
dog fish
hadoop
spark
hello world
dog fish
hadoop
spark
2)分析
按照mapreduce编程规范,分别编写Mapper,Reducer,Driver。
3)编写程序
(1)定义一个mapper类
package com.xyg.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,Long;
* 在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而是用LongWritable
* VALUEIN:默认情况下,是mr框架所读到的一行文本内容,String;此处用Text
* KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,String;此处用Text
* VALUEOUT,是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer,此处用IntWritable
* @author Administrator
*/
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
/**
* map阶段的业务逻辑就写在自定义的map()方法中
* maptask会对每一行输入数据调用一次我们自定义的map()方法
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 将maptask传给我们的文本内容先转换成String
String line = value.toString();
// 2 根据空格将这一行切分成单词
String[] words = line.split(" ");
// 3 将单词输出为<单词,1>
for(String word:words){
// 将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会到相同的reducetask中
context.write(new Text(word), new IntWritable(1));
}
}
}
(2)定义一个reducer类
package com.xyg.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* KEYIN , VALUEIN 对应mapper输出的KEYOUT, VALUEOUT类型
* KEYOUT,VALUEOUT 对应自定义reduce逻辑处理结果的输出数据类型 KEYOUT是单词 VALUEOUT是总次数
*/
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* key,是一组相同单词kv对的key
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
// 1 汇总各个key的个数
for(IntWritable value:values){
count +=value.get();
}
// 2输出该key的总次数
context.write(key, new IntWritable(count));
}
}
(3)定义一个主类,用来描述job并提交job
package com.xyg.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 相当于一个yarn集群的客户端,
* 需要在此封装我们的mr程序相关运行参数,指定jar包
* 最后提交给yarn
* @author Administrator
*/
public class WordcountDriver {
public static void main(String[] args) throws Exception {
// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
// 8 配置提交到yarn上运行,windows和Linux变量不一致
// configuration.set("mapreduce.framework.name", "yarn");
// configuration.set("yarn.resourcemanager.hostname", "node22");
Job job = Job.getInstance(configuration);
// 6 指定本程序的jar包所在的本地路径
// job.setJar("/home/admin/wc.jar");
job.setJarByClass(WordcountDriver.class);
// 2 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
// 3 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 4 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 5 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
// job.submit();
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
}
}
4)集群上测试
(1)将程序打成jar包,然后拷贝到hadoop集群中。
(2)启动hadoop集群
(3)执行wordcount程序
[admin@node21 module]$ hadoop jar wc.jar com.xyg.wordcount.WordcountDriver /user/admin/input /user/admin/output
5)本地测试
(1)在windows环境上配置HADOOP_HOME环境变量。
(2)在eclipse上运行程序
(3)注意:如果eclipse打印不出日志,在控制台上只显示
1.log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
2.log4j:WARN Please initialize the log4j system properly.
3.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
需要在项目的src目录下,新建一个文件,命名为“log4j.properties”,在文件中填入
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
0)分析
1)自定义分区
package com.xyg.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class WordCountPartitioner extends Partitioner<Text, IntWritable>{
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
// 1 获取单词key
String firWord = key.toString().substring(0, 1);
char[] charArray = firWord.toCharArray();
int result = charArray[0];
// int result = key.toString().charAt(0);
// 2 根据奇数偶数分区
if (result % 2 == 0) {
return 0;
}else {
return 1;
}
}
}
2)在驱动中配置加载分区,设置reducetask个数
job.setPartitionerClass(WordCountPartitioner.class);
job.setNumReduceTasks(2);
0)需求:统计过程中对每一个maptask的输出进行局部汇总,以减小网络传输量即采用Combiner功能。
1)数据准备:hello,txt
方案一
1)增加一个WordcountCombiner类继承Reducer
package com.xyg.mr.combiner;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for(IntWritable v :values){
count += v.get();
}
context.write(key, new IntWritable(count));
}
}
2)在WordcountDriver驱动类中指定combiner
//9 指定需要使用combiner,以及用哪个类作为combiner的逻辑
job.setCombinerClass(WordcountCombiner.class);
方案二
1)将WordcountReducer作为combiner在WordcountDriver驱动类中指定
//9 指定需要使用combiner,以及用哪个类作为combiner的逻辑
job.setCombinerClass(WordcountReducer.class);
运行程序
0)需求:将输入的大量小文件合并成一个切片统一处理。
1)输入数据:准备5个小文件
2)实现过程
(1)不做任何处理,运行需求1中的wordcount程序,观察切片个数为5
(2)在WordcountDriver中增加如下代码,运行程序,并观察运行的切片个数为1
// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
实现两表join
Bean.java
package Join;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;
/* * 人员和地址的通用bean */ public class Bean implements WritableComparable<Bean> { private String userNo = ""; private String userName = ""; private String addreNo = ""; private String addreName = ""; private int flag;
public Bean(Bean bean) { this.userName = bean.getUserName(); this.userNo = bean.getUserNo(); this.addreName = bean.getAddreName(); this.addreNo = bean.getAddreNo(); this.flag = bean.getFlag(); }
public Bean() { super(); // TODO Auto-generated constructor stub }
public Bean(String userNo, String userName, String addreNo, String addreName, int flag) { super(); this.userNo = userNo; this.userName = userName; this.addreNo = addreNo; this.addreName = addreName; this.flag = flag; }
public String getUserNo() { return userNo; }
public void setUserNo(String userNo) { this.userNo = userNo; }
public String getUserName() { return userName; }
public void setUserName(String userName) { this.userName = userName; }
public String getAddreNo() { return addreNo; }
public void setAddreNo(String addreNo) { this.addreNo = addreNo; }
public String getAddreName() { return addreName; }
public void setAddreName(String addreName) { this.addreName = addreName; }
public int getFlag() { return flag; }
public void setFlag(int flag) { this.flag = flag; }
@Override public void write(DataOutput out) throws IOException { out.writeUTF(userNo); out.writeUTF(userName); out.writeUTF(addreNo); out.writeUTF(addreName); out.writeInt(flag);
}
@Override public void readFields(DataInput in) throws IOException { this.userNo = in.readUTF(); this.userName = in.readUTF(); this.addreNo = in.readUTF(); this.addreName = in.readUTF(); this.flag = in.readInt();
}
@Override public int compareTo(Bean arg0) { // TODO Auto-generated method stub return 0; }
@Override public String toString() { return "userNo=" + userNo + ", userName=" + userName + ", addreNo=" + addreNo + ", addreName=" + addreName; }
} |
PersonAddrMap.java
package Join;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PersonAddrMap extends Mapper<LongWritable, Text, IntWritable, Bean> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Bean>.Context context) throws IOException, InterruptedException { String line = value.toString(); String str[] = line.split(" "); if (str.length == 2) { //地区信息表 Bean bean = new Bean(); bean.setAddreNo(str[0]); bean.setAddreName(str[1]); bean.setFlag(0); // 0表示地区 context.write(new IntWritable(Integer.parseInt(str[0])), bean); } else { //人员信息表 Bean bean = new Bean(); bean.setUserNo(str[0]); bean.setUserName(str[1]); bean.setAddreNo(str[2]); bean.setFlag(1); // 1表示人员表 context.write(new IntWritable(Integer.parseInt(str[2])), bean); } } }
|
PersonAddreRedu.java
package Join;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException; import java.util.ArrayList; import java.util.List;
public class PersonAddreRedu extends Reducer<IntWritable, Bean, NullWritable,Text> { @Override protected void reduce(IntWritable key, Iterable<Bean> values, Reducer<IntWritable, Bean, NullWritable, Text>.Context context) throws IOException, InterruptedException { Bean Addre = null; List<Bean> peoples = new ArrayList<Bean>(); /* * 如果values的第一个元素信息就是地址Addre的信息的话, * 我们就不再需要一个List来缓存person信息了,values后面的全是人员信息 * 将减少巨大的内存空间 */ /* * partitioner和shuffer的过程: * partitioner的主要功能是根据reduce的数量将map输出的结果进行分块,将数据送入到相应的reducer. * 所有的partitioner都必须实现partitioner接口并实现getPartition方法,该方法的返回值为int类型,并且取值范围在0~(numOfReducer-1), * 从而能将map的输出输入到对应的reducer中,对于某个mapreduce过程,hadoop框架定义了默认的partitioner为HashPartioner, * 该partitioner使用key的hashCode来决定将该key输送到哪个reducer; * shuffle将每个partitioner输出的结果根据key进行group以及排序,将具有相同key的value构成一个values的迭代器,并根据key进行排序分别调用 * 开发者定义的reduce方法进行排序,因此mapreducer的所以key必须实现comparable接口的compareto()方法从而能实现两个key对象的比较 */ /* * 我们需要自定义key的数据结构(shuffle按照key进行分组)来满足共同addreNo的情况下地址表的更小需求 * */ for (Bean bean : values) { if (bean.getFlag() == 0) { // 表示地区表 Addre = new Bean(bean); } else { peoples.add(new Bean(bean)); //添加到peoplelist中 } } for (Bean peo : peoples) { // 给peoplelist添加地区名字 peo.setAddreName(Addre.getAddreName()); context.write(NullWritable.get(), new Text(peo.toString())); } } } |
PersonAddreMain.java
package Join;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PersonAddreMain { public static void main(String[] args) throws Exception {
args = new String[] { "F:\\A\\join\\", "F:\\A\\out" };
Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(PersonAddreMain.class);
job.setMapperClass(PersonAddrMap.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Bean.class);
job.setReducerClass(PersonAddreRedu.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } } |
Bean.java
import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;
/* * 人员和地址的通用bean * 用作map输出的value */ public class Bean implements WritableComparable<Bean> { private String userNo = " "; private String userName = " "; private String addreNo = " "; private String addreName = " ";
public Bean(Bean bean) { this.userName = bean.getUserName(); this.userNo = bean.getUserNo(); this.addreName = bean.getAddreName(); this.addreNo = bean.getAddreNo(); }
public Bean() { super(); // TODO Auto-generated constructor stub }
public Bean(String userNo, String userName, String addreNo, String addreName, int flag) { super(); this.userNo = userNo; this.userName = userName; this.addreNo = addreNo; this.addreName = addreName; }
public String getUserNo() { return userNo; }
public void setUserNo(String userNo) { this.userNo = userNo; }
public String getUserName() { return userName; }
public void setUserName(String userName) { this.userName = userName; }
public String getAddreNo() { return addreNo; }
public void setAddreNo(String addreNo) { this.addreNo = addreNo; }
public String getAddreName() { return addreName; }
public void setAddreName(String addreName) { this.addreName = addreName; }
@Override public void write(DataOutput out) throws IOException { out.writeUTF(userNo); out.writeUTF(userName); out.writeUTF(addreNo); out.writeUTF(addreName);
}
@Override public void readFields(DataInput in) throws IOException { this.userNo = in.readUTF(); this.userName = in.readUTF(); this.addreNo = in.readUTF(); this.addreName = in.readUTF(); }
@Override public int compareTo(Bean arg0) { // TODO Auto-generated method stub return 0; }
@Override public String toString() { return "userNo=" + userNo + ", userName=" + userName + ", addreNo=" + addreNo + ", addreName=" + addreName; } } |
BeanKey.java
package Join2;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;
/* * map输出的key */ public class BeanKey implements WritableComparable<BeanKey> { private int AddreNo; private boolean isPrimary; // true:address false:person
public BeanKey(int addreNo, boolean isPrimary) { super(); this.AddreNo = addreNo; this.isPrimary = isPrimary; }
public BeanKey() { super(); // TODO Auto-generated constructor stub }
@Override public void write(DataOutput out) throws IOException { out.writeInt(AddreNo); out.writeBoolean(isPrimary);
}
@Override public void readFields(DataInput in) throws IOException { this.AddreNo = in.readInt(); this.isPrimary = in.readBoolean();
}
// partitioner执行时调用hashcode()方法和compareTo()方法 // compareTo()方法作为shuffle排序的默认方法 @Override public int hashCode() { return this.AddreNo; // 按AddreNo进行分组 }
//用于排序,将相同的AddressNo的地址表和人员表,将地址表放到首位 @Override public int compareTo(BeanKey o) { if (this.AddreNo == o.getAddreNo()) { // 如果是同一个AddressNo的数据则判断是Person还是Address表 if (this.isPrimary == o.isPrimary()) { //如果属性相同属于同种类型的表,返回0 return 0; } else { return this.isPrimary ? -1 : 1; // true表示Address表 返回更小的值,将排至values队首 } } else { return this.AddreNo - o.getAddreNo() > 0 ? 1 : -1; //按AddressNo排序 } }
public int getAddreNo() { return AddreNo; }
public void setAddreNo(int addreNo) { AddreNo = addreNo; }
public boolean isPrimary() { return isPrimary; }
public void setPrimary(boolean isPrimary) { this.isPrimary = isPrimary; } } |
PersonAddreMap.java
package Join2;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException;
/* * map类使key,value分别进行处理 */ public class PersonAddreMap extends Mapper<LongWritable, Text, BeanKey, Bean> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, BeanKey, Bean>.Context context) throws IOException, InterruptedException { String line = value.toString(); String str[] = line.split(" "); if (str.length == 2) { // Addre表 Bean Addre = new Bean(); Addre.setAddreNo(str[0]); Addre.setAddreName(str[1]);
BeanKey AddreKey = new BeanKey(); AddreKey.setAddreNo(Integer.parseInt(str[0])); AddreKey.setPrimary(true); // true表示地区表 context.write(AddreKey, Addre); } else { // Person表 Bean Person = new Bean(); Person.setUserNo(str[0]); Person.setUserName(str[1]); Person.setAddreNo(str[2]);
BeanKey PerKey = new BeanKey(); PerKey.setAddreNo(Integer.parseInt(str[2])); PerKey.setPrimary(false);// false表示人员表 context.write(PerKey, Person);
} }
} |
PersonAddreRedu.java
package Join2;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PersonAddreRedu extends Reducer<BeanKey, Bean, NullWritable, Text> { @Override protected void reduce(BeanKey key, Iterable<Bean> values, Reducer<BeanKey, Bean, NullWritable, Text>.Context context) throws IOException, InterruptedException { Bean Addre = null; int num = 0; for (Bean bean : values) { if (num == 0) { Addre = new Bean(bean); // Address地址表为values的第一个值 num++; } else { // 其余全为person表 // 没有list数组,节省大量内存空间 bean.setAddreName(Addre.getAddreName()); context.write(NullWritable.get(), new Text(bean.toString())); } } } } |
PKFKCompartor.java
package Join2;
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator;
/* * 实现Group分组 * shuffle的group过程默认的是使用的key(BeanKey)的compareTo()方法 * 刚才我们添加的自定义的Key没有办法将具有相同AddressNo的地址和人员放到同一个group中(因为从compareTo()方法中可以看出他们是不相等的) * 我们需要的就是自己定义一个groupComparer就可以 * 实现比较器 */ public class PKFKCompartor extends WritableComparator {
protected PKFKCompartor() { super(BeanKey.class, true); }
//两个BeanKey进行比较排序 @Override public int compare(WritableComparable a, WritableComparable b) { BeanKey a1 = (BeanKey) a; BeanKey b1 = (BeanKey) b; if (a1.getAddreNo() == b1.getAddreNo()) { return 0; } else { return a1.getAddreNo() > b1.getAddreNo() ? 1 : -1; } } } |
PersonAddreMain.java
package Join2;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PersonAddreMain { public static void main(String[] args) throws Exception {
args = new String[]{"F:\\A\\join\\", "F:\\A\\out_Andy1"};
Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(PersonAddreMain.class);
//设置自定义的group job.setGroupingComparatorClass(PKFKCompartor.class);
job.setMapperClass(PersonAddreMap.class); job.setMapOutputKeyClass(BeanKey.class); job.setMapOutputValueClass(Bean.class);
job.setReducerClass(PersonAddreRedu.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } } |
原理:读取字段、reduce不做处理、自动去重 |
文本数据: AAA BBB AAA CCC |
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split(" ");
context.write(new Text(split[0]),NullWritable.get()); } }
|
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class DistinctReduce extends Reducer<Text, NullWritable, Text, NullWritable> { @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key,NullWritable.get()); } } |
Driver不写
原文:https://www.cnblogs.com/jareny/p/11247926.html