数据集文件:
customers:
1,Stephanie leung,555-555-555 2,Edward Kim,123-456-7890 3,Jose Madriz,281-330-8004 4,David Stork,408-555-0000
3,A,12.95,02-Jun-2008 1,B,88.25,20-May-2008 2,C,32.00,30-Nov,2007 3,D,25.02,22-Jan-2009
1 Stephanie leung,555-555-555,B,88.25,20-May-2008 2 Edward Kim,123-456-7890,C,32.00,30-Nov,2007 3 Jose Madriz,281-330-8004,D,25.02,22-Jan-2009 3 Jose Madriz,281-330-8004,A,12.95,02-Jun-2008
接下来,就来实现一下这个小程序:
在上一篇中说了,我们需要实现几个类,一个是TaggedMapOutput的子类,还有两个是DataJoinMapperBase的子类,一个是mapper,一个是reducer,下面是具体的实现:
TaggedWritable类继承自TaggedMapOutput:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
/*TaggedMapOutput是一个抽象数据类型,封装了标签与记录内容
此处作为DataJoinMapperBase的输出值类型,需要实现Writable接口,所以要实现两个序列化方法
自定义输入类型*/
public class TaggedWritable extends TaggedMapOutput {
private Writable data;
public TaggedWritable() {
this.tag = new Text();
}
public TaggedWritable(Writable data) // 构造函数
{
//tag就是将数据集按key分区
this.tag = new Text(); // tag可以通过setTag()方法进行设置
this.data = data;
}
@Override
public void readFields(DataInput in) throws IOException {
tag.readFields(in);
String dataClz = in.readUTF();
if (this.data == null || !this.data.getClass().getName().equals(dataClz)) {
try {
this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
data.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
tag.write(out);
out.writeUTF(this.data.getClass().getName());
data.write(out);
}
@Override
public Writable getData() {
return data;
}
}import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import com.demo.writables.TaggedWritable;
public class JoinMapper extends DataJoinMapperBase {
// 这个在任务开始时调用,用于产生标签
// 此处就直接以文件名作为标签----标签的作用就是将数据集分区
@Override
protected Text generateInputTag(String inputFile) {
System.out.println("inputFile = " + inputFile);
return new Text(inputFile);
}
// 这里我们已经确定分割符为',',更普遍的,用户应能自己指定分割符和组键。
// 设置组键
@Override
protected Text generateGroupKey(TaggedMapOutput record) {
String tag = ((Text) record.getTag()).toString();
System.out.println("tag = " + tag);
String line = ((Text) record.getData()).toString();
String[] tokens = line.split(",");
return new Text(tokens[0]);
}
// 返回一个任何带任何我们想要的Text标签的TaggedWritable
@Override
protected TaggedMapOutput generateTaggedMapOutput(Object value) {
TaggedWritable retv = new TaggedWritable((Text) value);
retv.setTag(this.inputTag); // 不要忘记设定当前键值的标签
return retv;///
}
}import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import com.demo.writables.TaggedWritable;
public class JoinReducer extends DataJoinReducerBase {
// 两个参数数组大小一定相同,并且最多等于数据源个数
@Override
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
if (tags.length < 2)
return null; // 这一步,实现内联结
String joinedStr = "";
for (int i = 0; i < values.length; i++) {
if (i > 0)
joinedStr += ","; // 以逗号作为原两个数据源记录链接的分割符
TaggedWritable tw = (TaggedWritable) values[i];
String line = ((Text) tw.getData()).toString();
String[] tokens = line.split(",", 2); // 将一条记录划分两组,去掉第一组的组键名。
joinedStr += tokens[1];
}
TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
retv.setTag((Text) tags[0]); // 这只retv的组键,作为最终输出键。
return retv;
}
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.demo.mappers.JoinMapper;
import com.demo.reducers.JoinReducer;
import com.demo.writables.TaggedWritable;
public class DataJoinDriver extends Configured implements Tool {
public int run(String[] args) throws Exception {
Configuration conf = getConf();
if (args.length != 2) {
System.err.println("Usage:DataJoin <input path> <output path>");
System.exit(-1);
}
Path in = new Path(args[0]);
Path out = new Path(args[1]);
JobConf job = new JobConf(conf, DataJoinDriver.class);
job.setJobName("DataJoin");
//FileSystem hdfs =FileSystem.get(conf);
FileSystem hdfs = in.getFileSystem(conf);
FileInputFormat.setInputPaths(job, in);
if (hdfs.exists(new Path(args[1]))) {
hdfs.delete(new Path(args[1]), true);
}
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TaggedWritable.class);
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception {
args = new String[]{"hdfs://localhost:9000/input/different datasource data/*.txt","hdfs://localhost:9000/output/secondOutput1"};
int res = ToolRunner.run(new Configuration(), new DataJoinDriver(), args);
System.exit(res);
}
}
版权声明:本文为博主原创文章,未经博主允许不得转载。
原文:http://blog.csdn.net/wild_elegance_k/article/details/48065477