1.需求
利用mapreduce编程框架编写wordcount程序。
2.环境配置
(1)hadoop为本地模式
(2)pom文件代码如下
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> </dependency> </dependencies>
3.mapreduce介绍
(1)mapreduce结构
完整的mapreduce在分布式运行时有三类实例:MRAppMaster,MapTask,ReduceTask.
(2)流程解析
4.wordcount程序介绍
在mapreduce框架下实现wordcount,需要自定义一个WordcountMapper继承Mapper并重写map方法,自定义一个WordcountReducer继承Reducer并重写reduce方法,以及程序的main方法WordcountDriver。
5.wordcount代码具体实现
(1)WordcountMapper类
//1.定义四个泛型类型:KEYIN:LongWritable,VALUEIN:Text,
// KEYOUT:Text, VALUEOUT:IntWritable
//各个泛型类型与java中的对比:LongWritable->Long
//Text->String,IntWritable->Integer
//2.这里之所以定义这些泛型类型,是因为网络传输需要序列化,精简java中的序列化接口
//3.关于四个泛型作用:
// KEYIN(LongWritable类型):一行文本的开始位置,在整个文本的偏移量(用来确定当前是哪一行的,对于业务来说没有用)
// VALUEIN(Text类型):读到一行文本的内容,使用这个文本划分成单词
// KEYOUT(Text类型):输出的单词
// VALUEOUT(IntWritable类型):单词的统计次数
public class WordcountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
//map阶段的业务逻辑,被maptask调用
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//将传入的数据按空格分割成单词
String[] words = value.toString().split(" ");
//将单词输出为<单词,1>
for(String word:words){
//将单词作为key,将次数1作为value
//根据单词的分发,相同的key会进入相同的reduce task中
//context是mr框架提供的上下文
//还注意要使用Text,IntWritable类型
context.write(new Text(word),new IntWritable(1));
}
}
}
(2)WordcountReducer类
//KEYIN,VALUEIN对应mapper输出的KEYOUT,VALUEOUT
//KEYOUT,VALUEOUT是自定义的
//KEYOUT是单词,VALUEOUT是单词的总个数
public class WordcountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
//reduce函数会被reduceTask任务调用,每一组单词调用一次
//每一组的意思就是[<hello,1>,<hello,1>,<hello,1>]...
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for(IntWritable value:values){
//value表示从迭代器中取出的值,是统计的个数,要使用.get()转换函数转换成int类型
count += value.get();
}
//key是单词,new IntWritable(count)是单词的总个数
//其实context是将结果写入文件当中,文件存储在hdfs上
context.write(key,new IntWritable(count));
}
}
(3)WordcountDriver类
public class WordcountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
//指定本程序jar包所在的路径
job.setJarByClass(WordcountDriver.class);
//利用反射指定job要使用的mapper业务类
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
//利用反射,指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置最终(也就是reduce)输出的数据类型
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定job输入原始文件所在目录
FileInputFormat.setInputPaths(job,new Path(args[0]));
//指定job的输出结果
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//将job中配置的相关参数,提交给yarn运行
//等待集群完成工作
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
6.打成jar包并运行
7.github链接:
https://github.com/gulu2016/STBigData/tree/master/src/main/java/cn/gulu/bigdata/mr/MRDemos
原文:https://www.cnblogs.com/ManchesterCityCoder/p/10799511.html