多表关联和单表关联类似,它也是通过对原始数据进行一定的处理,从其中挖掘出关心的信息。下面进入这个实例。
输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址表,包含地址名列和地址编号列。要求从输入数据中找出工厂名和地址名的对应关系,输出"工厂名——地址名"表。
样例输入如下所示。
a.txt (工厂表)
f1 3
f2 2
f3 1
b.txt(地址表)
1 Beijing
2 Shanghai
3 Tianjin
样例输出:
f3 Beijing
f2 Shanghai
f1 Tianjin
多表关联和单表关联相似,都类似于数据库中的自然连接。相比单表关联,多表关联的左右表和连接列更加清楚。所以可以采用和单表关联的相同的处理方式,map识别出输入的行属于哪个表之后,对其进行分割,将连接的列值保存在key中,另一列和左右表标识保存在value中,然后输出。reduce拿到连接结果之后,解析value内容,根据标志将左右表内容分开存放,然后求笛卡尔积,最后直接输出。
这个实例的具体分析参考单表关联实例。下面给出代码。
程序代码如下所示:
package test; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class MTjoin { public static class Map extends Mapper<LongWritable, Text, Text, Text>{ private static Text k = new Text(); private static Text v = new Text(); protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException { String[] splits = value.toString().split("\t"); if(splits.length != 2){ return ; } //取得文件名 a.txt(工厂名字,序号) b.txt(序号,地址) String fileName = ((FileSplit)context.getInputSplit()).getPath().getName(); if("a.txt".equals(fileName)){ k.set(splits[1]); v.set("1"+splits[0]); }else if("b.txt".equals(fileName)){ k.set(splits[0]); v.set("2"+splits[1]); }else{ return ; } context.write(k, v); }; } public static class Reduce extends Reducer<Text, Text, Text, Text>{ private static List<String> names = new ArrayList<String>(); private static List<String> addrs = new ArrayList<String>(); private static Text name = new Text(); private static Text addr = new Text(); protected void reduce(Text key, Iterable<Text> values, Context context) throws java.io.IOException ,InterruptedException { for (Text value : values) { String temp = value.toString(); if(temp.startsWith("1")){ names.add(temp.substring(1)); }else{ addrs.add(temp.substring(1)); } } for (String n : names) { for (String a : addrs) { name.set(n); addr.set(a); context.write(name, addr); } } names.clear(); addrs.clear(); }; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs(); if(otherArgs.length != 2){ System.err.println("Usage:MTjoin"); System.exit(2); } Job job = new Job(conf, "MTjoin"); job.setJarByClass(MTjoin.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
输出结果:
[root@hadoop ~]# hadoop dfs -cat /output/*
f3 Beijing
f2 Shanghai
f1 Tianjin
原文:http://www.cnblogs.com/jsunday/p/3795459.html