
Hadoop Self-Join

Date: 2015-02-01 23:23:51
Environment: CentOS 6.6, Hadoop 1.2.1

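Sample data: the first column is the child and the second column is the parent, separated by a space. The job must output each grandchild together with its grandparent.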
[grid@hadoop1 ~]$ hadoop fs -cat ./in/genealogy.txt
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma

Program:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SelfJoinMapper extends Mapper<Text, Text, Text, Text> {
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value, new Text("1" + key.toString())); // left table: keyed by parent, value is the child tagged "1"
        context.write(key, new Text("2" + value.toString())); // right table: keyed by child, value is the parent tagged "2"
    }
}
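
To make the tagging concrete, here is a minimal plain-Java sketch (no Hadoop required) of the two pairs the mapper emits for a single input record. The class TagDemo and the helper emit are hypothetical names used only for illustration; the real job writes these pairs through context.write.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone illustration; not part of the job itself.
class TagDemo {
    // Mimics SelfJoinMapper.map: returns the two (key, value) pairs emitted for one record.
    static List<String[]> emit(String child, String parent) {
        List<String[]> out = new ArrayList<String[]>();
        out.add(new String[] {parent, "1" + child});  // left table: keyed by parent, "1" marks a child
        out.add(new String[] {child, "2" + parent});  // right table: keyed by child, "2" marks a parent
        return out;
    }

    public static void main(String[] args) {
        // Prints "Lucy<TAB>1Tom" and then "Tom<TAB>2Lucy".
        for (String[] kv : emit("Tom", "Lucy")) {
            System.out.println(kv[0] + "\t" + kv[1]);
        }
    }
}
```

Because both pairs for a person end up under the same key, the reducer for "Lucy" sees her children (tag "1") and her parents (tag "2") in one group.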

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SelfJoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
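        // Values tagged "1" are children of the key; values tagged "2" are its parents,
        // i.e. the grandparents of those children.
        List<String> childList = new ArrayList<String>();
        List<String> grandList = new ArrayList<String>();
        for (Text value : values) {
            String tagged = value.toString();
            if (tagged.startsWith("1")) {
                childList.add(tagged.substring(1));
            } else {
                grandList.add(tagged.substring(1));
            }
        }
        // Cross product: pair every child of the key with every parent of the key.
        for (String child : childList) {
            for (String grand : grandList) {
                context.write(new Text(child), new Text(grand));
            }
        }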
    }
}
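
The join logic can be checked without a cluster. The following is a minimal sketch, assuming plain Java collections stand in for the shuffle: a TreeMap groups the tagged values by key (sorted, as the reducer sees them for ASCII names), and the same cross product as in SelfJoinReducer is then applied to each group. LocalSelfJoin, join, and add are hypothetical names. Hadoop does not guarantee the order of values within one group, so the line order of a real run may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory simulation of the map, shuffle, and reduce steps.
class LocalSelfJoin {
    static List<String> join(String[][] records) {
        // "Map" and "shuffle": tag each value and group it under its key.
        Map<String, List<String>> groups = new TreeMap<String, List<String>>();
        for (String[] r : records) {
            String child = r[0];
            String parent = r[1];
            add(groups, parent, "1" + child);
            add(groups, child, "2" + parent);
        }
        // "Reduce": per key, pair every child with every grandparent.
        List<String> out = new ArrayList<String>();
        for (List<String> values : groups.values()) {
            List<String> children = new ArrayList<String>();
            List<String> grands = new ArrayList<String>();
            for (String v : values) {
                if (v.startsWith("1")) {
                    children.add(v.substring(1));
                } else {
                    grands.add(v.substring(1));
                }
            }
            for (String c : children) {
                for (String g : grands) {
                    out.add(c + "\t" + g);
                }
            }
        }
        return out;
    }

    private static void add(Map<String, List<String>> groups, String key, String value) {
        List<String> list = groups.get(key);
        if (list == null) {
            list = new ArrayList<String>();
            groups.put(key, list);
        }
        list.add(value);
    }

    // The sample records from genealogy.txt, as {child, parent}.
    static final String[][] SAMPLE = {
        {"Tom", "Lucy"}, {"Tom", "Jack"}, {"Jone", "Lucy"}, {"Jone", "Jack"},
        {"Lucy", "Mary"}, {"Lucy", "Ben"}, {"Jack", "Alice"}, {"Jack", "Jesse"},
        {"Terry", "Alice"}, {"Terry", "Jesse"}, {"Philip", "Terry"}, {"Philip", "Alma"},
        {"Mark", "Terry"}, {"Mark", "Alma"}
    };

    public static void main(String[] args) {
        // Prints 12 grandchild/grandparent pairs for the sample data.
        for (String line : join(SAMPLE)) {
            System.out.println(line);
        }
    }
}
```

Only Jack, Lucy, and Terry appear both as a parent and as a child in the sample data, so only their groups contribute output lines.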

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SelfJoin {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: SelfJoin <input path> <output path>");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " "); // split key and value on a space (the default is a tab)
        //conf.set("mapred.jar", "./out/SelfJoin.jar");
        //conf.set("fs.default.name", "hdfs://hadoop1:9000");
        //conf.set("mapred.job.tracker", "hadoop1:9001");

        Job job = new Job(conf);
        job.setInputFormatClass(KeyValueTextInputFormat.class); // read each line as a key/value pair
        job.setJarByClass(SelfJoin.class);
        job.setJobName("SelfJoin");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(SelfJoinMapper.class);
        job.setReducerClass(SelfJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
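
KeyValueTextInputFormat splits each line at the first occurrence of the separator: the text before it becomes the key and the rest becomes the value, and a line with no separator becomes the key with an empty value. The plain-Java sketch below mimics that rule to show why the line "Tom Lucy" reaches the mapper as key Tom and value Lucy. SplitDemo and split are hypothetical names, not Hadoop APIs.

```java
// Hypothetical illustration of the key/value splitting rule; not Hadoop code.
class SplitDemo {
    // Returns {key, value} for one input line, splitting at the first separator only.
    static String[] split(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos == -1) {
            return new String[] {line, ""};  // no separator: whole line is the key
        }
        return new String[] {line.substring(0, pos), line.substring(pos + 1)};
    }

    public static void main(String[] args) {
        String[] kv = split("Tom Lucy", ' ');
        System.out.println("key=" + kv[0] + " value=" + kv[1]);  // prints "key=Tom value=Lucy"
    }
}
```

Without the separator setting in the driver, the default tab would not be found in the space-separated sample lines, and every record would arrive with the whole line as the key and an empty value.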

The program produces the following output:

[grid@hadoop1 ~]$ hadoop fs -cat ./out/9/part-r-00000
Tom	Alice
Tom	Jesse
Jone	Alice
Jone	Jesse
Tom	Mary
Tom	Ben
Jone	Mary
Jone	Ben
Philip	Alice
Philip	Jesse
Mark	Alice
Mark	Jesse


Original: http://my.oschina.net/zc741520/blog/374222
