HLog的作用:
HBase写入数据时会同时写入到WAL和Memstore中,其中Memstore是位于内存中的store,类似于写缓存,当Memstore的大小超过限定的阈值时会触发flush行为,将内存中的数据刷写到磁盘做持久化。其中的wal也称为hlog,作用类似于mysql中的binlog,记录了客户端的每次update动作,只有当wal写入成功之后,本次写事务才会返回。
我们知道内存中的数据是易失的,当regionserver宕机时,HMaster会切割按region切割宕机regionserver上的hlog,并将它分发到region被迁移到的新regionserver上以恢复该在memstore中还未来得及刷盘的数据。相应地如果hlog里面的记录已经完成了flush,则该hlog会被regionServer移动到.oldlog目录下,由HMaster上的定时线程LogCleaner周期性地扫描该目录,删除掉不再使用的hlog。
此外hlog还有一个意义就是用于hbase的replication,hbase的replication是通过将主集群的hlog推送到备集群,然后在备集群上reply来实现的。这个推送过程是异步完成的,因此会存在.oldlog目录下的hlog还未被replication推送完成的情况,此时HMaster会将这些未推送完成的hlog记录在zk上。方便在清除.oldlog目录时跳过有zk指向的hlog文件。
WALFactory类:
先从WALFactory开始分析,HRegionServer中管理着一个WALFactory变量,定义的格式如下:
protected volatile WALFactory walFactory
下面分析一下walFactory在regionserver中的应用姿势,首先在RegionServer中维护着一个与Master之间的心跳逻辑,这段代码在RegionServer的主循环run()里,如下所示:
while(keepLooping()) { RegionServerStartupResponse w = repartForDuty(); if (w == null) { this.sleeper.sleep(); } else { handleReportForDutyResponse(w); break; } }reportForDuty是RegionServer向master上报注册信息,master会回应一个key/value格式的信息给regionserver,以标识regionserver本次register成功。
regionserver收到master的回应消息后,开始调用handleReportForDutyResponse,这个函数的主要逻辑列在下面:
try{ 根据master的返回值处理hostname逻辑; ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath()); this.walFactory = setupWALAndReplication(); //启动walFactory 初始化metricsRegionServer; startServiceThreads(); //启动各路服务线程; startHeapMemoryManager(); //启动内存管理了; synchronized (online) { online.set(true); online.notifyAll(); } }
setupWALAndReplication创建并返回了WALFactory类,setupWALAndRepliaction中的几个主要步骤摘录如下:
private WALFactory setupWALAndReplication() throws IOException { final Path oldLogDir = new Paht(……..) //获取old WAL日志的路径 final String logName=DefaultWALProvider.getWALDirectoryName(this.serverName.toString()); Path lodger = new Path(rootDir, logName); //如果设置了replication相关,初始化replication manager createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir); listeners添加,添加了WALActionsListener; final List<WALActionsListener> listeners = new ArrayList<>(); return new WALFactory(conf, listeners, serverName.toString); }WALFactory的构造函数中除了设置超时时间等之外,还初始化了一个DefaultWALProvider类型的变量,几乎所有与wal文件操作相关的方法都定义在这个接口类中。它里面分别定义了Reader&Writer,用于对WAL文件的读写。WALFactory中提供了两个接口createReader&createWriter,实际也是初始化了DefaultWALProvider中的这两个类。Reader&Writer实现了对hlog文件的读写。
DefaultWALProvider中还包括了一个FSHLog类型的成员变量,FSHLog管理了将WAL持久化的线程模型。下面详细分析FSHLog的实现。
FSHLog与HLog写入模型:首先从hbase的写入路径入手分析,前面分析过客户端的put操作在服务端最终调用的是doMiniBatchMutation。数据在被成功写入到memstore之后,会收集此次写入动作的table name、region info等信息并构造成一个HlogKey结构的对象记为walkey,并将当前写入的数据作为walEdit,然后将walkey和walEdit共同组装成一个entry之后将之append到内存中一个ringbuffer类型的缓冲区中。返回值txid用于标识本次写事务在缓冲区的写入序号。
if (walEdit.size() > 0) { walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), now, m.getClusterIds(), currentNonceGroup, currentNonce); txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, getSequenceId(), true, null); walEdit = new WALEdit(isInReplay); walKey = null; }待所有的锁释放之后,再将buffer中的数据刷写到磁盘,最后将版本号向前推进,提交本次写事务,其中刷写磁盘的代码如下:
if(taxied != 0) { syncOrDefer(txid, durabiliby); }
syncOrDefer会根据客户端设置的持久化级别选择是否将日志数据落盘,其中client可以选择的wal持久化等级划分为如下四个等级:
SKIP_WAL:数据只写memstore,不写Hlog,此时写入性能最高,但是数据有丢失风险;
ASYNC_WAL:异步将数据写入HLog文件中;
SYNC_WAL:同步将数据写入日志文件,但是此时数据只是被写入文件系统缓存,并没有真正落盘;
FSYNC_WAL:同步将数据写入日志文件并强制落盘,可以保证数据不丢失,但是性能最差;
客户端可以通过如:put.setDurability(Durability.SYNC_WAL)设置WAL持久化级别,不设置时默认是SYNC_WAL。syncOrDefer通过调用this.wal.sync(txid)将数据落盘,
经过前面的分析,可以看到hlog的写入最终是调用了FSHLog向外暴露的append()&sync()两个方法来实现的,可见FSHLog中实现了hlog的写线程模型。因此想要分析写线程模型的实现,分析的入口就在上面两个方法,在具体分析方法细节前,先看看构造FSHLog时都初始化了哪些变量:
this.appendExecutor = Executors. newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append")); final int preallocatedEventCount = this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); this.disruptor = new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount, this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy()); this.disruptor.getRingBuffer().next(); this.ringBufferEventHandler = new RingBufferEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count", 5), maxHandlersCount); this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler()); this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler}); this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount); this.disruptor.start();
其中最关键的就是变量disruptor,它是一个Disruptor<RingBufferTruck>类型的成员变量,Distuptor是LMAX开发的一个高性能无锁队列,本质还是个生产者-消费者模型,它采用一个环形数组结构,称为RingBuffer来复用内存,同时Buffer上的读写序列号经过优化可以避免伪共享,多线程并发访问该序列号时通过CPU级别的CAS自旋来获得,以此实现了lock free。这样只要buffer中有事件就会被递交到消费者线程池去处理。
回头看disruptor构造函数中的各参数的含义,首先第一个参数指定了通过Disruptor交换的事件类型,这里定义为RingBufferTruck类型,参数EVENT_FACTORY指代事件工厂,用于Disruptor通过该工厂在RingBuffer中预创建Event实例。参数preallocatedEventCount指定了ringBuffer的大小。ProducerType是数据生产方式,客户端写入数据时,调用this.wal.append()方法实际上就是以生产者的身份将数据写入到RingBuffer中。append关键代码如下:
public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key, final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, final List<Cell> memstoreCells) throws IOException { FSWALEntry entry = null; long sequence = this.disruptor.getRingBuffer().next(); try { RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells); //用key和edits构造一个对象entry truck.loadPayload(entry, scope.detach()); //将上面构造的对象包装为RingBufferTruck事件并添加到Ring Buffer } finally { this.disruptor.getRingBuffer().publish(sequence); } return sequence; }消费者线程由appendExecutor指定,这里用到的是newSingleThreadExecutor定义的单线程线程。BlockingWaitStrategy()指定了consumer的等待策略。appendExecutor并不处理具体的event,而是从Ringbuffer中接收之后将它转交给后端的ringBufferEventHandler来处理,因为appendExecutor中不包含事件处理逻辑,所以非常轻量,只需一个线程就可以处理生产端高并发的请求。
真正的事件处理在ringBufferEventHandler中完成,如下面定义,hbase中默认是5个handler:
this.ringBufferEventHandler = new
RingBufferEventHandler(conf.getInt(“hbase.regionserver.hlog.syncer.count”,5), maxHandersCount);
从RingBufferEventHandler起分析event的处理逻辑,每个RingBufferEventHandler中定义了两组主要的线程数组,如下所示:
class RingBufferEventHandler implements EventHandler<RingBufferTruck>, LifecycleAware { private final SyncRunner [] syncRunners; private final SyncFuture [] syncFutures; //其它变量 }
每接收到RingBufferTruck事件,RingBufferEventHandler便会调用onEvent对该事件进行处理,主要的处理逻辑代码列在了下面:
public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch) throws Exception { try { if (truck.hasSyncFuturePayload()) { this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload(); if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true; } else if (truck.hasFSWALEntryPayload()) { TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload()); try { append(truck.unloadFSWALEntryPayload()); } catch (Exception e) { 。。。。 } } else { return; } int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length; try { this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount); } catch (Exception e) { cleanupOutstandingSyncsOnException(sequence, e); throw e; } attainSafePoint(sequence); this.syncFuturesCount = 0; } catch (Throwable t) { LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t); } }
RingBufferTruck中封装可能封装了两种不同类型的对象,分别是WALEntry或者SyncFuture,消费线程的执行方法onEvent()中对上述对象的处理也不同,如果是WALEntry,则调用append方法,使用writer将输入的WALEntry经protobuf序列化后写入hadoop文件。如果是SyncFuture,则把该对象放入RingBufferEventHandler自身维护的SyncFutures[]数组中。
然后,onEvent从syncRunners[]中取出一个线程,并调用它的offer方法,offer中将该EventHandler中的所有syncFuture添加到SyncRunner自身维护的阻塞队列中,在syncRunner线程的run方法里等到写满一批syncFuture之后,会调用writer.sync()将数据落盘,待数据成功刷到磁盘后,释放syncFuture,并将其中的scope置位。之所以如此设计,是因为对比客户端的屡次append操作,刷盘是相对比较耗时的,以此采用写文件缓存并结合异步刷盘的方式平衡对client端API的友好和客户端写吞吐。run方法简化后的主干代码如下:
public void run() { long currentSequence; while (!isInterrupted()) { int syncCount = 0; SyncFuture takeSyncFuture; try { while (true) { takeSyncFuture = this.syncFutures.take(); currentSequence = this.sequence; long syncFutureSequence = takeSyncFuture.getRingBufferSequence(); long currentHighestSyncedSequence = highestSyncedSequence.get(); if (currentSequence < currentHighestSyncedSequence) { syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null); continue; } break; } try { writer.sync(); currentSequence = updateHighestSyncedSequence(currentSequence); } catch (IOException e) { 。。。。。 } catch (Exception e) { 。。。。。 } finally { takeSyncFuture.setSpan(scope.detach()); syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, t); syncCount += releaseSyncFutures(currentSequence, t); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Throwable t) { LOG.warn("UNEXPECTED, continuing", t); } } } }
需要注意的是writer.sync()的预处理,其取出当前已处理的最大sequence与本次待处理syncFuture中的sequence相对比,sequence是按照事务提交的顺序递增赋值的,事务append到缓存的顺序也是与sequence的赠序一致,如果当前sequence小于最大已提交sequence,则表明hlog中已写入相应记录,因此调用releaseSyncFuture()释放syncFuture。
还有一个问题,syncFuture中scope的置位是什么时候来查的呢,答案就是FSHLog向外暴露的this.wal.sync(txid)方法,客户端写操作调用sync后会阻塞等待数据刷盘成功,sync中调用syncFuture的get后阻塞在文件系统的同步操作上,当文件系统将数据落盘完成之后,get方法返回,并将syncFuture中置位的scope返回给客户端。客户端工作线程被唤醒,返回继续写入memstore,完成一次写操作。private Span blockOnSync(final SyncFuture syncFuture) throws IOException { try { syncFuture.get(); return syncFuture.getSpan(); } catch (InterruptedException ie) { 。。。。 } catch (ExecutionException e) { 。。。。 } }
LogRoller:
LogRoller是个定期执行的线程。每个RegionServer中都有一个LogRoller线程,线程执行的周期由hbase.regionserver.logroll.period给出,默认时间是1hr。也就是说每过一个小时会产生一个新的hlog文件,hlog的文件名由regionserver名称+hlog形成时的时间戳构成。
LogRoller的run方法中的主要流程如下面列出:
rollLock.lock(); try { for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) { final WAL wal = entry.getKey(); final byte [][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue()); walNeedsRoll.put(wal, Boolean.FALSE); if (regionsToFlush != null) { for (byte [] r : regionsToFlush) scheduleFlush(r); } } } finally { try { rollLog.set(false); } finally { rollLock.unlock(); } }在第四行中调用了HLog的rollWriter,rollWriter中会打开一个新的hdfs文件供log写入,并将old的hlog文件关闭。
原文:http://blog.csdn.net/bryce123phy/article/details/56025735