public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    // Assumed layout of each line in data.txt: rowkey,qualifier,value (comma separated).
    // The column family "f" is a placeholder; adjust both to match the real data and table schema.
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        byte[] rowKey = Bytes.toBytes(fields[0]);
        Put put = new Put(rowKey);
        put.addColumn(Bytes.toBytes("f"), Bytes.toBytes(fields[1]), Bytes.toBytes(fields[2]));
        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}
/**
* @Author: xiaolaotou
* @Date: 2018/11/27
*/
public class BulkLoadJob {
    static Logger logger = LoggerFactory.getLogger(BulkLoadJob.class);
    private static Configuration conf = null;

    static {
        Configuration HBASE_CONFIG = new Configuration();
        HBASE_CONFIG.set("hbase.zookeeper.quorum", "172.20.237.104,172.20.237.105,172.20.237.106");
        HBASE_CONFIG.set("hbase.master.kerberos.principal", "hbase/_HOST@TDH");
        HBASE_CONFIG.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@TDH");
        HBASE_CONFIG.set("hbase.security.authentication", "kerberos");
        HBASE_CONFIG.set("zookeeper.znode.parent", "/hyperbase1");
        HBASE_CONFIG.set("hadoop.security.authentication", "kerberos");
        conf = HBaseConfiguration.create(HBASE_CONFIG);
    }

    public static void main(String[] args) throws Exception {
        // Log in to the Kerberos-secured cluster before touching HDFS or HBase.
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab("hbase/gz237-104", "/etc/hyperbase1/conf/hyperbase.keytab");
        String inputPath = "/yang/data.txt";
        String outputPath = "/yang/BulkLoad";
        Job job = Job.getInstance(conf, "BulkLoadToHbase");
        job.setJarByClass(BulkLoadJob.class);
        job.setMapperClass(BulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // Disable speculative execution so duplicate tasks do not write the same HFiles twice.
        job.setSpeculativeExecution(false);
        job.setReduceSpeculativeExecution(false);
        // Input/output formats: plain text in, HFiles out.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        // Source path on HDFS.
        FileInputFormat.addInputPath(job, new Path(inputPath));
        // Destination directory for the generated HFiles.
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        HTable table = new HTable(conf, "hfiletable");
        // Configures the partitioner, reducer and output settings to match the table's regions.
        HFileOutputFormat2.configureIncrementalLoad(job, table);
        boolean b = job.waitForCompletion(true);
        if (b) {
            FsShell shell = new FsShell(conf);
            try {
                shell.run(new String[]{"-chmod", "-R", "777", outputPath});
            } catch (Exception e) {
                logger.error("Failed to change permissions on the output directory", e);
                throw new IOException(e);
            }
            // Load the generated HFiles into the HBase table.
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(new Path(outputPath), table);
            System.out.println("Bulk load succeeded");
        } else {
            System.out.println("Bulk load failed");
            logger.error("Load failed!");
            System.exit(1);
        }
    }
}
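The HTable constructor used above is deprecated in newer HBase client libraries. If the cluster's client jars are HBase 1.x or later, the same wiring can be done through the Connection API instead; a minimal sketch, reusing the conf, job and outputPath variables from the driver above:

try (Connection connection = ConnectionFactory.createConnection(conf);
     Table table = connection.getTable(TableName.valueOf("hfiletable"));
     RegionLocator locator = connection.getRegionLocator(TableName.valueOf("hfiletable"));
     Admin admin = connection.getAdmin()) {
    // Derives the partitioner and reducer settings from the table's current region boundaries.
    HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
    if (job.waitForCompletion(true)) {
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path(outputPath), admin, table, locator);
    }
}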
Error encountered along the way:
Fix: the job failed because of a protobuf-java-2.5.0.jar conflict. My project was structured as a parent module with child modules, so when dependencies were resolved the protobuf jar was probably overridden by one pulled in by another child module. I created a fresh project, rebuilt the jar, and it then ran successfully on Linux.
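When this kind of conflict shows up, one quick way to confirm which jar the protobuf classes are actually loaded from is to print their code source at runtime; a small diagnostic sketch (the class name is mine, not part of the original project):

public class ProtobufJarCheck {
    public static void main(String[] args) {
        // Prints the jar that com.google.protobuf.Message was loaded from, which shows
        // whether a sibling module dragged in a different protobuf-java version.
        System.out.println(
                com.google.protobuf.Message.class.getProtectionDomain().getCodeSource().getLocation());
    }
}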


