首页 > 其他 > 详细

mapreduce代码实现入门

时间:2015-12-15 21:04:08      阅读:193      评论:0      收藏:0      [点我收藏+]

  MapReduce 代码主要包括三个部分:Map 类、Reduce 类,以及包含 main 函数、负责配置并提交作业的驱动类。

以wordcount为例,

map类为:

    static class WordMapper extends Mapper<Object, Text, Text, IntWritable>{
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException{
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreElements()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
            
        }
    }

reduce类为:

    static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        private IntWritable res = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException
        {
            int sum = 0;
            for(IntWritable val:values){
                sum += val.get();
            }
            res.set(sum);
            context.write(key, res);
        }
    }

主函数代码为:

    /**
     * Configures the word-count job on hard-coded HDFS paths, runs it to
     * completion, and exits with status 0 if the job succeeded and 1 otherwise.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String args[]) throws Exception{
        String inputfilepath = "hdfs://localhost:9000/input1";
        String outputfilepath = "hdfs://localhost:9000/output4";
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated Job(Configuration) constructor.
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCount.class);
        job.setJobName("word-count");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(inputfilepath));
        FileOutputFormat.setOutputPath(job, new Path(outputfilepath));
        // Propagate the job result so a failed job yields a non-zero exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

其他的hadoop简单实例代码如:

数字求和:

技术分享
 1 package goal;
 2 
 3 import java.io.IOException;
 4 import java.util.StringTokenizer;
 5 
 6 import org.apache.hadoop.conf.Configuration;
 7 import org.apache.hadoop.fs.Path;
 8 import org.apache.hadoop.io.FloatWritable;
 9 import org.apache.hadoop.io.LongWritable;  
10 import org.apache.hadoop.io.Text;  
11 import org.apache.hadoop.mapreduce.Job;  
12 import org.apache.hadoop.mapreduce.Mapper;  
13 import org.apache.hadoop.mapreduce.Reducer;  
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
16 import org.apache.hadoop.util.GenericOptionsParser;
17 
18 public class Sum {
19     
20     /**
21      * Emits one ("sum", subtotal) pair per input record; subtotal is the sum
22      * of all whitespace-separated numbers in that record.
23      */
24     public static class SumMapper extends 
25         Mapper<Object, Text, Text, FloatWritable>{
26         private final Text word = new Text("sum");
27         // Per-instance rather than static, so mapper instances never share it.
28         private final FloatWritable nv = new FloatWritable();
29 
30         /**
31          * @param key     input key (unused)
32          * @param value   record text holding whitespace-separated numbers
33          * @param context receives the ("sum", subtotal) pair
34          * @throws NumberFormatException if a token is not a parsable float
35          */
36         @Override
37         public void map(Object key, Text value, Context context)
38             throws IOException, InterruptedException
39         {
40             StringTokenizer str = new StringTokenizer(value.toString());
41             float sum = 0;
42             while(str.hasMoreTokens()){
43                 // Accumulate, so a record with several numbers is fully counted.
44                 sum += Float.parseFloat(str.nextToken());
45             }
46             nv.set(sum);
47             context.write(word, nv);
48         }
49     }
50 
51     /** Adds up every value received for a key and writes ("sum", total). */
52     public static class SumReducer extends
53         Reducer<Text, FloatWritable, Text, FloatWritable>{
54         private final Text k = new Text("sum");
55         private final FloatWritable res = new FloatWritable();
56 
57         /**
58          * @param key     grouping key (ignored; the output key is always "sum")
59          * @param values  values received for this key
60          * @param context receives the ("sum", total) pair
61          */
62         @Override
63         public void reduce(Text key, Iterable<FloatWritable> values, 
64                 Context context) throws IOException, InterruptedException{
65             float sum = 0;
66             for(FloatWritable val : values){
67                 sum += val.get();
68             }
69             res.set(sum);
70             context.write(k, res);
71         }
72     }
73     
74     /**
75      * Configures and runs the job on hard-coded HDFS paths, then exits with
76      * status 0 if the job succeeded and 1 otherwise.
77      */
78     public static void main(String args[])throws Exception{
79         String other[] = {"hdfs://localhost:9000/input2/1.txt", "hdfs://localhost:9000/output3"};
80         Configuration conf = new Configuration();
81         System.out.println("yes");
82         // Job.getInstance replaces the deprecated Job(Configuration, String).
83         Job job = Job.getInstance(conf, "number sum");
84         job.setJarByClass(Sum.class);
85         job.setMapperClass(SumMapper.class);
86         job.setReducerClass(SumReducer.class);
87         job.setOutputKeyClass(Text.class);
88         job.setOutputValueClass(FloatWritable.class);
89         FileInputFormat.addInputPath(job, new Path(other[0]));
90         FileOutputFormat.setOutputPath(job, new Path(other[1]));
91         // Must stay last: System.exit does not return.
92         System.exit(job.waitForCompletion(true) ? 0 : 1);
93     }
94 
95 }
View Code

 

mapreduce代码实现入门

原文:http://www.cnblogs.com/z1987/p/5049234.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!