首页 > 其他 > 详细

hbase2.0 源码阅读:Put/Delete流程

时间:2019-03-10 15:19:00      阅读:563      评论:0      收藏:0      [点我收藏+]

本文主要介绍hbase写入流程。hbase的写入主要有put、delete、append 3种操作。

hbase是一个分布式文件系统,底层依赖的是hdfs。delete时并不是和mysql一样立马进行物理删除,而是追加一个写入操作,操作类型为DELETE,和PUT的流程几乎完全相同。

先了解hbase写入的一个大致流程

Client API -> RPC -> server IPC -> write WAL -> write memStore -> flush(ifNeed)

- Table类定义了 Hbase Client 拥有的API,HTable是Table的实现类,看代码可从Htable看起。Client生成一个PUT对象,然后调用HTable的put方法

public void put(final Put put) throws IOException {

//请求RPC前进行验证,验证是否有需要提交的列,及提交内容是否超过设定大小(默认大小为10485760(10M))。 验证失败抛出异常。 validatePut(put); ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(), this.rpcControllerFactory.newController(), put.getPriority()) { @Override protected Void rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), put); //使用probubuf协议序列化,并提交RPC请求 doMutate(request); return null; } }; rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable, this.operationTimeoutMs); }  

  客户端在提交RPC请求之前会进行一次校验,校验内容为1).是否有列, put.isEmpty()。 2). put对象大小是否超过设定值(默认最大值为10485760(10M),客户端配置参数为MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize"),校验不通过时抛出异常 throw new IllegalArgumentException("No columns to insert" )  或者 throw new IllegalArgumentException("KeyValue size too large")

  // validate for well-formedness
  public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {

//校验内容1).是否有提交列 if (put.isEmpty()) { throw new IllegalArgumentException("No columns to insert"); }
//校验内容2).是否超过客户端默认配置10485760,超过抛出异常 if (maxKeyValueSize > 0) { for (List<Cell> list : put.getFamilyCellMap().values()) { for (Cell cell : list) { if (KeyValueUtil.length(cell) > maxKeyValueSize) { throw new IllegalArgumentException("KeyValue size too large"); } } } } }

  客户端的RPC请求,是由 ClientServiceCallable.doMutate 提交,RPC通信使用的是谷歌的protobuf协议。服务器端IPC实现类为RSRpcServices类,由mutate方法实现。

在mutate方法中,会将protobuf流反序列化为PUT、APPEND、INCREMENT、DELETE对象。在进行PUT对象处理时会检查相关表是否有协处理器,如果没有即调用HRegion的PUT方法进行处理。 RSRpcServices.mutate部分实现代码如下:

case PUT:

//反序列化成PUT对象 Put put = ProtobufUtil.toPut(mutation, cellScanner);
//服务器端也有校验提交内容大小的限制。默认值同客户端,即10485760(10M) checkCellSizeLimit(region, put); // Throws an exception when violated spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); quota.addMutation(put);
//检查是否有协处理器 if (request.hasCondition()) { Condition condition = request.getCondition(); byte[] row = condition.getRow().toByteArray(); byte[] family = condition.getFamily().toByteArray(); byte[] qualifier = condition.getQualifier().toByteArray(); CompareOperator compareOp = CompareOperator.valueOf(condition.getCompareType().name()); ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); TimeRange timeRange = condition.hasTimeRange() ? ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime(); if (region.getCoprocessorHost() != null) { processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier, compareOp, comparator, put); } if (processed == null) { boolean result = region.checkAndMutate(row, family, qualifier, compareOp, comparator, timeRange, put); if (region.getCoprocessorHost() != null) { result = region.getCoprocessorHost().postCheckAndPut(row, family, qualifier, compareOp, comparator, put, result); } processed = result; } } else {
//如果没有,提交给HRegion.put()方法处理 region.put(put); processed = Boolean.TRUE; } break;

  HRegion在进行put操作前会检查一次region中的memStorm是否超过上限(checkResources()),如果超过了会进行一次flush

void checkResources() throws RegionTooBusyException {
    // If catalog region, do not impose resource constraints or block updates.
    //如果操作的是Meta表,则不处理
    if (this.getRegionInfo().isMetaRegion()) return;
    //当Region的MemSize大于blockingMemStoreSize时,进行一次flush(requestFlush0(FlushLifeCycleTracker.DUMMY);),本次flush是阻塞的,其它写入请求先暂停
    MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
    //region size = onHead size + offHead size (不是dataSize)
    // blockingMemStoreSize =flushSize(默认值128M) * mult (默认值4)
if (mss.getHeapSize() + mss.getOffHeapSize() > this.blockingMemStoreSize) {
    blockedRequestsCount.increment();
    requestFlush();
    // Don‘t print current limit because it will vary too much. The message is used as a key
    // over in RetriesExhaustedWithDetailsException processing.
    throw new RegionTooBusyException("Over memstore limit=" +
      org.apache.hadoop.hbase.procedure2.util.StringUtils.humanSize(this.blockingMemStoreSize) +
      ", regionName=" +
        (this.getRegionInfo() == null? "unknown": this.getRegionInfo().getEncodedName()) +
        ", server=" + (this.getRegionServerServices() == null? "unknown":
            this.getRegionServerServices().getServerName()));
  }
}

// blockingMemStoreSize 值初始化
void setHTableSpecificConf() {
  //flushSize可以在建表时在htableDescriptor制定
  if (this.htableDescriptor == null) return;
  long flushSize = this.htableDescriptor.getMemStoreFlushSize();
  //未指定时读取配配hbase.hregion.memstore.flush.size.
  // 默认值DEFAULT_MEMSTORE_FLUSH_SIZE = 1024 * 1024 * 128L 即128M
  if (flushSize <= 0) {
    flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
        TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE);
  }
  this.memstoreFlushSize = flushSize;
  //hbase.hregion.memstore.block.multiplier
  // 默认值 DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER = 4
  long mult = conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
      HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
  //如果超过blockingMemStoreSize则进行阻塞处理
  this.blockingMemStoreSize = this.memstoreFlushSize * mult;
}

  检查完region 的memStore 之后,即进入写入流程。忽略掉其它一些检查,进入doMiniBatchMutate方法(核心操作)

步骤1:  对BatchOperation对象上锁,返回的是一个表示正在处理中的对象MiniBatchOperationInProgress
步骤2:更新所有操作对象的时间戳,确保是最新的。
步骤3:  初始化或构造 WAL edit对象
步骤4:将WALEdits对象提交并持久化(即写WAL)
步骤5:写memStore
步骤6:完成写入操作



 /**
   * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)}
   * In here we also handle replay of edits on region recover.
   * 在这里还处理 region恢复时重放写入操作(WAL操作的回放)
   * @return Change in size brought about by applying <code>batchOp</code>
   * 返回处理的数量
   */
  private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {
    boolean success = false;
    WALEdit walEdit = null;
    WriteEntry writeEntry = null;
    boolean locked = false;
    // We try to set up a batch in the range [batchOp.nextIndexToProcess,lastIndexExclusive)
    MiniBatchOperationInProgress<Mutation> miniBatchOp = null;
    /** Keep track of the locks we hold so we can release them in finally clause */
    List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.size());
    try {
      // STEP 1. Try to acquire as many locks as we can and build mini-batch of operations with
      // locked rows
      // 对BatchOperation对象上锁,返回的是一个表示正在处理中的对象MiniBatchOperationInProgress
      miniBatchOp = batchOp.lockRowsAndBuildMiniBatch(acquiredRowLocks);

      // We‘ve now grabbed as many mutations off the list as we can
      // Ensure we acquire at least one.
      if (miniBatchOp.getReadyToWriteCount() <= 0) {
        // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
        return;
      }

      lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount());
      locked = true;

      // STEP 2. Update mini batch of all operations in progress with  LATEST_TIMESTAMP timestamp
      // We should record the timestamp only after we have acquired the rowLock,
      // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
      // 步骤2:更新所有操作对象的时间戳,确保是最新的。
      long now = EnvironmentEdgeManager.currentTime();
      batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks);

      // STEP 3. Build WAL edit
      // 步骤3:初始化或构造 WAL edit对象
      List<Pair<NonceKey, WALEdit>> walEdits = batchOp.buildWALEdits(miniBatchOp);

      // STEP 4. Append the WALEdits to WAL and sync.
      //步骤4: 将WALEdits对象提交并持久化(即写WAL)
      for(Iterator<Pair<NonceKey, WALEdit>> it = walEdits.iterator(); it.hasNext();) {
        Pair<NonceKey, WALEdit> nonceKeyWALEditPair = it.next();
        walEdit = nonceKeyWALEditPair.getSecond();
        NonceKey nonceKey = nonceKeyWALEditPair.getFirst();

        if (walEdit != null && !walEdit.isEmpty()) {
          writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now,
              nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum());
        }

        // Complete mvcc for all but last writeEntry (for replay case)
        if (it.hasNext() && writeEntry != null) {
          mvcc.complete(writeEntry);
          writeEntry = null;
        }
      }

      // STEP 5. Write back to memStore
      // 步骤5: 写memStore
      // NOTE: writeEntry can be null here
      writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry);

      // STEP 6. Complete MiniBatchOperations: If required calls postBatchMutate() CP hook and
      // complete mvcc for last writeEntry
      batchOp.completeMiniBatchOperations(miniBatchOp, writeEntry);
      writeEntry = null;
      success = true;
    } finally {
      // Call complete rather than completeAndWait because we probably had error if walKey != null
      if (writeEntry != null) mvcc.complete(writeEntry);

      if (locked) {
        this.updatesLock.readLock().unlock();
      }
      releaseRowLocks(acquiredRowLocks);

      final int finalLastIndexExclusive =
          miniBatchOp != null ? miniBatchOp.getLastIndexExclusive() : batchOp.size();
      final boolean finalSuccess = success;
      batchOp.visitBatchOperations(true, finalLastIndexExclusive, (int i) -> {
        batchOp.retCodeDetails[i] =
            finalSuccess ? OperationStatus.SUCCESS : OperationStatus.FAILURE;
        return true;
      });

      batchOp.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, finalSuccess);

      batchOp.nextIndexToProcess = finalLastIndexExclusive;
    }
  }

 


至此HBASE的put, delete流程即算完毕。

技术分享图片

 

hbase2.0 源码阅读:Put/Delete流程

原文:https://www.cnblogs.com/lancelot-zj/p/10505320.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!