Region Split请求是在Region MemStore Flush之后被触发的:
boolean shouldCompact = region.flushcache(); // We just want to check the size boolean shouldSplit = region.checkSplit() != null; if (shouldSplit) { this.server.compactSplitThread.requestSplit(region); } else if (shouldCompact) { server.compactSplitThread.requestCompaction(region, getName()); } server.getMetrics().addFlush(region.getRecentFlushInfo());
Region Flush操作完成之后,会进行checkSplit的判断,如果返回值不为null(返回值为该Region的SplitPoint),表示该Region达到了进行Split的条件,发起相应的Split请求。
checkSplit方法定义如下:
/** * Return the splitpoint. null indicates the region isn‘t splittable. If the * splitpoint isn‘t explicitly specified, it will go over the stores to find * the best splitpoint. Currently the criteria of best splitpoint is based * on the size of the store. */ public byte[] checkSplit() { // Can‘t split ROOT/META if (this.regionInfo.isMetaTable()) { if (shouldForceSplit()) { LOG.warn("Cannot split root/meta regions in HBase 0.20 and above"); } return null; } if (!splitPolicy.shouldSplit()) { return null; } byte[] ret = splitPolicy.getSplitPoint(); if (ret != null) { try { checkRow(ret, "calculated split"); } catch (IOException e) { LOG.error("Ignoring invalid split", e); return null; } } return ret; }
由上述代码可以看出,如果当前Region属于目录信息表(ROOT/META),则是不允许进行Split操作的,否则根据当前Region的RegionSplitPolicy实例判断是否需要进行Split,流程包含两步:
(1)该Region是否允许进行Split;
(2)该Region在允许进行Split的条件下,是否可以计算出相应的SplitPoint。
RegionSplitPolicy shouldSplit
如果没有在定义表结构时进行特殊的指定,RegionSplitPolicy默认为org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy的实例,配置项为hbase.regionserver.region.split.policy。
方法代码如下:
@Override protected boolean shouldSplit() { if (region.shouldForceSplit()) { return true; } boolean foundABigStore = false; // Get count of regions that have the same common table as this.region int tableRegionsCount = getCountOfCommonTableRegions(); // Get size to check long sizeToCheck = getSizeToCheck(tableRegionsCount); for (Store store : region.getStores().values()) { // If any of the stores is unable to split (eg they contain // reference files) // then don‘t split if ((!store.canSplit())) { return false; } // Mark if any store is big enough long size = store.getSize(); if (size > sizeToCheck) { LOG.debug("ShouldSplit because " + store.getColumnFamilyName() + " size=" + size + ", sizeToCheck=" + sizeToCheck + ", regionsWithCommonTable=" + tableRegionsCount); foundABigStore = true; break; } } return foundABigStore; }
执行流程:
(1)如果当前Region被请求执行ForceSplit,则直接返回true;
(2)计算当前Region中的各个Store大小的上限值;
(3)循环判断当前Region中的某一Store大小是否超过上限值,如果存在这样的Store,则提前结束循环,返回true即可。
其中,进行大小判断的Region Store必须是可Split的,即该Store中不包含Reference类型的文件,如果某一Store中出现了Reference类型的文件,则表示该Region已经被Split过,不能再进行Split,此时,直接返回false即可。
重点讲述一下Region中各个Store大小的上限值的计算方法:
(1)假设当前Region所属的表为t,计算该Region所处于的RegionServer上包含表t的Online Region数目,并将结果保存至变量tableRegionsCount中;
// Get count of regions that have the same common table as this.region int tableRegionsCount = getCountOfCommonTableRegions();
getCountOfCommonTableRegions方法代码如下:
/** * @return Count of regions on this server that share the table this.region * belongs to */ private int getCountOfCommonTableRegions() { RegionServerServices rss = this.region.getRegionServerServices(); // Can be null in tests if (rss == null) { return 0; } byte[] tablename = this.region.getTableDesc().getName(); int tableRegionsCount = 0; try { List<HRegion> hri = rss.getOnlineRegions(tablename); tableRegionsCount = hri == null || hri.isEmpty() ? 0 : hri.size(); } catch (IOException e) { LOG.debug("Failed getOnlineRegions " + Bytes.toString(tablename), e); } return tableRegionsCount; }
首先获取该Region所处于的RegionServer实例:
RegionServerServices rss = this.region.getRegionServerServices();
然后获取该Region所对应的表的名称:
byte[] tablename = this.region.getTableDesc().getName();
最后获取表tablename在rss上的Online Region的数目:
List<HRegion> hri = rss.getOnlineRegions(tablename);
(2)根据tableRegionsCount计算上限值:
// Get size to check long sizeToCheck = getSizeToCheck(tableRegionsCount);
getSizeToCheck方法代码如下:
/** * @return Region max size or * <code>count of regions squared * flushsize, which ever is * smaller; guard against there being zero regions on this server. */ long getSizeToCheck(final int tableRegionsCount) { return tableRegionsCount == 0 ? getDesiredMaxFileSize() : Math.min( getDesiredMaxFileSize(), this.flushSize * (tableRegionsCount * tableRegionsCount)); }
计算过程根据tableRegionsCount的值分为两种情况:
(1)tableRegionsCount值为0时(可能发生么?),直接通过方法getDesiredMaxFileSize返回结果即可(getDesiredMaxFileSize的返回值可以在创建表时指定,如果创建表时没有特殊指定,则由配置项hbase.hregion.max.filesize决定,默认值为10737418240即10G);
(2)tableRegionsCount值不为0时,结果为getDesiredMaxFileSize()与this.flushSize * (tableRegionsCount * tableRegionsCount)两者之间的最小值,其中flushSize在创建表时指定,如果创建表时没有特殊指定,则由配置项hbase.hregion.memstore.flush.size决定,默认值为134217728即128M。
RegionSplitPolicy getSplitPoint
进行到这一步,表示该Region是允许进行Split的,下一步应该计算该Region的SplitPoint。
方法代码如下:
/** * @return the key at which the region should be split, or null if it cannot * be split. This will only be called if shouldSplit previously * returned true. */ protected byte[] getSplitPoint() { byte[] explicitSplitPoint = this.region.getExplicitSplitPoint(); if (explicitSplitPoint != null) { return explicitSplitPoint; } Map<byte[], Store> stores = region.getStores(); byte[] splitPointFromLargestStore = null; long largestStoreSize = 0; for (Store s : stores.values()) { byte[] splitPoint = s.getSplitPoint(); long storeSize = s.getSize(); if (splitPoint != null && largestStoreSize < storeSize) { splitPointFromLargestStore = splitPoint; largestStoreSize = storeSize; } } return splitPointFromLargestStore; }
执行流程如下:
(1)如果请求ForceSplit时显示指定了SplitPoint,则直接将该值返回即可;
(2)循环处理该Region的Store,分别获取该Store的大小和SplitPoint,最后Region的SplitPoint为最大的那个Store的SplitPoint。
接下来的问题是如何计算Store的SplitPoint。
Store getSplitPoint
/** * Determines if Store should be split * * @return byte[] if store should be split, null otherwise. */ public byte[] getSplitPoint() { this.lock.readLock().lock(); try { // sanity checks if (this.storefiles.isEmpty()) { return null; } // Should already be enforced by the split policy! assert !this.region.getRegionInfo().isMetaRegion(); // Not splitable if we find a reference store file present in the // store. long maxSize = 0L; StoreFile largestSf = null; for (StoreFile sf : storefiles) { if (sf.isReference()) { // Should already be enforced since we return false in this // case assert false : "getSplitPoint() called on a region that can‘t split!"; return null; } StoreFile.Reader r = sf.getReader(); if (r == null) { LOG.warn("Storefile " + sf + " Reader is null"); continue; } long size = r.length(); if (size > maxSize) { // This is the largest one so far maxSize = size; largestSf = sf; } } StoreFile.Reader r = largestSf.getReader(); if (r == null) { LOG.warn("Storefile " + largestSf + " Reader is null"); return null; } // Get first, last, and mid keys. Midkey is the key that starts // block // in middle of hfile. Has column and timestamp. Need to return just // the row we want to split on as midkey. byte[] midkey = r.midkey(); if (midkey != null) { KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length); byte[] fk = r.getFirstKey(); KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length); byte[] lk = r.getLastKey(); KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length); // if the midkey is the same as the first or last keys, then we // cannot // (ever) split this region. if (this.comparator.compareRows(mk, firstKey) == 0 || this.comparator.compareRows(mk, lastKey) == 0) { if (LOG.isDebugEnabled()) { LOG.debug("cannot split because midkey is the same as first or " + "last row"); } return null; } return mk.getRow(); } } catch (IOException e) { LOG.warn("Failed getting store size for " + this, e); } finally { this.lock.readLock().unlock(); } return null; }
执行流程
(1)选择Store StoreFiles中的最大的那个StoreFile largestSf;
long maxSize = 0L; StoreFile largestSf = null; for (StoreFile sf : storefiles) { if (sf.isReference()) { // Should already be enforced since we return false in this // case assert false : "getSplitPoint() called on a region that can‘t split!"; return null; } StoreFile.Reader r = sf.getReader(); if (r == null) { LOG.warn("Storefile " + sf + " Reader is null"); continue; } long size = r.length(); if (size > maxSize) { // This is the largest one so far maxSize = size; largestSf = sf; } }
(2)获取largestSf的MidKey、FirstKey、LastKey,如果MidKey与FirstKey相等或者MidKey与LastKey相等,则返回null(为什么?);否则返回MidKey。
// Get first, last, and mid keys. Midkey is the key that starts // block // in middle of hfile. Has column and timestamp. Need to return just // the row we want to split on as midkey. byte[] midkey = r.midkey(); if (midkey != null) { KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length); byte[] fk = r.getFirstKey(); KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length); byte[] lk = r.getLastKey(); KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length); // if the midkey is the same as the first or last keys, then we // cannot // (ever) split this region. if (this.comparator.compareRows(mk, firstKey) == 0 || this.comparator.compareRows(mk, lastKey) == 0) { if (LOG.isDebugEnabled()) { LOG.debug("cannot split because midkey is the same as first or " + "last row"); } return null; } return mk.getRow(); }
StoreFile是由多个Block组成的(这里的Block不同于HDFS的Block),每个Block的第一个RowKey会被存储到StoreFile中的特殊位置中,因此,这里的MidKey、FirstKey、LastKey指的就是StoreFile中MidBlock、FirstBlock、LastBlock各自的第一个RowKey。
Region Split是以Row作为最小切分单位的,即同一行的数据会完整的出现在某一Region中,如果MidKey与FirstKey相等或者MidKey与LastKey相等,则表示如果进行切分则会出现某Region中的RowKey是完全一样的,即该Region中仅包含一个行的数据,这种情况出现中HBase中是不合理的,因此不允许MidKey与FirstKey相等或者MidKey与LastKey相等时进行Split。
综上所述,如果某一Region满足Split的条件且可以计算出SplitPoint,则可以发起Split请求:
this.server.compactSplitThread.requestSplit(region);
原文:http://www.cnblogs.com/yurunmiao/p/3530728.html