package mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

public class MyMapReduce {
    // 1. Our own Mapper class.
    // Extends Mapper<input key, input value, output key, output value>.
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Every output key (word) is emitted with the value 1.
        IntWritable i = new IntWritable(1);
        Text keyStr = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // TextInputFormat is Hadoop's default input format; it reads the input one line at a time.
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                keyStr.set(itr.nextToken());
                context.write(keyStr, i);
            }
        }
    }

    // 2. Our own Reducer class.
    // Extends Reducer<input key, input value, output key, output value>.
    // The reducer's input is the mapper's output: the framework groups the pairs
    // emitted by map() by key, and each key with its grouped values is passed to reduce().
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable countWritable = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            String keyStr = key.toString();
            // In map() every key is emitted with the value 1, so the grouped values
            // contain one element per occurrence of the key. Summing the elements
            // therefore gives the number of times the key (word) appeared.
            int count = 0;
            Iterator<IntWritable> it = values.iterator();
            while (it.hasNext()) {
                count += it.next().get();
            }
            countWritable.set(count);
            System.out.println(keyStr + "---" + count);
            context.write(key, countWritable);
        }
    }

    // 3. The driver.
    public int run(String[] args) throws Exception {
        // Hadoop configuration context.
        Configuration conf = new Configuration();
        // If the configuration files are not on the classpath (e.g. under resources),
        // add them manually or set the parameters explicitly:
        // conf.addResource("core-site.xml");
        // conf.addResource("hdfs-site.xml");

        // Build the Job instance from the configuration and pass in the job name.
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        // Set the number of reduce tasks.
        job.setNumReduceTasks(3);
        // Required: without it the job runs fine locally but fails on the cluster,
        // because Hadoop cannot locate the jar that contains this class.
        job.setJarByClass(MyMapReduce.class);

        // Input data for the job: the first command-line argument is the input path.
        Path inputPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputPath);

        // Output path for the job's results: the second command-line argument.
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Mapper settings.
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reducer settings.
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Optional combiner: like the reducer, it extends Reducer. A combiner processes
        // each mapper's local output separately, while the reducer processes the merged
        // output of all mappers.
        // job.setCombinerClass();

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // The first argument is the input path, the second the output path.
        System.out.println(args[0]);
        System.out.println(args[1]);
        MyMapReduce mr = new MyMapReduce();
        int success = mr.run(args);
        System.out.println(success);
    }
}
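Since the driver expects two command-line arguments, a usage sketch may help: package the class into a jar and submit it with the hadoop launcher. The jar name wordcount.jar and the HDFS paths below are placeholders, not part of the original post; the output directory must not exist before the job runs, otherwise job submission fails with a FileAlreadyExistsException.

    hadoop jar wordcount.jar mapreduce.MyMapReduce /user/hadoop/wordcount/input /user/hadoop/wordcount/output

Because the driver calls job.setNumReduceTasks(3), the result is spread across three files, part-r-00000 through part-r-00002, in the output directory; each line holds a word and its count separated by a tab.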
Big Data Study Log: implementing word count with a Hadoop MapReduce job written in Java
Original post: https://www.cnblogs.com/SaltFishYe/p/10587130.html