1)需求:
以下是博客的好友列表数据,冒号前是一个用户,冒号后是该用户的所有好友(数据中的好友关系是单向的)
A:B,C,D,F,E,O B:A,C,E,K C:F,A,D,I D:A,E,F,L E:B,C,D,M,L F:A,B,C,D,E,O,M G:A,C,D,E,F H:A,C,D,E,O I:A,O J:B,O K:A,C,D L:D,E,F M:E,F,G O:A,H,I,J 多对多的关系 数据库:学生 课程 成绩表 学生表和课程表的自然连接 A 1 100 A 2 90 A : B A : C B : C A I,K,C,B,G,F,H,O,D, B A,F,J,E, C A,B D A,B A-B C,D
求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?
2)需求分析:
先求出A、B、C、….等是谁的好友
第一次输出结果
A I,K,C,B,G,F,H,O,D,
B A,F,J,E,
C A,E,B,H,F,G,K,
D G,C,K,A,L,F,E,H,
E G,M,L,H,A,F,B,D,
F L,M,D,C,G,A,
G M,
H O,
I O,C,
J O,
K B,
L D,E,
M E,F,
O A,H,I,J,F,
第二次输出结果
A-B E C A-C D F A-D E F A-E D B C A-F O B C D E A-G F E C D A-H E C D O A-I O A-J O B A-K D C A-L F E D A-M E F B-C A B-D A E B-E C B-F E A C B-G C E A B-H A E C B-I A B-K C A B-L E B-M E B-O A C-D A F C-E D C-F D A C-G D F A C-H D A C-I A C-K A D C-L D F C-M F C-O I A D-E L D-F A E D-G E A F D-H A E D-I A D-K A D-L E F D-M F E D-O A E-F D M C B E-G C D E-H C D E-J B E-K C D E-L D F-G D C A E F-H A D O E C F-I O A F-J B O F-K D C A F-L E D F-M E F-O A G-H D C E A G-I A G-K D A C G-L D F E G-M E F G-O A H-I O A H-J O H-K A C D H-L D E H-M E H-O A I-J O I-K A I-O A K-L D K-O A L-M E F
3)代码实现:
(1)第一次Mapper
package com.xyg.mapreduce.friends; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class OneShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { // 1 获取一行 A:B,C,D,F,E,O String line = value.toString(); // 2 切割 String[] fileds = line.split(":"); // 3 获取person和好友 String person = fileds[0]; String[] friends = fileds[1].split(","); // 4写出去 for(String friend: friends){ // 输出 <好友,人> context.write(new Text(friend), new Text(person)); } } }
(2)第一次Reducer
package com.xyg.mapreduce.friends; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class OneShareFriendsReducer extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuffer sb = new StringBuffer(); //1 拼接 for(Text person: values){ sb.append(person).append(","); } //2 写出 context.write(key, new Text(sb.toString())); } }
(3)第一次Driver
package com.xyg.mapreduce.friends; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class OneShareFriendsDriver { public static void main(String[] args) throws Exception { // 1 获取job对象 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 指定jar包运行的路径 job.setJarByClass(OneShareFriendsDriver.class); // 3 指定map/reduce使用的类 job.setMapperClass(OneShareFriendsMapper.class); job.setReducerClass(OneShareFriendsReducer.class); // 4 指定map输出的数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 5 指定最终输出的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 6 指定job的输入原始所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 提交 boolean result = job.waitForCompletion(true); System.exit(result?1:0); } }
(4)第二次Mapper
package com.xyg.mapreduce.friends; import java.io.IOException; import java.util.Arrays; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class TwoShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // A I,K,C,B,G,F,H,O,D, // 友 人,人,人 String line = value.toString(); String[] friend_persons = line.split("\t"); String friend = friend_persons[0]; String[] persons = friend_persons[1].split(","); Arrays.sort(persons); for (int i = 0; i < persons.length - 1; i++) { for (int j = i + 1; j < persons.length; j++) { // 发出 <人-人,好友> ,这样,相同的“人-人”对的所有好友就会到同1个reduce中去 context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend)); } } } }
(5)第二次Reducer
package com.xyg.mapreduce.friends; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class TwoShareFriendsReducer extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuffer sb = new StringBuffer(); for (Text friend : values) { sb.append(friend).append(" "); } context.write(key, new Text(sb.toString())); } }
(6)第二次Driver
package com.xyg.mapreduce.friends; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class TwoShareFriendsDriver { public static void main(String[] args) throws Exception { // 1 获取job对象 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 指定jar包运行的路径 job.setJarByClass(TwoShareFriendsDriver.class); // 3 指定map/reduce使用的类 job.setMapperClass(TwoShareFriendsMapper.class); job.setReducerClass(TwoShareFriendsReducer.class); // 4 指定map输出的数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 5 指定最终输出的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 6 指定job的输入原始所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 提交 boolean result = job.waitForCompletion(true); System.exit(result?1:0); } }
原文:https://www.cnblogs.com/frankdeng/p/9255931.html