Mapper
package com.scb.jason.mapper;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Step 1: Map class of the word-count job.
 *
 * <p>Splits every input line into tokens and emits one {@code (token, 1)} pair per token.
 * The input key is not used.
 *
 * Created by Administrator on 2017/7/23.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /** Constant count of 1 written for every token. */
    private static final IntWritable ONE = new IntWritable(1);

    /** Reused output key; overwritten with the current token before each write. */
    private final Text word = new Text();

    /** Delegates to the superclass; no mapper-specific initialisation is performed. */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
    }

    /**
     * Tokenizes one line with {@link StringTokenizer}'s default delimiters and writes
     * {@code (token, 1)} to the context for each token found.
     *
     * @param key     input key (ignored)
     * @param value   the line of text to split
     * @param context sink for the emitted pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (StringTokenizer tokens = new StringTokenizer(value.toString());
                tokens.hasMoreTokens(); ) {
            word.set(tokens.nextToken());
            context.write(word, ONE);
        }
    }

    /** Delegates to the superclass; no mapper-specific teardown is performed. */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }

    /** Delegates to the superclass run loop unchanged. */
    @Override
    public void run(Context context) throws IOException, InterruptedException {
        super.run(context);
    }
}
Reducer
package com.scb.jason.reducer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * Step 2: Reduce class of the word-count job.
 *
 * <p>Adds up all integer values received for a key and emits {@code (key, total)}.
 *
 * Created by Administrator on 2017/7/23.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /** Reused output value; overwritten with the total before each write. */
    private final IntWritable total = new IntWritable();

    /** Delegates to the superclass; no reducer-specific initialisation is performed. */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
    }

    /**
     * Sums every value supplied for {@code key} and writes the key with that sum.
     *
     * @param key     the key being reduced
     * @param values  the integer values grouped under {@code key}
     * @param context sink for the emitted pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int accumulated = 0;
        Iterator<IntWritable> cursor = values.iterator();
        while (cursor.hasNext()) {
            accumulated += cursor.next().get();
        }
        total.set(accumulated);
        context.write(key, total);
    }

    /** Delegates to the superclass; no reducer-specific teardown is performed. */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }

    /** Delegates to the superclass run loop unchanged. */
    @Override
    public void run(Context context) throws IOException, InterruptedException {
        super.run(context);
    }
}
Driver
package com.scb.jason.driver;

import com.scb.jason.mapper.WordCountMapper;
import com.scb.jason.reducer.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Step 3: Driver of the word-count job.
 *
 * <p>Wires {@link WordCountMapper} and {@link WordCountReducer} into a job, reading from
 * the path given as the first argument and writing to the path given as the second.
 *
 * Created by Administrator on 2017/7/17.
 */
public class WordCount extends Configured implements Tool {

    /** Exit code returned when the job completes successfully. */
    private static final int EXIT_SUCCESS = 0;

    /** Exit code returned when the job fails. */
    private static final int EXIT_JOB_FAILED = 1;

    /** Exit code returned when the command-line arguments are invalid. */
    private static final int EXIT_USAGE = 2;

    /**
     * Configures, submits and waits for the word-count job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path;
     *             an existing output path is deleted recursively before the job starts
     * @return {@code 0} if the job succeeded, {@code 1} if it failed, {@code 2} if fewer
     *         than two arguments were supplied
     * @throws IOException            if filesystem access or job submission fails
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    @Override
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args == null || args.length < 2) {
            System.err.println("Usage: " + getClass().getSimpleName() + " <input path> <output path>");
            return EXIT_USAGE;
        }

        // Use the configuration injected through Configured (e.g. by ToolRunner) so that
        // generic command-line options are honoured; fall back to a fresh one only when
        // run() is invoked without setConf() having been called.
        Configuration configuration = getConf();
        if (configuration == null) {
            configuration = new Configuration();
        }

        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        Path input = new Path(args[0]);
        FileInputFormat.addInputPath(job, input);

        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(WordCountReducer.class);
        // Declare the final (reducer) output types; the original repeated the
        // map-output setters here, leaving the job output types undeclared.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        Path outPath = new Path(args[1]);
        // Resolve the filesystem from the path itself so an output path with an explicit
        // scheme is handled by the matching filesystem rather than the default one.
        FileSystem fs = outPath.getFileSystem(configuration);
        if (fs.exists(outPath)) {
            // The job refuses to start when the output directory already exists.
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        boolean isSuccess = job.waitForCompletion(true);
        // Process exit-code convention: zero means success.
        return isSuccess ? EXIT_SUCCESS : EXIT_JOB_FAILED;
    }

    /**
     * Command-line entry point; runs the tool through {@link ToolRunner} and exits the
     * JVM with the code returned by {@link #run(String[])}.
     *
     * @param args command-line arguments, passed to {@link ToolRunner}
     * @throws Exception if the tool fails with an exception
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WordCount(), args);
        System.exit(exitCode);
    }

}
原文:http://www.cnblogs.com/xdlaoliu/p/7225603.html