在一堆给定的文本文件中统计输出每一个单词出现的总次数
在 /opt/test 目录下创建一个文件 wordcount.txt ,里面键入几个单词,并用空格分隔开
package com.zyd; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; /** * @Author: ZYD * @Date: 2021/7/30 下午 21:28 */ /** * 如果使用Java实现这个单词计数存在的问题: * 1. 操作的数据是HDFS分布式集群,可能这个文件有几个G的大小,如果使用Java,那么我们必须把这些数据全部拉取到JVM内存中去运行,可能会导致Java内存崩溃 * 2. 如果我们不把这些数据全部拉取到本地操作,那么可能会把这个Java程序运行在不同的电脑上执行,只需要处理当前机器上的block块的数据即可 * 但是这样也会存在问题: * Java代码需要分布式的运行在不同的电脑上,那么处理结束之后将面临如何将处理结果汇总起来、如何把控每一个节点上程序有没有运行结束这些问题 */ public class WordCountJava { public static void main(String[] args) { // 1. 连接HDFS集群 Configuration conf = new Configuration(); FileSystem fs = null; try { fs = FileSystem.get(new URI("hdfs://192.168.218.55:9000"), conf, "root"); /** * 2. 读取文件数据 ----- IO流 * 四个抽象父类: * 字节流:InputStream、OutputStream * 字符流:Reader、Writer */ // 创建单词计数文件的输入流 FSDataInputStream open = fs.open(new Path("/test/wordcount.txt")); // 字节转字符流 InputStreamReader isr = new InputStreamReader(open); BufferedReader br = new BufferedReader(isr); String line; // 准备一个map集合存放数据,数据格式:单词,单词出现的次数 Map<String, Integer> map = new HashMap<>(); // 通过BufferedReader字符缓冲流的readline方法依次读取一行数据 // 将每一行数据按空格分隔,统计次数 /** * 为什么要用(line = br.readLine()) != null 而不用下面的方法? * 之所以使用(line = br.readLine()) != null判断,是因为readline()方法的特性: * readline是用来判断当前行有没有数据,如果有数据的话,那么将这一行的数据赋值给一个String类型的变量,然后将这个指针下移 * 用下面的方法的话,意味着调用了两次readline方法,再调用这个readline方法的时候,他返回的就不是当前行的数据了,而是下一行的数据 * 此时也就代表 line 这个字符串里存储的是第二行的数据 */ while (((line = br.readLine()) != null)) { // while (br.readLine() != null) { // line = br.readLine(); System.out.println(line); String[] words = line.split(" "); /** * 如果单词还没有在map集合添加,name在map集合加入 word,1 * 如果出现,那么在map集合添加 word,以前的次数+1 */ for (String word: words) { if (map.get(word) == null) { map.put(word, 1); } else { map.put(word, map.get(word) + 1); } } } System.out.println(map); } catch (Exception e) { e.printStackTrace(); } finally { if (fs != null) { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
原因:之所以使用(line = br.readLine()) != null判断,是因为readline()方法的特性:
* readline是用来判断当前行有没有数据,如果有数据的话,那么将这一行的数据赋值给一个String类型的变量,然后将这个指针下移
* 用下面的方法的话,意味着调用了两次readline方法,再调用这个readline方法的时候,他返回的就不是当前行的数据了,而是下一行的数据
* 此时也就代表 line 这个字符串里存储的是第二行的数据
1. WordCountMapper.java
package MapReduce; /** * @Author: ZYD * @Date: 2021/8/1 下午 22:44 */ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * 第一步:继承Mapper类(定义数据的输入格式和数据的输出形式) * 如果要使用MR程序,我们要指定数据输入的kye-value键值对类型 * 同时要指定数据输出的key-value键值对类型 * Mapper阶段----一行数据执行一次Mapper * Mapper阶段的输入key-value键值对格式很固定:LongWritable Text * 其中LongWritable 是 long 基本数据类型的hadoop序列化的类,一般情况下Map阶段的key代表的是文件的偏移量----理解为行号 * Text 是String类型的hadoop序列化类,代表的是每一行的数据 * */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> { /** * mapTask的核心处理逻辑: * 每一行文件数据需要走一个map方法 * @param key ----- Mapper阶段输入的key值-----行号 * @param value -----Mapper阶段输入的value值-----当前行对应的字符串数据 * @param context ------上下文对象,主要功能是为了实现将Map阶段的数据输入到Reduce阶段 * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 第一步:将每一行数据value按照空格切割,然后将每一个得到的单词当做 key,1当做value,将他们输出到Reduce阶段 String line = value.toString(); String[] words = line.split(" "); for (String word: words) { // 将每一行数据空格切割后的单词以单词为key,1为value输出到reduce阶段 // 到了reduce阶段,会根据key值将value给聚合起来 context.write(new Text(word), new LongWritable(1)); } } }
2. WordCountReducer.java
package MapReduce; /** * @Author: ZYD * @Date: 2021/8/1 下午 23:01 */ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator; /** * 继承Reducer类 * Reducer类也粗腰指定输入的key-value的数据类型,也需要指定输出的key-value数据类型 * * 此时Reducer阶段的输入的key-value键值对类型不能随便指定,它应该是Mapper阶段的输出数据类型 */ public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> { /** * reduceTask核心处理业务逻辑的方法 * 他是每一组key值相同的数据执行一次 * @param key ------ map阶段输出的key值 ---- 单词 * @param values ------ values是一个类似于集合的数据,里面放的是key相同的数据的所有value值的集合 * @param context ------ 上下文对象,将结果以<key, value>键值对的形式输出到最终的结果文件中 * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { /** * 思路:将values中的数据累加起来 ,这样的话单词对应出现的次数就确定了 */ Iterator<LongWritable> iterator = values.iterator(); long result = 0L; while (iterator.hasNext()) { LongWritable next = iterator.next(); result += next.get(); } context.write(key, new LongWritable(result)); } }
3. WordCountDriver.java
package MapReduce; /** * @Author: ZYD * @Date: 2021/8/1 下午 23:01 */ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator; /** * 继承Reducer类 * Reducer类也粗腰指定输入的key-value的数据类型,也需要指定输出的key-value数据类型 * * 此时Reducer阶段的输入的key-value键值对类型不能随便指定,它应该是Mapper阶段的输出数据类型 */ public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> { /** * reduceTask核心处理业务逻辑的方法 * 他是每一组key值相同的数据执行一次 * @param key ------ map阶段输出的key值 ---- 单词 * @param values ------ values是一个类似于集合的数据,里面放的是key相同的数据的所有value值的集合 * @param context ------ 上下文对象,将结果以<key, value>键值对的形式输出到最终的结果文件中 * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { /** * 思路:将values中的数据累加起来 ,这样的话单词对应出现的次数就确定了 */ Iterator<LongWritable> iterator = values.iterator(); long result = 0L; while (iterator.hasNext()) { LongWritable next = iterator.next(); result += next.get(); } context.write(key, new LongWritable(result)); } }
分布式方案需要考虑很多问题,但是我们可以将分布式程序中的公共功能封装成框架,让开发人员将精力集中在业务逻辑上,而MapReduce就是这样一个分布式程序的通用框架
分布式的运算程序需要分成至少两个阶段:
MapReduce编程模型只能包含一个map阶段和一个reduce阶段,如果用户业务逻辑非常复杂,那就只能使用多个MapReduce程序串行运行
Notes:Reduce Task的个数由业务来决定‘
一个完整的mapreduce程序在分布式运行时有三类实例进程:
用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)------三步编程法
整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象
Notes:
首先将存储在文件中的数据分片(假设分为三片),之后三个maptask同时作业,当所有maptask任务完成后,启动相应数量的ReduceTask,并告知ReduceTask处理数据的范围(数据分区)
1) 在MapReduce程序读取文件的输入目录上存放相应的文件。客户端程序在submit()方法执行前,获取待处理的数据信息,然后根据集群中参数的配置形成一个任务分配规划。
2) 客户端提交job.split、jar包、job.xml等文件给yarn,yarn中的resourcemanager启动MRAppMaster。
3) MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程。
4) maptask利用客户指定的inputformat来读取数据,形成输入KV对。
5) maptask将输入KV对传递给客户定义的map()方法,做逻辑运算
6) map()运算完毕后将KV对收集到maptask缓存。
7) maptask缓存中的KV对按照K分区排序后不断写到磁盘文件
8) MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据分区。
9) Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算。
10) Reducetask运算完毕后,调用客户指定的outputformat将结果数据输出到外部存储。
原文:https://www.cnblogs.com/zyd-994264926326/p/15102732.html