
Hadoop Self-Join

Posted: 2015-01-02 09:48:38


This example gives a child-parent table; the task is to produce the corresponding grandchild-grandparent table.

child     parent
Tom       Lucy
Tom       Jack
Jone      Lucy
Jone      Jack
Lucy      Mary
Lucy      Ben
Jack      Alice
Jack      Jesse
Terry     Alice
Terry     Jesse
Philip    Terry
Philip    Alma
Mark      Terry
Mark      Alma

The expected output is:

Jone Alice
Jone Jesse
Tom Mary
Tom Ben
Jone Mary
Jone Ben
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse
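
How it works: a grandchild-grandparent pair is just two child-parent records chained through a middle person, so we join the table with itself on child = parent. The mapper emits every input record twice: once keyed by the parent and tagged 0_ ("a child of this key"), and once keyed by the child and tagged 1_ ("a parent of this key"). After the shuffle, each key has collected both its children and its parents. Taking Lucy from the sample input, the reducer for key Lucy receives:

    0_Tom, 0_Jone    (Tom and Jone are Lucy's children: grandchild candidates)
    1_Mary, 1_Ben    (Mary and Ben are Lucy's parents: grandparent candidates)

Crossing the two lists yields Tom Mary, Tom Ben, Jone Mary, Jone Ben, exactly the four lines above that pass through Lucy.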


The code consists of three classes (mapper, reducer, and job driver):


package com.hadoop.twelve;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SelfJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] arr = line.split("\t");
        if (arr.length == 2) {
            // Emit each record twice: keyed by the parent with tag 0_
            // (a child of the key), and keyed by the child with tag 1_
            // (a parent of the key).
            context.write(new Text(arr[1]), new Text("0_" + arr[0])); // left table
            context.write(new Text(arr[0]), new Text("1_" + arr[1])); // right table
        }
    }
}
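
To see the tagging in action outside Hadoop, here is a minimal plain-Java sketch (the class name SelfJoinTrace and the hard-coded rows are mine, for illustration only) that applies the mapper's two writes to four sample rows and groups them the way the shuffle would:

package com.hadoop.twelve;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SelfJoinTrace {

    public static void main(String[] args) {
        String[] rows = {"Tom\tLucy", "Jone\tLucy", "Lucy\tMary", "Lucy\tBen"};
        // Group tagged values by key, mimicking the shuffle phase.
        Map<String, List<String>> shuffled = new TreeMap<String, List<String>>();
        for (String row : rows) {
            String[] arr = row.split("\t");
            add(shuffled, arr[1], "0_" + arr[0]); // keyed by parent, tagged child
            add(shuffled, arr[0], "1_" + arr[1]); // keyed by child, tagged parent
        }
        // Prints Lucy=[0_Tom, 0_Jone, 1_Mary, 1_Ben] among the other keys.
        System.out.println(shuffled);
    }

    private static void add(Map<String, List<String>> map, String key, String value) {
        if (!map.containsKey(key)) {
            map.put(key, new ArrayList<String>());
        }
        map.get(key).add(value);
    }
}

Key Lucy ends up holding both tagged children and tagged parents, which is exactly the shape of input the reducer below depends on.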



package com.hadoop.twelve;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SelfJoinReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        List<String> grandchild = new ArrayList<String>();
        List<String> grandparent = new ArrayList<String>();
        for (Text temp : values) {
            // Split on the first "_" only, in case a name itself contains "_".
            String[] arr = temp.toString().split("_", 2);
            if ("0".equals(arr[0])) {
                grandchild.add(arr[1]); // the key's children: grandchild candidates
            } else if ("1".equals(arr[0])) {
                grandparent.add(arr[1]); // the key's parents: grandparent candidates
            }
        }

        // The nested loop emits the Cartesian product grandchild x grandparent:
        // every child of the key paired with every parent of the key is a
        // grandchild-grandparent pair linked through the key person.
        for (String gc : grandchild) {
            for (String gp : grandparent) {
                context.write(new Text(gc), new Text(gp));
            }
        }
    }

    // A quick local test of the Cartesian-product loop.
    public static void main(String[] args) {
        List<String> grandchild = new ArrayList<String>();
        List<String> grandparent = new ArrayList<String>();
        grandchild.add("a");
        grandchild.add("b");
        grandchild.add("c");
        grandchild.add("d");

        grandparent.add("a1");
        grandparent.add("b1");
        grandparent.add("c1");
        grandparent.add("d1");

        for (String gc : grandchild) {
            for (String gp : grandparent) {
                System.out.println(gc + "-------->" + gp);
            }
        }
    }
}
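
Two details worth noting about the reducer. First, Hadoop lets you traverse the values iterable only once and reuses the underlying Text object between iterations, which is why each value is copied out via toString() into the two lists before the nested loop runs. Second, a key that appears only as a child or only as a parent (Mary, for instance, is never listed as a child) leaves one of the lists empty, so the nested loop emits nothing for it; only "middle" people like Lucy, Jack, and Terry produce output.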


package com.hadoop.twelve;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobMain {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = new Job(configuration, "self_join_job");
        job.setJarByClass(JobMain.class);

        job.setMapperClass(SelfJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(SelfJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path path = new Path(args[1]);
        FileSystem fs = FileSystem.get(configuration);
        // Delete the output directory if it already exists, so reruns don't fail.
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
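
To run the job, package the three classes into a jar and submit it with the input file and an output directory as arguments; the jar name and paths below are placeholders:

hadoop jar selfjoin.jar com.hadoop.twelve.JobMain /input/child_parent.txt /output/selfjoin
hadoop fs -cat /output/selfjoin/part-r-00000

Because the driver deletes the output directory first if it exists, the job can be rerun without manual cleanup.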



Original article: http://blog.csdn.net/u010220089/article/details/42320929
