分析:
1、由于是任意列 任意表 任意路径,我们很容易想到是参数传入,参数传入后怎么去获得参数,根据我们以往的经验就是通过args[]来获取,但是在mapper或者是reducer中,我们不能直接将参数传入,因为map 和reduce是通过反射机制来创建的,对于传入的参数我们不能直接使用;我们发现在map和reduce有一个参数context,此类中包含很多的信息,例如configuration,并且configuration 还有set()方法,因此 我们可以将参数传给conf,然后由context拿到conf,进而拿到参数/
2、表头在第一行,有且只有一行,因此可以将表头的信息写入mapper 的setup()方法,因为它只进行一次操作
代码:
定义最后的程序运行为 hadoop jar **.jar t1 /tt "f1:c1|f1:c2|f2:c3"
对"f1:c1|f1:c2|f2:c3"进行拆分,首先按照"|"进行拆分,在java中 split("\\|"),因为|是转义字符,然后对每个f:c 再按照":"拆分,此时我们就拿到了单独的列族和列
定义自己的mapper函数(由于我们只是将数据读取出来,所以不用写reducer)
public static class MyMapper extends TableMapper<Text, Text> {
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
String familyscolumns = context.getConfiguration().get(
"familys:columns");
String[] familycolumns = familyscolumns.split("\\|");
String familycolumn = "";
for (String fc : familycolumns) {
familycolumn += fc + "\t";
}
// 在第一行增加header 行健 列族:列
context.write(new Text("rowkey"), new Text(familycolumn));
}
Text k2 = new Text();
Text v2 = new Text();
@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
k2.set(key.get());
String familyscolumns = context.getConfiguration().get(
"familys:columns");
String[] familycolumns = familyscolumns.split("\\|");
String familycolumn="";
Cell columnLatestCell1=null;
for (String string : familycolumns) {
String[] fc = string.split(":");
columnLatestCell1 = value.getColumnLatestCell(
fc[0].getBytes(), fc[1].getBytes());
if (columnLatestCell1 != null) {
familycolumn+=new String(columnLatestCell1.getValue())+"\t";
} else{
familycolumn+="\t";
}
}
v2.set(new String(familycolumn));
context.write(k2, v2);
}
}
然后在客户端提交job
并把参数写入conf
Configuration conf = HBaseConfiguration.create();
conf.set("table", args[0]);
conf.set("hdfsPath", args[1]);
conf.set("familys:columns", args[2]);
用mapreduce实现从hbase导出到hdfs,实现一个工具类,能够支持任意表 任意列 任意路径导出,并且支持表头
原文:http://www.cnblogs.com/ggbond1988/p/4838129.html