HDFS 与 HBase 之间的数据交互和编写普通的 MapReduce 程序类似，只需要修改作业的输入、输出端，并使用 HBase 的 Java API 对数据进行读取和处理即可。
public class HBaseToHdfs extends ToolRunner implements Tool { private Configuration configuration; //配置文件需要配置的属性 private static final String HDFS_NAME = "fs.defaultFS"; private static final String HDFS_VALUE = "hdfs://mycluster"; private static final String MAPREDUCE_NAME = "mapreduce.framework.name"; private static final String MAPREDUCE_VALUE = "yarn"; private static final String HBASE_NAME = "hbase.zookeeper.quorum"; private static final String HBASE_VALUE = "qiaojunlong3:2181,qiaojunlong4:2181,qiaojunlong5:2181"; //获取hbase表的扫描对象 private Scan getscan() { return new Scan(); } @Override public int run(String[] args) throws Exception { getConf(); //获取job实例对象 Job job = Job.getInstance(configuration, "copy_move"); //map/reduce的class链接 job.setMapperClass(hbase_To_Hdfs.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); //设置输入输出 //由hbase导数据到hdfs故输入端需要使用TableMapReduceUtil类 TableMapReduceUtil.initTableMapperJob("ns3:t5", getscan(), hbase_To_Hdfs.class, Text.class, NullWritable.class, job); FileOutputFormat.setOutputPath(job, new Path(args[0])); //设置jar包 job.setJarByClass(HBaseToHdfs.class); //提交作业 int b = job.waitForCompletion(true) ? 0 : 1; return b; } @Override public void setConf(Configuration configuration) { configuration.set(HDFS_NAME, HDFS_VALUE); configuration.set(MAPREDUCE_NAME, MAPREDUCE_VALUE); configuration.set(HBASE_NAME, HBASE_VALUE); this.configuration = configuration; } @Override public Configuration getConf() { return configuration; } public static void main(String[] args) throws Exception { ToolRunner.run(HBaseConfiguration.create(),new HBaseToHdfs() , args); } // 创建map程序 private static Text mkey = new Text();
static class hbase_To_Hdfs extends TableMapper<Text, NullWritable> { @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { //定义字符串拼接 StringBuffer stringBuffer = new StringBuffer(); /** * 使用value获取扫描器,获取hbase表的列名/列值等信息 * 使用StringBuffer来对需要的信息进行字符串拼接 */ CellScanner cellScanner = value.cellScanner(); while (cellScanner.advance()) { Cell cell = cellScanner.current(); stringBuffer.append(new String(CellUtil.cloneValue(cell))).append("\t"); } mkey.set(stringBuffer.toString()); context.write(mkey, NullWritable.get()); } } }
原文:https://www.cnblogs.com/maple-q/p/10896452.html