In the previous post, we introduced using a combiner and in-map aggregation to optimize a MapReduce job.
In this post, we keep optimizing the same character-counting MR program.
To restate the goal of the program: count how many of each of the characters a~z appear in the input file, with 'A' and 'a' both counted as 'a'.
First, take a look at the Job Counters from the last run; you can see that the number of reduce tasks is 1.
Job Counters
        Launched map tasks=3
        Launched reduce tasks=1
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=1207744
        Total time spent by all reduces in occupied slots (ms)=341424
Key point: an MR job uses 1 reducer by default. The number of reducers can be set with job.setNumReduceTasks(num).
A question to leave for later: how is the number of map tasks determined?
When the data volume is large, a single reduce task can become the bottleneck of the job, so let's set the number of reducers by hand.
    @Override
    public int run(String[] args) throws Exception {
        //validate the parameters
        if(args.length != 2){
            return -1;
        }

        Job job = Job.getInstance(getConf(), "MyWordCountJob");
        job.setJarByClass(MyWordCountJob.class);

        Path inPath = new Path(args[0]);
        Path outPath = new Path(args[1]);

        outPath.getFileSystem(getConf()).delete(outPath, true);
        TextInputFormat.setInputPaths(job, inPath);
        TextOutputFormat.setOutputPath(job, outPath);

        job.setMapperClass(MyWordCountJob.MyWordCountMapper.class);
        job.setReducerClass(MyWordCountJob.MyWordCountReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setNumReduceTasks(3);
        return job.waitForCompletion(true) ? 0 : 1;
    }
After compiling and running again, check the logs: the number of reducers is now 3, as expected. But the problem is not solved yet!
File System Counters
        FILE: Number of bytes read=642
        FILE: Number of bytes written=507033
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=556
        HDFS: Number of bytes written=107
        HDFS: Number of read operations=18
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=6
Job Counters
        Launched map tasks=3
        Launched reduce tasks=3
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=1207744
        Total time spent by all reduces in occupied slots (ms)=341424
Map-Reduce Framework
        Map input records=8
        Map output records=78
        Map output bytes=468
        Map output materialized bytes=678
        Input split bytes=399
        Combine input records=0
        Combine output records=0
        Reduce input groups=26
        Reduce shuffle bytes=678
        Reduce input records=78
        Reduce output records=26
        Spilled Records=156
        Shuffled Maps =9
        Failed Shuffles=0
        Merged Map outputs=9
        GC time elapsed (ms)=507
        CPU time spent (ms)=7770
        Physical memory (bytes) snapshot=1329672192
        Virtual memory (bytes) snapshot=5978918912
        Total committed heap usage (bytes)=1004273664
Now look at the output. There are three output files, which is normal: each reducer produces one output file.
[train@sandbox MyWordCount]$ hdfs dfs -ls output
Found 4 items
-rw-r--r--   3 train hdfs          0 2016-05-11 11:48 output/_SUCCESS
-rw-r--r--   3 train hdfs         37 2016-05-11 11:48 output/part-r-00000
-rw-r--r--   3 train hdfs         34 2016-05-11 11:48 output/part-r-00001
-rw-r--r--   3 train hdfs         36 2016-05-11 11:48 output/part-r-00002
Let's look at the contents of the result files.
[train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00000
b 3
e 11
h 8
k 3
n 4
q 3
t 4
w 7
z 3
[train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00001
c 4
f 4
i 5
l 6
o 12
r 13
u 6
x 3
[train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00002
a 8
d 4
g 3
j 3
m 7
p 3
s 4
v 0
y 6
The letters are not split across the three files in order. What we actually want is a~h in the first file, i~p in the second, and q~z in the third. How do we get that?
This is where the tool introduced in this post comes in: the Partitioner.
The Partitioner decides which reducer each map output record is sent to. When there is only one reducer, the Partitioner is never invoked, even if one is configured.
The default Partitioner in the Hadoop framework is HashPartitioner. It assigns a record by taking the hash value of its key modulo the number of reducers.
Note: returning 0 sends the record to the first reducer, returning 1 sends it to the second, and so on. For example, with three reducers, a key whose hashCode() is 97 lands in partition 97 % 3 = 1, i.e. the second reducer.
public class HashPartitioner<K, V> extends Partitioner<K, V> {

    public int getPartition(K key, V value,
                            int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
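This is why the letters end up scattered across the three files: each key simply lands wherever its hash sends it. If you want to see the mapping for yourself, a small standalone check like the sketch below (my own illustration, not part of the job; it assumes the Hadoop jars are on the classpath and PartitionCheck is just a made-up class name) prints which of the three reducers the default HashPartitioner would pick for each letter key.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

// Illustrative helper, not part of the MR job: print which of the 3 reducers
// the default HashPartitioner would choose for each letter key.
public class PartitionCheck {
    public static void main(String[] args) {
        HashPartitioner<Text, IntWritable> partitioner = new HashPartitioner<Text, IntWritable>();
        IntWritable one = new IntWritable(1);
        for (char c = 'a'; c <= 'z'; c++) {
            Text key = new Text(String.valueOf(c));
            System.out.println(c + " -> partition " + partitioner.getPartition(key, one, 3));
        }
    }
}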
Next we define our own Partitioner that implements the split we want: a~h go to the first reducer, i~p to the second, and q~z to the third.
public static class MyWordCountPartitioner extends Partitioner<Text, IntWritable>{

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        char c = key.toString().charAt(0);
        if(c >= 'a' && c < 'i')
            return 0;
        else if(c >= 'i' && c < 'q')
            return 1;
        else
            return 2;
    }
}
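Defining the class is not enough on its own: the partitioner also has to be registered on the job in the driver, which the complete listing at the end of this post does with a single line:

job.setPartitionerClass(MyWordCountPartitioner.class);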
Run the job again and check the results: they now match what we wanted.
[train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00000
a 8
b 3
c 4
d 4
e 11
f 4
g 3
h 8
[train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00001
i 5
j 3
k 3
l 6
m 7
n 4
o 12
p 3
[train@sandbox MyWordCount]$ hdfs dfs -cat output/part-r-00002
q 3
r 13
s 4
t 4
u 6
v 0
w 7
x 3
y 6
z 3
As usual, the complete code is listed below.
package wordcount;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

public class MyWordCountJob extends Configured implements Tool {
    Logger log = Logger.getLogger(MyWordCountJob.class);

    public static class MyWordCountMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        Logger log = Logger.getLogger(MyWordCountJob.class);
        Map<Character,Integer> map = new HashMap<Character,Integer>();

        Text mapKey = new Text();
        IntWritable mapValue = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for(char c : value.toString().toLowerCase().toCharArray()){
                if(c >= 'a' && c <= 'z'){
                    map.put(c, map.get(c) + 1);
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            for(char key : map.keySet()){
                mapKey.set(String.valueOf(key));
                mapValue.set(map.get(key));
                context.write(mapKey, mapValue);
            }
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            for(char c = 'a'; c <= 'z'; c++){
                map.put(c, 0);
            }
        }
    }

    // Must be static so the framework can instantiate it by reflection.
    public static class MyWordCountPartitioner extends Partitioner<Text, IntWritable>{

        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            char c = key.toString().charAt(0);
            if(c >= 'a' && c < 'i')
                return 0;
            else if(c >= 'i' && c < 'q')
                return 1;
            else
                return 2;
        }
    }

    public static class MyWordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        Text rkey = new Text();
        IntWritable rvalue = new IntWritable(1);

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int n = 0;
            for(IntWritable value : values){
                n += value.get();
            }
            rvalue.set(n);
            context.write(key, rvalue);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        //validate the parameters
        if(args.length != 2){
            return -1;
        }

        Job job = Job.getInstance(getConf(), "MyWordCountJob");
        job.setJarByClass(MyWordCountJob.class);

        Path inPath = new Path(args[0]);
        Path outPath = new Path(args[1]);

        outPath.getFileSystem(getConf()).delete(outPath, true);
        TextInputFormat.setInputPaths(job, inPath);
        TextOutputFormat.setOutputPath(job, outPath);

        job.setMapperClass(MyWordCountJob.MyWordCountMapper.class);
        job.setReducerClass(MyWordCountJob.MyWordCountReducer.class);
        job.setPartitionerClass(MyWordCountPartitioner.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setNumReduceTasks(3);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String [] args){
        int result = 0;
        try {
            result = ToolRunner.run(new Configuration(), new MyWordCountJob(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(result);
    }

}
Two closing remarks: in most cases you write a custom partitioner to deal with uneven data distribution, also known as data skew. And whatever the custom partitioner does, it must guarantee that identical keys are sent to the same reducer.
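For illustration only (this class is not part of the original program), here is a minimal sketch of what such a skew-aware partitioner could look like. It assumes one known hot key (the key "e" here is just a made-up example) and reserves the last reducer for it, while every other key is still routed deterministically by its hash, so identical keys always reach the same reducer.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical example, not from the original post: ease data skew by giving
// one known hot key its own reducer, while keeping the rule that the same key
// always goes to the same reducer.
public class SkewAwarePartitioner extends Partitioner<Text, IntWritable> {
    private static final String HOT_KEY = "e";   // assumed hot key, for illustration

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (numPartitions == 1) {
            return 0;                             // single reducer: nothing to spread
        }
        if (HOT_KEY.equals(key.toString())) {
            return numPartitions - 1;             // hot key gets the last reducer to itself
        }
        // All other keys are spread over the remaining reducers, purely by key,
        // so routing stays deterministic.
        return (key.hashCode() & Integer.MAX_VALUE) % (numPartitions - 1);
    }
}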
An Example MapReduce Program, Details Determine Success or Failure (Part 5): Partitioner
Original post: http://blog.itpub.net/30066956/viewspace-2108214/