首页 > 其他 > 详细

MapReduce-读取HBase

时间:2018-09-17 15:57:30      阅读:182      评论:0      收藏:0      [点我收藏+]

MapReduce读取HBase数据

代码如下

package com.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
* @author:FengZhen
* @create:2018年9月17日
* MapReduce读取HBase中数据
*/
public class AnalyzeData extends Configured implements Tool{
	
	private static String addr="HDP233,HDP232,HDP231";
	private static String port="2181";
	
	public enum Counters { ROWS, COLS, VALID, ERROR }
	
	static class AnalyzeMapper extends TableMapper<Text, IntWritable>{
		private IntWritable ONE = new IntWritable(1);
		@Override
		protected void map(ImmutableBytesWritable key, Result value,
				Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			context.getCounter(Counters.ROWS).increment(1);
			try {
				for (Cell cell : value.listCells()) {
					context.getCounter(Counters.COLS).increment(1);
					String hbaseValue = Bytes.toString(CellUtil.cloneValue(cell));
					context.write(new Text(hbaseValue), ONE);
					context.getCounter(Counters.VALID).increment(1);
				}
			} catch (Exception e) {
				e.printStackTrace();
				context.getCounter(Counters.ERROR).increment(1);
			}
		}
	}
	
	static class AnalyzeReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
			int count = 0;
			for (IntWritable intWritable : values) {
				count = count + intWritable.get();
			}
			context.write(key, new IntWritable(count));
		}
	}
	
	public int run(String[] arg0) throws Exception {
		String table = arg0[0];
		String column = arg0[1];
		String outPath = arg0[2];
		
		Scan scan = new Scan();
		if (null != column) {
			byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
			if (colkey.length > 1) {
				scan.addColumn(colkey[0], colkey[1]);
			}else {
				scan.addFamily(colkey[0]);
			}
		}
		
		Configuration configuration = HBaseConfiguration.create();
		configuration.set("hbase.zookeeper.quorum",addr);
		configuration.set("hbase.zookeeper.property.clientPort", port);
		configuration.set(TableInputFormat.INPUT_TABLE, table);
		
		Job job = Job.getInstance(configuration);
		job.setJobName("AnalyzeData");
		job.setJarByClass(AnalyzeData.class);
		
		job.setMapperClass(AnalyzeMapper.class);
		job.setInputFormatClass(TableInputFormat.class);
		TableInputFormat.addColumns(scan, KeyValue.parseColumn(Bytes.toBytes(column)));
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		//使用TableMapReduceUtil会报类找不到错误
		//Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.MetricsRegistry
		//TableMapReduceUtil.initTableMapperJob(table, scan, AnalyzeMapper.class, Text.class, IntWritable.class, job);
		
		job.setReducerClass(AnalyzeReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		job.setNumReduceTasks(1);
		FileOutputFormat.setOutputPath(job, new Path(outPath));
		
		return job.waitForCompletion(true) ? 0 : 1;
	}
	
	public static void main(String[] args) throws Exception {
		String[] params = new String[] {"test_table_mr","data:info","hdfs://fz/data/fz/output/mrReadHBase"};
		int exitCode = ToolRunner.run(new AnalyzeData(), params);
		System.exit(exitCode);
	}
}

 

MapReduce-读取HBase

原文:https://www.cnblogs.com/EnzoDin/p/9662472.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!