hadoop2.2
场景描述:针对多个文件夹中的文件进行连接操作(单个文件夹内的文件格式相对统一),指定一个文件夹内的文件为主表(单个文件夹内的所有文件的主键唯一),所有在主表中的主键都要输出。
实现思路:在map中读入所有的文件,然后输出主键(默认每个文件每行的\t前面的数据)作为key,其他值+该文件的父目录作为value进行输出,在reduce中根据传入进入的主表父目录来对所有key进行判断,如果该key中的values(value的list)没有含有主表的父目录的标识,那么这个key就不是主表的key,也就意味着这个key可以不用输出,如果包含那么就要输出。
在reduce中输出含有key的values,需要解决下面的问题:1.保持values的相对顺序(来自每个目录的文件数据保持相对顺序);2.针对没有的数据进行填充(假如用null来填充)。
比如使用下面的数据:
/input/u/u1.txt:
user_id1 user_name_1 user_age_1 user_id2 user_name_2 user_age_2 user_id33 user_name_33 user_age_33/input/u/u2.txt:
user_id11 user_name_11 user_age_11 user_id21 user_name_21 user_age_21 user_id31 user_name_31 user_age_31/input/u/u3.txt:
user_id43 user_name_43 user_age_43 user_id42 user_name_42 user_age_42 user_id41 user_name_41 user_age_41 user_id45 user_name_45 user_age_45/input/item/i1.txt:
user_id1 item_name1 item_column1 user_id2 item_name2 item_column2 user_id3 item_name3 item_column3如果以/input/item作为主表,那么输出应该是:
user_id1 item_name1 item_column1 user_name_1 user_age_1 user_id2 item_name2 item_column2 user_name_2 user_age_2 user_id3 item_name3 item_column3 null null如果以/input/u作为主表,那么输出应该是:
user_id1 item_name1 item_column1 user_name_1 user_age_1 user_id11 null null null user_name_11 user_age_11 user_id2 item_name2 item_column2 user_name_2 user_age_2 user_id21 null null user_name_21 user_age_21 user_id31 null null user_name_31 user_age_31 user_id33 null null user_name_33 user_age_33 user_id41 null null user_name_41 user_age_41 user_id42 null null user_name_42 user_age_42 user_id43 null null user_name_43 user_age_43 user_id45 null null user_name_45 user_age_45话不多说,上代码:
driver:
package org.fansy.hadoop.mr;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.fansy.hadoop.io.IoOperation;
import org.fansy.hadoop.mr.join.JoinMapper;
import org.fansy.hadoop.mr.join.JoinReducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 同一个目录的文件不能有相同的key
* @author fansy
*
*/
public class JoinDemoDriver01 {
/**
* 在Map中读取文件的名字,然后在reduce中就可以按照一定的顺序进行输出,保持顺序一致性
* 默认第一列为输出key,分隔符暂时定为‘\t‘
* @param args
*/
private static Logger log = LoggerFactory.getLogger(JoinDemoDriver01.class);
public static final String DIRANDCOLUMNS= "dirAndColumns";
public static final String JOINOP="joinOp";
public static final String KEYTABLE ="keyTable";
public static void printUsage(){
System.out.println("JoinDemo01 [-r <reduces>] " +
"[-joinOp <inner|outer|leftJoin>] " +
"[-keyInput <keyInput>]"+
"[input]* <input> <output>");
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
List<String> otherArgs = new ArrayList<String>();
int num_reduces= 1;
String op="leftJoin";
String keyTable = "";
for(int i=0; i < args.length; ++i) {
try {
if ("-r".equals(args[i])) {
num_reduces = Integer.parseInt(args[++i]);
}else if("-keyInput".equals(args[i])){
keyTable=args[++i];
} else if ("-joinOp".equals(args[i])) {
op = args[++i];
} else {
otherArgs.add(args[i]);
}
}catch(Exception e){
e.printStackTrace();
printUsage(); // exits
}
}
if (otherArgs.size() < 2) {
System.out.println("ERROR: Wrong number of parameters: ");
printUsage();
}
conf.set(JOINOP, op);
conf.set(KEYTABLE, getNameFromDir(keyTable));
Job job = new Job(conf,"join demo01");
job.setJobName("join demo");
job.setJarByClass(WordCount.class);
// job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(JoinMapper.class);
// job.setCombinerClass(JoinReducer.class);
job.setReducerClass(JoinReducer.class);
job.setNumReduceTasks(num_reduces);
FileOutputFormat.setOutputPath(job, new Path(otherArgs.remove(otherArgs.size() - 1)));
TreeMap<String ,Integer> treeMap= new TreeMap<String,Integer>();
for (int i=0;i<otherArgs.size();i++) {
treeMap.put(getNameFromDir(otherArgs.get(i)), IoOperation.getColumnsFromDir(otherArgs.get(i), "\t"));
FileInputFormat.addInputPath(job, new Path(otherArgs.get(i)));
}
StringBuffer fileDirAndColumns = new StringBuffer();
for(Map.Entry<String, Integer> a:treeMap.entrySet()){
fileDirAndColumns.append(a.getKey()+":"+a.getValue()+",");
}
// set dir name and columns
// 按照dir排过序的
job.getConfiguration().set(DIRANDCOLUMNS, fileDirAndColumns.substring(0, fileDirAndColumns.length()-1));
System.exit(job.waitForCompletion(true)?0:1);
}
private static String getNameFromDir(String dir){
int index= dir.lastIndexOf("/");
if (index==dir.length()-1){
String temp= dir.substring(0,index);
return temp.substring(temp.lastIndexOf("/")+1);
}else{
return dir.substring(index+1);
}
}
}
package org.fansy.hadoop.mr.join;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JoinMapper extends Mapper<LongWritable,Text,Text,Text>{
private static Logger log = LoggerFactory.getLogger(JoinMapper.class);
private String filename= null;
private Text keyText= new Text();
private Text valueText= new Text();
public void setup(Context cxt){
InputSplit input=cxt.getInputSplit();
filename=((FileSplit) input).getPath().getParent().getName();
log.info("file name:"+filename);
}
public void map(LongWritable key, Text value, Context cxt) throws IOException,InterruptedException {
String[] values= value.toString().split("\t");
// 第一个\t前面为key,后面是value
String valueLeft = value.toString().substring((values[0]+"\t").length());
keyText.set(values[0]);
valueText.set(valueLeft+"<-->"+filename); // ***此处的分隔符 可以自己定制,需要和数据区分开来,即数据中不应该含有此分隔符
cxt.write(keyText,valueText);
}
}
reducer:package org.fansy.hadoop.mr.join;
import java.io.IOException;
import java.util.TreeSet;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.fansy.hadoop.mr.JoinDemoDriver01;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JoinReducer extends Reducer<Text,Text,Text,Text>{
/**
* 1.同一个目录不能含有相同的key
* 2.不管何种方式的关联,都要保持列数不变,同时列排序不变
* 3.inner方式,如果接受的values不够目录的个数,那么就说明有一个或多个文件中没有这个key,那么这条记录就不用输出
* 4.outer方式,如果接受的values不够目录的个数,那么就要使用某个不够文件个数的列数(总列数-1),来填充,以达到保持列不缺少的情况
*/
private static Logger log = LoggerFactory.getLogger(JoinReducer.class);
private String op=null;
private String keyTable =null;
private String dirAndColumns=null;
private String[] dirs=null;
private int[] columns=null;
public void setup(Context cxt){
System.out.println("JoinReducer setup");
// 关联方式
op=cxt.getConfiguration().get(JoinDemoDriver01.JOINOP);
// 主表,可能是空
keyTable=cxt.getConfiguration().get(JoinDemoDriver01.KEYTABLE);
// 目录排序以及每个目录的列数
dirAndColumns= cxt.getConfiguration().get(JoinDemoDriver01.DIRANDCOLUMNS);
String[] dirAndColumnsArr= dirAndColumns.split(",");
// 目录
dirs= new String[dirAndColumnsArr.length];
// 列数
columns=new int[dirAndColumnsArr.length];
// dirAndColumns已经排过序了
for( int i=0;i<dirAndColumnsArr.length;i++){
String[] tempDirColumn=dirAndColumnsArr[i].split(":");
dirs[i]=tempDirColumn[0];
columns[i]=Integer.parseInt(tempDirColumn[1]);
}
// 测试
/*StringBuffer buff= new StringBuffer();
for(int i=0;i<dirs.length;i++){
buff.append("dir["+i+"]:"+dirs[i]).append(",").append("columns["+i+"]:"+columns[i]).append("\n");
}
log.info(buff.toString());
log.info("keyTable:"+keyTable);*/
}
public void reduce(Text key, Iterable<Text> values, Context cxt) throws IOException,InterruptedException {
if("leftJoin".equals(op)){
leftJoin(key,values,cxt);
}else if("inner".equals(op)){
innerJoin(key,values,cxt);
}else if("outer".equals(op)){
outerJoin(key,values,cxt);
}else{
log.info("wrong join method");
}
}
/**
* 暂时未实现
* @param key
* @param values
* @param cxt
*/
private void outerJoin(Text key,
Iterable<Text> values,
Context cxt) {
}
/**
* 暂时未实现
* @param key
* @param values
* @param cxt
*/
private void innerJoin(Text key,
Iterable<Text> values,
Context cxt) {
}
/**
* 如果含有主表的key则考虑输出,否则直接退出
* @param key
* @param values
* @param cxt
* @throws IOException
* @throws InterruptedException
*/
private void leftJoin(Text key, Iterable<Text> values,
Context cxt) throws IOException, InterruptedException {
// 这里暂时考试使用TreeSet,后期如果效率不行,可以考虑使用其他数据存储结构
boolean hasKeyTableKey=false;
TreeSet<JoinData> treeSet = new TreeSet<JoinData>();
JoinData joinData =null;
for (Text v:values) {
/*log.info("key:"+key.toString()+"--"+"v.toString():"+v.toString());*/
String[] vArr= v.toString().split("<-->"); // ***和mapper中的分隔符对应
joinData = new JoinData(vArr[1],vArr[0]); // 注意mapper中的顺序
treeSet.add(joinData);
if(joinData.getDir().equals(keyTable)){
hasKeyTableKey=true;
}
}
// 如果不包含主表的key,那么就直接退出,不输出任何记录
if(!hasKeyTableKey){
return ;
}
// 拼凑不够个数的values(某个文件夹中的key没有则会导致这种情况)
StringBuffer buff = new StringBuffer();
/* int i=0;
for(JoinData v:treeSet){
while(!v.getDir().equals(dirs[i])){
buff.append(getNumsNull(columns[i],"\t")).append("\t"); // ***此处getNumsNull可以替换为其他字符
i++;
}
buff.append(v.getValue().toString()).append("\t");
i++;
}*/
// 这里设计不合理,后期改进
boolean flag=false;
for(int i=0;i<dirs.length;i++){
flag=false;
for(JoinData v:treeSet){
if(dirs[i].equals(v.getDir())){
buff.append(v.getValue()).append("\t");
flag=true;
}
}
if(!flag){
buff.append(getNumsNull(columns[i]-1,"\t")).append("\t"); // ***此处getNumsNull可以替换为其他字符,减1是因为除去主键
}
}
cxt.write(key, new Text(buff.toString()));
}
private String getNumsNull(int num,String splitter){
StringBuffer buff = new StringBuffer();
for(int i=0;i<num-1;i++){
buff.append("null"+splitter);
}
buff.append("null");
return buff.toString();
}
}
同时代码中还有些地方有待改进,而且在reduce中实现连接是比较慢的,这点在查阅了一些文章后也可以得到。在hadoop的example中实现的是在map端的连接,但是其需要输入文件是已经排过序的,这点可能需要在运行Join之前先做一步排序,而且其不支持左连接。其三种方式为:inner:所有文件中共有的key输出,是交集;outer:所有文件的key都输出,是并集;override:越靠近输出文件的文件(在输出参数中)其value值替换越不靠近的。
最后附上调用上面代码的命令:
./yarn jar yarn.jar org.fansy.hadoop.mr.JoinDemoDriver01 -joinOp leftJoin -keyInput /input/item /input/u /input/item /output/joindemo/008其中,打包好的yarn.jar在http://download.csdn.net/detail/fansy1990/7008039可以下载。
分享,成长,快乐
转载请注明blog地址:http://blog.csdn.net/fansy1990
原文:http://blog.csdn.net/fansy1990/article/details/20744093