以单词统计为案例。
假如现在文件中存在如下内容:
aa bb
aa cc
dd aa
当然,这是小文件,如果文件大小较大时会将文件进行 “切片” ,此处的切片和 HDFS 的 “分块”概念不同。
“切片” 是将文件进行逻辑的划分,而 “分块” 是进行物理的划分。
即 “切片” 是将文件按照某一大小进行标记(默认为128m,即与分块大小相同),如文件为300M,那么将会标记为 0~128M 为一片,128M~256M 为一片,256M~300M 为一片。
1、首先执行map阶段,会逐行读取数据,然后根据 空格 将每行的单词分隔出来,然后将其组成键值对,但是此时即便会有多个相同的单词,map不会将其合并,即会有多个<a,1>的存在(便于理解,以实际存储为准)。
2、接着会执行reducer阶段,将map执行后的数据进行汇总。
只是简单的一些理解记录。
根据反编译官方的wordcount案例后我们可以得知,此方法需要三个类:
package com.neve.wordCount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
private Text outk = new Text();
//每次读到一个单词都为1
private IntWritable outv = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
//1.将text换为string
String line = value.toString();
//2.分割
String[] words = line.split(" ");
//3.输出
for (String word : words) {
//将String转换为Text
outk.set(word);
//写出
context.write(outk, outv);
}
}
}
此处继承了Mapper类,其中的各个参数为:
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
之后便需重写map方法。
key:即为读的第几行,此处没用到。
value:即为读取的一行的数据,需要转换为java原生的类型进行计算。
context:为上下文,即配置项。
package com.neve.wordCount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
private IntWritable outv = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
outv.set(sum);
context.write(key,outv);
}
}
继承的Reducer类的参数大致与mapper类相同,但是此处的输入数据为map传来的,输出数据输出到结果中。
重写的reduce类中,key即为键,values为此键对应的值的集合,此处我们为单词统计,所以是1,1,1....,别的需求的话对应的值可能就会不同了。
package com.neve.wordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1.创建配置
Configuration configuration = new Configuration();
//2.创建job
Job job = Job.getInstance(configuration);
//3.关联驱动类
job.setJarByClass(WordCountDriver.class);
//4.关联mapper和reducer类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//5.设置mapper的输出值和value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//6.设置最终的输出值和value
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//7.设置输入输出路径
FileInputFormat.setInputPaths(job,new Path("F:\\Workplace\\IDEA_Workplace\\hadoopstudy\\input"));
FileOutputFormat.setOutputPath(job,new Path("F:\\Workplace\\IDEA_Workplace\\hadoopstudy\\output"));
//8.提交job
job.waitForCompletion(true);
}
}
运行即可。
源数据:
ai
hahah
ruguo
ni h daw
daw
h ni
结果:
ai 1
daw 2
h 2
hahah 1
ni 2
ruguo 1
需要注意的是,当map进行分割后,会将数据按照字母的顺序进行排序。
原文:https://www.cnblogs.com/wuren-best/p/13721676.html