package com.hbase.mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * @author:FengZhen * @create:2018年9月17日 * MapReduce读取HBase中数据 */ public class AnalyzeData extends Configured implements Tool{ private static String addr="HDP233,HDP232,HDP231"; private static String port="2181"; public enum Counters { ROWS, COLS, VALID, ERROR } static class AnalyzeMapper extends TableMapper<Text, IntWritable>{ private IntWritable ONE = new IntWritable(1); @Override protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context) throws IOException, InterruptedException { context.getCounter(Counters.ROWS).increment(1); try { for (Cell cell : value.listCells()) { context.getCounter(Counters.COLS).increment(1); String hbaseValue = Bytes.toString(CellUtil.cloneValue(cell)); context.write(new Text(hbaseValue), ONE); context.getCounter(Counters.VALID).increment(1); } } catch (Exception e) { e.printStackTrace(); context.getCounter(Counters.ERROR).increment(1); } } } static class AnalyzeReducer extends 
Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable intWritable : values) { count = count + intWritable.get(); } context.write(key, new IntWritable(count)); } } public int run(String[] arg0) throws Exception { String table = arg0[0]; String column = arg0[1]; String outPath = arg0[2]; Scan scan = new Scan(); if (null != column) { byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column)); if (colkey.length > 1) { scan.addColumn(colkey[0], colkey[1]); }else { scan.addFamily(colkey[0]); } } Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum",addr); configuration.set("hbase.zookeeper.property.clientPort", port); configuration.set(TableInputFormat.INPUT_TABLE, table); Job job = Job.getInstance(configuration); job.setJobName("AnalyzeData"); job.setJarByClass(AnalyzeData.class); job.setMapperClass(AnalyzeMapper.class); job.setInputFormatClass(TableInputFormat.class); TableInputFormat.addColumns(scan, KeyValue.parseColumn(Bytes.toBytes(column))); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //使用TableMapReduceUtil会报类找不到错误 //Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.MetricsRegistry //TableMapReduceUtil.initTableMapperJob(table, scan, AnalyzeMapper.class, Text.class, IntWritable.class, job); job.setReducerClass(AnalyzeReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setNumReduceTasks(1); FileOutputFormat.setOutputPath(job, new Path(outPath)); return job.waitForCompletion(true) ? 
0 : 1; } public static void main(String[] args) throws Exception { String[] params = new String[] {"test_table_mr","data:info","hdfs://fz/data/fz/output/mrReadHBase"}; int exitCode = ToolRunner.run(new AnalyzeData(), params); System.exit(exitCode); } }
// Original article: https://www.cnblogs.com/EnzoDin/p/9662472.html