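This article walks through the HBase write path. HBase has three main write operations: put, delete, and append (the server-side code shown later also handles increment).
HBase is a distributed, column-oriented database, not a file system; HDFS is its underlying storage layer. Unlike MySQL, a delete does not physically remove the data right away. It appends a new write whose operation type is DELETE, and that write follows almost exactly the same path as a PUT.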
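To make the "delete is just another append" idea concrete, here is a minimal sketch in plain Java. It is not HBase code: the class `TombstoneSketch` and all of its members are hypothetical, and it assumes a simplified rule in which a delete marker hides every put of the same row with a timestamp less than or equal to the marker's.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy append-only store (hypothetical): a delete is recorded as a marker, nothing is removed. */
final class TombstoneSketch {
    enum Type { PUT, DELETE }

    /** One appended entry; value is null for a DELETE marker. */
    static final class Entry {
        final String row;
        final long timestamp;
        final Type type;
        final String value;

        Entry(String row, long timestamp, Type type, String value) {
            this.row = row;
            this.timestamp = timestamp;
            this.type = type;
            this.value = value;
        }
    }

    private final List<Entry> log = new ArrayList<>();

    void put(String row, long ts, String value) {
        log.add(new Entry(row, ts, Type.PUT, value));
    }

    /** Appends a marker instead of touching earlier entries. */
    void delete(String row, long ts) {
        log.add(new Entry(row, ts, Type.DELETE, null));
    }

    /** Returns the newest put that is not hidden by a delete marker, or null. */
    String get(String row) {
        long newestDelete = Long.MIN_VALUE;
        for (Entry e : log) {
            if (e.type == Type.DELETE && e.row.equals(row)) {
                newestDelete = Math.max(newestDelete, e.timestamp);
            }
        }
        Entry newestPut = null;
        for (Entry e : log) {
            boolean visible = e.type == Type.PUT && e.row.equals(row) && e.timestamp > newestDelete;
            if (visible && (newestPut == null || e.timestamp > newestPut.timestamp)) {
                newestPut = e;
            }
        }
        return newestPut == null ? null : newestPut.value;
    }

    int entryCount() {
        return log.size();
    }
}
```

After a put followed by a delete, the log holds two entries even though `get` returns null. In this sketch the hidden data is never reclaimed; a real store would clean it up in a later background step.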
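First, the overall write flow:
Client API -> RPC -> server IPC -> write WAL -> write MemStore -> flush (if needed)
- The Table interface defines the API available to an HBase client, and HTable is its implementation, so HTable is a good place to start reading the code. The client builds a Put object and then calls HTable's put method: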
    public void put(final Put put) throws IOException {
      // Validate before issuing the RPC: the Put must contain at least one column, and no
      // cell may exceed the configured maximum size (default 10485760 bytes, i.e. 10 MB).
      // An exception is thrown if validation fails.
      validatePut(put);
      ClientServiceCallable<Void> callable =
          new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(),
              this.rpcControllerFactory.newController(), put.getPriority()) {
        @Override
        protected Void rpcCall() throws Exception {
          MutateRequest request = RequestConverter.buildMutateRequest(
              getLocation().getRegionInfo().getRegionName(), put);
          // Serialize with protobuf and send the RPC request
          doMutate(request);
          return null;
        }
      };
      rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs)
          .callWithRetries(callable, this.operationTimeoutMs);
    }
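Before sending the RPC request, the client runs two checks: 1) the Put contains at least one column (put.isEmpty()); 2) no single cell in the Put exceeds the configured maximum size (default 10485760 bytes, i.e. 10 MB; the client configuration parameter is MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize"). If a check fails, the client throws IllegalArgumentException("No columns to insert") or IllegalArgumentException("KeyValue size too large") respectively.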
    // validate for well-formedness
    public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
      // Check 1: the Put must contain at least one column
      if (put.isEmpty()) {
        throw new IllegalArgumentException("No columns to insert");
      }
      // Check 2: no cell may exceed the client limit (default 10485760 bytes)
      if (maxKeyValueSize > 0) {
        for (List<Cell> list : put.getFamilyCellMap().values()) {
          for (Cell cell : list) {
            if (KeyValueUtil.length(cell) > maxKeyValueSize) {
              throw new IllegalArgumentException("KeyValue size too large");
            }
          }
        }
      }
    }
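The client's RPC request is submitted by ClientServiceCallable.doMutate, and the RPC layer uses Google's protobuf for serialization. On the server side the IPC implementation is the RSRpcServices class, and the request is handled by its mutate method.
In mutate, the protobuf message is deserialized into a Put, Append, Increment, or Delete object. For a Put, the code checks whether the request carries a condition, that is, whether it is a check-and-put. If it does, the coprocessor hooks are called when the region has a coprocessor host, and the write goes through region.checkAndMutate. If it does not, HRegion's put method is called directly. Part of RSRpcServices.mutate is shown below: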
    case PUT:
      // Deserialize into a Put object
      Put put = ProtobufUtil.toPut(mutation, cellScanner);
      // The server also limits the size of submitted cells.
      // The default is the same as on the client: 10485760 bytes (10 MB).
      checkCellSizeLimit(region, put);
      // Throws an exception when violated
      spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
      quota.addMutation(put);
      // Conditional request (check-and-put): run the coprocessor hooks if a host exists
      if (request.hasCondition()) {
        Condition condition = request.getCondition();
        byte[] row = condition.getRow().toByteArray();
        byte[] family = condition.getFamily().toByteArray();
        byte[] qualifier = condition.getQualifier().toByteArray();
        CompareOperator compareOp = CompareOperator.valueOf(condition.getCompareType().name());
        ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
        TimeRange timeRange = condition.hasTimeRange()
            ? ProtobufUtil.toTimeRange(condition.getTimeRange())
            : TimeRange.allTime();
        if (region.getCoprocessorHost() != null) {
          processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
              compareOp, comparator, put);
        }
        if (processed == null) {
          boolean result = region.checkAndMutate(row, family, qualifier, compareOp,
              comparator, timeRange, put);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndPut(row, family, qualifier,
                compareOp, comparator, put, result);
          }
          processed = result;
        }
      } else {
        // No condition: hand the Put to HRegion.put()
        region.put(put);
        processed = Boolean.TRUE;
      }
      break;
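The conditional branch above applies the Put only when the current cell value satisfies a comparison. The sketch below illustrates that idea with an equality check on an in-memory map. It is an assumption-laden toy, not the HBase implementation: the class `CheckAndPutSketch`, its string keys, and the equality-only comparison are all made up for illustration.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Toy conditional write (hypothetical): apply the new value only if the current one matches. */
final class CheckAndPutSketch {
    private final ConcurrentMap<String, String> cells = new ConcurrentHashMap<>();

    /** Unconditional write, analogous to the plain put branch. */
    void put(String key, String value) {
        cells.put(key, value);
    }

    /**
     * Writes newValue only when the current value equals expected.
     * An expected value of null means "the cell must not exist yet".
     * newValue must be non-null. Returns true if the write was applied.
     */
    boolean checkAndPut(String key, String expected, String newValue) {
        boolean[] applied = {false};
        // compute() runs the check and the write atomically for this key
        cells.compute(key, (k, current) -> {
            if (Objects.equals(current, expected)) {
                applied[0] = true;
                return newValue;
            }
            return current; // condition failed: leave the cell unchanged
        });
        return applied[0];
    }

    String get(String key) {
        return cells.get(key);
    }
}
```

The important property is that the comparison and the write happen as one atomic step, so two concurrent callers cannot both pass the check against the same old value.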
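Before executing the put, HRegion checks whether the region's MemStore has exceeded its upper limit (checkResources()). If it has, a flush is requested and, as the code shows, the write is rejected with a RegionTooBusyException.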
    void checkResources() throws RegionTooBusyException {
      // If catalog region, do not impose resource constraints or block updates.
      // (Nothing is checked when the target is the meta table.)
      if (this.getRegionInfo().isMetaRegion()) return;
      // When the region's MemStore size exceeds blockingMemStoreSize, request a flush
      // (requestFlush0(FlushLifeCycleTracker.DUMMY)). Writes are blocked in the meantime:
      // this request fails with RegionTooBusyException.
      MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
      // region size = on-heap size + off-heap size (not the data size)
      // blockingMemStoreSize = flushSize (default 128 MB) * mult (default 4)
      if (mss.getHeapSize() + mss.getOffHeapSize() > this.blockingMemStoreSize) {
        blockedRequestsCount.increment();
        requestFlush();
        // Don't print current limit because it will vary too much. The message is used as a key
        // over in RetriesExhaustedWithDetailsException processing.
        throw new RegionTooBusyException("Over memstore limit="
            + org.apache.hadoop.hbase.procedure2.util.StringUtils.humanSize(this.blockingMemStoreSize)
            + ", regionName="
            + (this.getRegionInfo() == null ? "unknown" : this.getRegionInfo().getEncodedName())
            + ", server="
            + (this.getRegionServerServices() == null ? "unknown"
                : this.getRegionServerServices().getServerName()));
      }
    }

    // Initialization of blockingMemStoreSize
    void setHTableSpecificConf() {
      // flushSize can be specified in the htableDescriptor when the table is created
      if (this.htableDescriptor == null) return;
      long flushSize = this.htableDescriptor.getMemStoreFlushSize();
      // If not specified, read the configuration key hbase.hregion.memstore.flush.size.
      // Default: DEFAULT_MEMSTORE_FLUSH_SIZE = 1024 * 1024 * 128L, i.e. 128 MB
      if (flushSize <= 0) {
        flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
            TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE);
      }
      this.memstoreFlushSize = flushSize;
      // hbase.hregion.memstore.block.multiplier
      // Default: DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER = 4
      long mult = conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
          HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
      // Writes are blocked once the MemStore exceeds blockingMemStoreSize
      this.blockingMemStoreSize = this.memstoreFlushSize * mult;
    }
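With the defaults quoted in the comments above (a 128 MB flush size and a multiplier of 4), the blocking threshold works out to 512 MB. The following sketch only reproduces that arithmetic and the comparison; the class `MemStoreLimitSketch` and its method names are hypothetical and are not part of HBase.

```java
/** Toy version of the blocking-threshold arithmetic (hypothetical names). */
final class MemStoreLimitSketch {
    // Defaults as quoted in the code comments above
    static final long DEFAULT_FLUSH_SIZE = 1024 * 1024 * 128L; // 128 MB
    static final long DEFAULT_BLOCK_MULTIPLIER = 4;

    /** blockingMemStoreSize = flushSize * multiplier */
    static long blockingSize(long flushSize, long multiplier) {
        return flushSize * multiplier;
    }

    /** Same comparison as checkResources(): on-heap plus off-heap size against the limit. */
    static boolean overLimit(long heapSize, long offHeapSize, long blockingSize) {
        return heapSize + offHeapSize > blockingSize;
    }
}
```

For example, `MemStoreLimitSketch.blockingSize(DEFAULT_FLUSH_SIZE, DEFAULT_BLOCK_MULTIPLIER)` yields 536870912 bytes, and a region sitting exactly at that size is not yet over the limit because the comparison is strict.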
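Once the region's MemStore has been checked, the write itself begins. Skipping some other checks, we arrive at the doMiniBatchMutate method, which is the core of the operation:
Step 1: Acquire row locks for as many operations in the BatchOperation as possible. The result is a MiniBatchOperationInProgress object representing the operations currently being processed.
Step 2: Update the timestamps of all operations in the mini-batch so that they carry the latest time.
Step 3: Initialize or build the WAL edit objects.
Step 4: Append the WALEdits to the WAL and sync them (that is, write the WAL).
Step 5: Write to the MemStore.
Step 6: Complete the write operation.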
    /**
     * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)}
     * In here we also handle replay of edits on region recover.
     * (That is, WAL edits replayed during region recovery also pass through this method.)
     * @return Change in size brought about by applying <code>batchOp</code>
     * (This tag appears to be stale: the method as shown returns void.)
     */
    private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {
      boolean success = false;
      WALEdit walEdit = null;
      WriteEntry writeEntry = null;
      boolean locked = false;
      // We try to set up a batch in the range [batchOp.nextIndexToProcess,lastIndexExclusive)
      MiniBatchOperationInProgress<Mutation> miniBatchOp = null;
      /** Keep track of the locks we hold so we can release them in finally clause */
      List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.size());
      try {
        // STEP 1. Try to acquire as many locks as we can and build mini-batch of operations with
        // locked rows
        // (Returns a MiniBatchOperationInProgress representing the operations being processed.)
        miniBatchOp = batchOp.lockRowsAndBuildMiniBatch(acquiredRowLocks);
        // We've now grabbed as many mutations off the list as we can
        // Ensure we acquire at least one.
        if (miniBatchOp.getReadyToWriteCount() <= 0) {
          // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
          return;
        }
        lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount());
        locked = true;

        // STEP 2. Update mini batch of all operations in progress with LATEST_TIMESTAMP timestamp
        // We should record the timestamp only after we have acquired the rowLock,
        // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
        long now = EnvironmentEdgeManager.currentTime();
        batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks);

        // STEP 3. Build WAL edit
        List<Pair<NonceKey, WALEdit>> walEdits = batchOp.buildWALEdits(miniBatchOp);

        // STEP 4. Append the WALEdits to WAL and sync.
        for (Iterator<Pair<NonceKey, WALEdit>> it = walEdits.iterator(); it.hasNext();) {
          Pair<NonceKey, WALEdit> nonceKeyWALEditPair = it.next();
          walEdit = nonceKeyWALEditPair.getSecond();
          NonceKey nonceKey = nonceKeyWALEditPair.getFirst();
          if (walEdit != null && !walEdit.isEmpty()) {
            writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now,
                nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum());
          }
          // Complete mvcc for all but last writeEntry (for replay case)
          if (it.hasNext() && writeEntry != null) {
            mvcc.complete(writeEntry);
            writeEntry = null;
          }
        }

        // STEP 5. Write back to memStore
        // NOTE: writeEntry can be null here
        writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry);

        // STEP 6. Complete MiniBatchOperations: If required calls postBatchMutate() CP hook and
        // complete mvcc for last writeEntry
        batchOp.completeMiniBatchOperations(miniBatchOp, writeEntry);
        writeEntry = null;
        success = true;
      } finally {
        // Call complete rather than completeAndWait because we probably had error if walKey != null
        if (writeEntry != null) mvcc.complete(writeEntry);
        if (locked) {
          this.updatesLock.readLock().unlock();
        }
        releaseRowLocks(acquiredRowLocks);
        final int finalLastIndexExclusive =
            miniBatchOp != null ? miniBatchOp.getLastIndexExclusive() : batchOp.size();
        final boolean finalSuccess = success;
        batchOp.visitBatchOperations(true, finalLastIndexExclusive, (int i) -> {
          batchOp.retCodeDetails[i] =
              finalSuccess ? OperationStatus.SUCCESS : OperationStatus.FAILURE;
          return true;
        });
        batchOp.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, finalSuccess);
        batchOp.nextIndexToProcess = finalLastIndexExclusive;
      }
    }
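The ordering in the method above (row lock, timestamp, WAL edit, WAL append, MemStore write, release) can be illustrated with a small self-contained sketch. Everything in it is an assumption made for illustration: the class `WritePathSketch`, the list standing in for the WAL, the cell-count flush threshold (the code above measures size in bytes), and the synchronous flush are not taken from HBase.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/** Toy write path (hypothetical): lock row, log the edit, then update the in-memory buffer. */
final class WritePathSketch {
    private final ConcurrentHashMap<String, ReentrantLock> rowLocks = new ConcurrentHashMap<>();
    private final List<String> wal = new ArrayList<>();                  // stand-in for the WAL
    private TreeMap<String, String> memStore = new TreeMap<>();          // sorted in-memory buffer
    private final List<TreeMap<String, String>> flushedFiles = new ArrayList<>();
    private final int flushThreshold;                                    // assumption: counted in cells

    WritePathSketch(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    void put(String row, String column, String value) {
        ReentrantLock lock = rowLocks.computeIfAbsent(row, r -> new ReentrantLock());
        lock.lock();                                                     // step 1: row lock
        try {
            long now = System.currentTimeMillis();                       // step 2: timestamp taken after locking
            String edit = row + "/" + column + "@" + now + "=" + value;  // step 3: build the log edit
            synchronized (this) {
                wal.add(edit);                                           // step 4: log first
                memStore.put(row + "/" + column, value);                 // step 5: then the buffer
                if (memStore.size() >= flushThreshold) {                 // flush if needed
                    flushedFiles.add(memStore);
                    memStore = new TreeMap<>();
                }
            }
        } finally {
            lock.unlock();                                               // step 6: release the row lock
        }
    }

    synchronized int walSize() { return wal.size(); }
    synchronized int memStoreSize() { return memStore.size(); }
    synchronized int flushedFileCount() { return flushedFiles.size(); }
}
```

Writing the log entry before touching the in-memory buffer is the point of the ordering: if the process dies after the append, the edit can still be replayed from the log, which is the recovery case the Javadoc of doMiniBatchMutate mentions.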
This completes the HBase put and delete flow.
Original: https://www.cnblogs.com/lancelot-zj/p/10505320.html