首页 > 编程语言 > 详细

十二道MR习题 – 1 – 排序

时间:2017-09-18 09:20:33      阅读:359      评论:0      收藏:0      [点我收藏+]

题目:

一个文件,大小约为100G。文件的每一行都是一个数字,要求对文件中的所有数字进行排序。

对于这个题目,了解过Hadoop的同学可以笑而不语了。即使用spark实现也是非常简单的事情。

先说下如何用Hadoop实现。实际上也没什么好说的:Map任务逐行读入数字,而后在Reduce中输出就可以了,简单粗暴到令人发指。

看下代码好了:

package com.zhyea.dev;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class NumberSort {


    public static class SplitterMapper extends Mapper<Object, Text, IntWritable, IntWritable> {

        private static final IntWritable intWritable = new IntWritable();

        @Override
        public void map(Object key, Text value, Context context) {
            try {
                int num = Integer.valueOf(value.toString());
                intWritable.set(num);
                context.write(intWritable, intWritable);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }


    public static class IntegrateReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) {
            try {
                context.write(key, key);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "number-sort");
        job.setJarByClass(NumberSort.class);

        job.setMapperClass(SplitterMapper.class);
        job.setReducerClass(IntegrateReducer.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

在map方法中,输出值的Value部分我选择了一个IntWritable的值。Value值的类型也是可以设置为NullWritable的,不过这样map任务执行起来会比较慢,虽然reduce任务执行的会快一些,但是终究是得不偿失。

在我们的程序里没有执行任何排序的动作,但是输出的结果是有序的,这是因为在shuffle阶段已经完成了排序(一次快速排序,一次归并排序)。

再来看看用spark是如何完成的:

object NumSortJob {


  def main(args: Array[String]): Unit = {
    val inputPath = args(0)
    val outputPath = args(1)
    val conf = new SparkConf().setAppName("Num Sort")
    val sc = new SparkContext(conf)
    val data = sc.hadoopFile[LongWritable, Text, TextInputFormat](inputPath)

    data.map(p => p._2.toString.toInt).distinct().sortBy[Int](p => p).coalesce(1, true).saveAsTextFile(outputPath)
  }

}

spark则需要主动进行排序。即使选择了使用sortBasedShuffle,它的排序也仅止于mapper端的排序,结果集不一定是有序的。

#########

十二道MR习题 – 1 – 排序

原文:http://www.cnblogs.com/amunote/p/7541311.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!