最近一个群友的boss让研究hbase,让hbase的入库速度达到5w+/s,这可愁死了,4台个人电脑组成的集群,多线程入库调了好久,速度也才1w左右,都没有达到理想的那种速度,然后就想到了这种方式,但是网上多是用mapreduce来实现入库,而现在的需求是实时入库,不生成文件了,所以就只能自己用代码实现了,但是网上查了很多资料都没有查到,最后在一个网友的指引下,看了源码,最后找到了生成Hfile的方式,实现了之后,发现单线程入库速度才达到1w4左右,和之前的多线程的全速差不多了,百思不得其解之时,调整了一下代码把列的Byte.toBytes(cols)这个方法调整出来只做一次,速度立马就到3w了,提升非常明显,这是我的电脑上的速度,估计在它的集群上能更快一点吧,下面把代码和大家分享一下。
? ??String tableName = "taglog";
? ? ? ? ? ? byte[] family = Bytes.toBytes("logs");
? ? ? ? ? ? //配置文件设置
? ? ? ? ? ? Configuration conf = HBaseConfiguration.create();
? ? ? ? ? ? conf.set("hbase.master", "192.168.1.133:60000");
? ? ? ? ? ? conf.set("hbase.zookeeper.quorum", "192.168.1.135");
? ? ? ? ? ? //conf.set("zookeeper.znode.parent", "/hbase");
? ? ? ? ? ? conf.set("hbase.metrics.showTableName", "false");
? ? ? ? ? ? //conf.set("io.compression.codecs", "org.apache.hadoop.io.compress.SnappyCodec");
? ? ? ? ? ??
? ? ? ? ? ? String outputdir = "hdfs://hadoop.Master:8020/user/SEA/hfiles/";
? ? ? ? ? ? Path dir = new Path(outputdir);
? ? ? ? ? ? Path familydir = new Path(outputdir, Bytes.toString(family));
? ? ? ? ? ? FileSystem fs = familydir.getFileSystem(conf);
? ? ? ? ? ? BloomType bloomType = BloomType.NONE;
? ? ? ? ? ? final HFileDataBlockEncoder encoder = NoOpDataBlockEncoder.INSTANCE;
? ? ? ? ? ? int blockSize = 64000;
? ? ? ? ? ? Configuration tempConf = new Configuration(conf);
? ? ? ? ? ? tempConf.set("hbase.metrics.showTableName", "false");
? ? ? ? ? ? tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 1.0f);
? ? ? ? ? ? //实例化HFile的Writer,StoreFile实际上只是HFile的轻量级的封装
? ? ? ? ? ? StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf),
? ? ? ? ? ? ? ? ? ? fs, blockSize)
? ? ? ? ? ? ? ? ? ? .withOutputDir(familydir)
? ? ? ? ? ? ? ? ? ? .withCompression(Compression.Algorithm.NONE)
? ? ? ? ? ? ? ? ? ? .withBloomType(bloomType).withComparator(KeyValue.COMPARATOR)
? ? ? ? ? ? ? ? ? ? .withDataBlockEncoder(encoder).build();
? ? ? ? ? ? long start = System.currentTimeMillis();
? ? ? ? ? ??
? ? ? ? ? ? DecimalFormat df = new DecimalFormat("0000000");
? ? ? ? ? ??
? ? ? ? ? ??
? ? ? ? ? ??
? ? ? ? ? ? KeyValue kv1 = null;
? ? ? ? ? ? KeyValue kv2 = null;
? ? ? ? ? ? KeyValue kv3 = null;
? ? ? ? ? ? KeyValue kv4 = null;
? ? ? ? ? ? KeyValue kv5 = null;
? ? ? ? ? ? KeyValue kv6 = null;
? ? ? ? ? ? KeyValue kv7 = null;
? ? ? ? ? ? KeyValue kv8 = null;
? ? ? ? ? ??
? ? ? ? ? ? //这个是耗时操作,只进行一次
? ? ? ? ? ? byte[] cn = Bytes.toBytes("cn");
? ? ? ? ? ? byte[] dt = Bytes.toBytes("dt");
? ? ? ? ? ? byte[] ic = Bytes.toBytes("ic");
? ? ? ? ? ? byte[] ifs = Bytes.toBytes("if");
? ? ? ? ? ? byte[] ip = Bytes.toBytes("ip");
? ? ? ? ? ? byte[] le = Bytes.toBytes("le");
? ? ? ? ? ? byte[] mn = Bytes.toBytes("mn");
? ? ? ? ? ? byte[] pi = Bytes.toBytes("pi");
? ? ? ? ? ??
? ? ? ? ? ? int maxLength = 3000000;
? ? ? ? ? ? for(int i=0;i<maxLength;i++){
? ? ? ? ? ? ? ? String currentTime = ""+System.currentTimeMillis() + df.format(i);
? ? ? ? ? ? ? ? long current = System.currentTimeMillis();
? ? ? ? ? ? ? ? ?//rowkey和列都要按照字典序的方式顺序写入,否则会报错的
? ? ? ? ? ? ? ? ?kv1 = new KeyValue(Bytes.toBytes(currentTime),?
? ? ? ? ? ? ? ? ? ? ? ? ?family, cn,current,KeyValue.Type.Put,Bytes.toBytes("3"));
? ? ? ? ? ? ? ??
? ? ? ? ? ? ? ? ?kv2 = new KeyValue(Bytes.toBytes(currentTime),?
? ? ? ? ? ? ? ? ? ? ? ? ?family, dt,current,KeyValue.Type.Put,Bytes.toBytes("6"));
? ? ? ? ? ? ? ??
? ? ? ? ? ? ? ? ?kv3 = new KeyValue(Bytes.toBytes(currentTime),?
? ? ? ? ? ? ? ? ? ? ? ? ?family, ic,current,KeyValue.Type.Put,Bytes.toBytes("8"));
? ? ? ? ? ? ? ??
? ? ? ? ? ? ? ? ?kv4 = new KeyValue(Bytes.toBytes(currentTime),?
? ? ? ? ? ? ? ? ? ? ? ? ?family, ifs,current,KeyValue.Type.Put,Bytes.toBytes("7"));
? ? ? ? ? ? ? ??
? ? ? ? ? ? ? ? ?kv5 = new KeyValue(Bytes.toBytes(currentTime),?
? ? ? ? ? ? ? ? ? ? ? ? ?family, ip,current,KeyValue.Type.Put,Bytes.toBytes("4"));
? ? ? ? ? ? ? ??
? ? ? ? ? ? ? ? ?kv6 = new KeyValue(Bytes.toBytes(currentTime),?
? ? ? ? ? ? ? ? ? ? ? ? ?family, le,current,KeyValue.Type.Put,Bytes.toBytes("2"));
? ? ? ? ? ? ? ??
? ? ? ? ? ? ? ? ?kv7 = new KeyValue(Bytes.toBytes(currentTime),?
? ? ? ? ? ? ? ? ? ? ? ? ?family, mn,current,KeyValue.Type.Put,Bytes.toBytes("5"));
? ? ? ? ? ? ? ??
? ? ? ? ? ? ? ? ?kv8 = new KeyValue(Bytes.toBytes(currentTime),?
? ? ? ? ? ? ? ? ? ? ? ? ?family,pi,current,KeyValue.Type.Put,Bytes.toBytes("1"));
? ? ? ? ? ? ? ??
? ? ? ? ? ? ? ? writer.append(kv1);
? ? ? ? ? ? ? ? writer.append(kv2);
? ? ? ? ? ? ? ? writer.append(kv3);
? ? ? ? ? ? ? ? writer.append(kv4);
? ? ? ? ? ? ? ? writer.append(kv5);
? ? ? ? ? ? ? ? writer.append(kv6);
? ? ? ? ? ? ? ? writer.append(kv7);
? ? ? ? ? ? ? ? writer.append(kv8);
? ? ? ? ? ? }
? ? ? ? ? ??
? ? ? ? ? ??
? ? ? ? ? ? writer.close();
? ? ? ? ? ??
? ? ? ? ? ? //把生成的HFile导入到hbase当中
? ? ? ? ? ? HTable table = new HTable(conf,tableName);
? ? ? ? ? ? LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
? ? ? ? ? ? loader.doBulkLoad(dir, table);
最后再附上查看hfile的方式,查询正确的hfile和自己生成的hfile,方便查找问题。
hbase org.apache.hadoop.hbase.io.hfile.HFile -p -f hdfs://hadoop.Master:8020/user/SEA/hfiles/logs/51aa97b2a25446f89d5c870af92c9fc1
原文:http://stark-summer.iteye.com/blog/2191065