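Flume has three main components: Source, Channel, and Sink. Together they form the data flow, or pipeline, that an Event follows through Flume. Their roles are clear from Flume's own introduction: "When a Flume source receives an event, it stores it into one or more channels. The channel is a passive store that keeps the event until it's consumed by a Flume sink."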
public interface Source extends LifecycleAware, NamedComponent {

  /**
   * Specifies which channel processor will handle this source's events.
   *
   * @param channelProcessor
   */
  public void setChannelProcessor(ChannelProcessor channelProcessor);

  /**
   * Returns the channel processor that will handle this source's events.
   */
  public ChannelProcessor getChannelProcessor();

}
public interface Sink extends LifecycleAware, NamedComponent {

  public void setChannel(Channel channel);

  public Channel getChannel();

  public Status process() throws EventDeliveryException;

  public static enum Status {
    READY, BACKOFF
  }

}
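The Sink interface also has set and get methods related to Channel, but they refer to a Channel directly rather than to a ChannelProcessor as in Source. ChannelProcessor sits between a Source and its Channels and acts as an Event dispatcher: it implements the Source's mapping, replicating, and optional-channel mechanisms (this needs a closer look at the code). A Sink refers to a Channel directly because every Sink consumes data from exactly one Channel and sends it on to the destination.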
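To make the dispatcher role concrete, here is a minimal sketch of replicating one event to several channels. It does not use Flume's classes: `SketchChannel` and `SketchProcessor` are hypothetical stand-ins, events are plain strings, and the rule "a required channel must accept the event, an optional one may fail" is an assumption about the behaviour described above, not a copy of the real `ChannelProcessor`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins, not Flume's real classes.
class DispatchSketch {

    /** Stand-in channel: a named list that can be told to reject writes. */
    static class SketchChannel {
        final String name;
        final boolean failing;
        final List<String> events = new ArrayList<>();

        SketchChannel(String name, boolean failing) {
            this.name = name;
            this.failing = failing;
        }

        void put(String event) {
            if (failing) {
                throw new IllegalStateException("channel " + name + " rejected the event");
            }
            events.add(event);
        }
    }

    /** Stand-in dispatcher: copies each event to every configured channel. */
    static class SketchProcessor {
        private final List<SketchChannel> required;
        private final List<SketchChannel> optional;

        SketchProcessor(List<SketchChannel> required, List<SketchChannel> optional) {
            this.required = required;
            this.optional = optional;
        }

        void processEvent(String event) {
            for (SketchChannel ch : required) {
                ch.put(event); // a failure here propagates to the caller (the source)
            }
            for (SketchChannel ch : optional) {
                try {
                    ch.put(event);
                } catch (IllegalStateException e) {
                    // best effort: a failing optional channel is ignored
                }
            }
        }
    }

    public static void main(String[] args) {
        SketchChannel c1 = new SketchChannel("c1", false);
        SketchChannel c2 = new SketchChannel("c2", false);
        SketchChannel c3 = new SketchChannel("c3", true); // always fails
        SketchProcessor processor =
            new SketchProcessor(Arrays.asList(c1, c2), Arrays.asList(c3));
        processor.processEvent("hello");
        System.out.println(c1.events + " " + c2.events + " " + c3.events);
    }
}
```

The sink side needs no such dispatcher because, as noted above, a sink reads from a single channel.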
public class ChannelProcessor implements Configurable {

  private final ChannelSelector selector;
  private final InterceptorChain interceptorChain;

  public ChannelProcessor(ChannelSelector selector) {
    this.selector = selector;
    this.interceptorChain = new InterceptorChain();
  }

  public void initialize() {
    interceptorChain.initialize();
  }

  public void close() {
    interceptorChain.close();
  }

  /**
   * The Context of the associated Source is passed.
   * @param context
   */
  @Override
  public void configure(Context context) {
    configureInterceptors(context);
  }

  public ChannelSelector getSelector() {
    return selector;
  }

  public void processEventBatch(List<Event> events) {
    ...
  }

  public void processEvent(Event event) {
    ...
  }
}
public interface Channel extends LifecycleAware, NamedComponent {

  public void put(Event event) throws ChannelException;

  public Event take() throws ChannelException;

  public Transaction getTransaction();

}
A channel connects a Source to a Sink. The source acts as producer while the sink acts as a consumer of events. The channel itself is the buffer between the two.
A channel exposes a Transaction interface that can be used by its clients to ensure atomic put and take semantics. This is necessary to guarantee single hop reliability between agents. For instance, a source will successfully produce an event if and only if that event can be committed to the source's associated channel. Similarly, a sink will consume an event if and only if its respective endpoint can accept the event. The extent of transaction support varies for different channel implementations ranging from strong to best-effort semantics.
Flume uses a transactional approach to guarantee the reliable delivery of the events. The sources and sinks encapsulate in a transaction the storage/retrieval, respectively, of the events placed in or provided by a transaction provided by the channel. This ensures that the set of events are reliably passed from point to point in the flow. In the case of a multi-hop flow, the sink from the previous hop and the source from the next hop both have their transactions running to ensure that the data is safely stored in the channel of the next hop.
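As the above shows, a Channel mainly stores Events, and putting and taking Events is controlled by a Transaction. The rest of this post takes a closer look at Transaction.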
org.apache.flume.Transaction

Provides the transaction boundary while accessing a channel.

A Transaction instance is used to encompass channel access via the following idiom:

  Channel ch = ...
  Transaction tx = ch.getTransaction();
  try {
    tx.begin();
    ...
    // ch.put(event) or ch.take()
    ...
    tx.commit();
  } catch (ChannelException ex) {
    tx.rollback();
    ...
  } finally {
    tx.close();
  }

Depending upon the implementation of the channel, the transaction semantics may be strong, or best-effort only.

Transactions must be thread safe. To provide a guarantee of thread safe access to Transactions, see BasicChannelSemantics and BasicTransactionSemantics.
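The begin / commit / rollback / close idiom can be tried out without Flume on the classpath. The sketch below is only an illustration under stated assumptions: `SketchChannel` and `SketchTransaction` are hypothetical stand-ins, events are strings, `put` lives on the transaction rather than on the channel, and a simulated `IllegalStateException` plays the role of `ChannelException`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-ins, not Flume's real classes.
class TxIdiomSketch {

    /** Stand-in channel: only committed events become visible here. */
    static class SketchChannel {
        final Deque<String> committed = new ArrayDeque<>();

        SketchTransaction getTransaction() {
            return new SketchTransaction(this);
        }
    }

    /** Stand-in transaction: buffers puts until commit, drops them on rollback. */
    static class SketchTransaction {
        private final SketchChannel channel;
        private final List<String> pending = new ArrayList<>();

        SketchTransaction(SketchChannel channel) {
            this.channel = channel;
        }

        void begin() { pending.clear(); }
        void put(String event) { pending.add(event); }
        void commit() { channel.committed.addAll(pending); pending.clear(); }
        void rollback() { pending.clear(); }
        void close() { /* nothing to release in this sketch */ }
    }

    /** Applies the idiom to a batch; returns true if the batch was committed. */
    static boolean deliver(SketchChannel ch, List<String> batch, boolean failMidway) {
        SketchTransaction tx = ch.getTransaction();
        try {
            tx.begin();
            for (String event : batch) {
                tx.put(event);
                if (failMidway) {
                    throw new IllegalStateException("simulated failure");
                }
            }
            tx.commit();
            return true;
        } catch (IllegalStateException ex) {
            tx.rollback(); // nothing from this batch reaches the channel
            return false;
        } finally {
            tx.close();
        }
    }

    public static void main(String[] args) {
        SketchChannel ch = new SketchChannel();
        List<String> batch = new ArrayList<>();
        batch.add("a");
        batch.add("b");
        System.out.println(deliver(ch, batch, false) + " " + ch.committed);
        System.out.println(deliver(ch, batch, true) + " " + ch.committed);
    }
}
```

The point of the sketch is the all-or-nothing behaviour: a batch that fails midway leaves the channel exactly as it was.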
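BasicTransactionSemantics ensures that transaction operations succeed only when they are performed in the correct order: tx.begin => channel.take/put => tx.commit => tx.close. It only enforces the order of operations on the Channel; subclasses implement the actual work in doBegin, doTake, doPut, doCommit, doRollback, and doClose.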
private ThreadLocal<BasicTransactionSemantics> currentTransaction = new ThreadLocal<BasicTransactionSemantics>();
/**
 * <p>
 * Initializes the channel if it is not already, then checks to see
 * if there is an open transaction for this thread, creating a new
 * one via <code>createTransaction</code> if not.
 * @return the current <code>Transaction</code> object for the
 *     calling thread
 * </p>
 */
@Override
public Transaction getTransaction() {

  if (!initialized) {
    synchronized (this) {
      if (!initialized) {
        initialize();
        initialized = true;
      }
    }
  }

  BasicTransactionSemantics transaction = currentTransaction.get();
  if (transaction == null || transaction.getState().equals(
          BasicTransactionSemantics.State.CLOSED)) {
    transaction = createTransaction();
    currentTransaction.set(transaction);
  }
  return transaction;
}
@Override
public void put(Event event) throws ChannelException {
  BasicTransactionSemantics transaction = currentTransaction.get();
  Preconditions.checkState(transaction != null,
      "No transaction exists for this thread");
  transaction.put(event);
}

@Override
public Event take() throws ChannelException {
  BasicTransactionSemantics transaction = currentTransaction.get();
  Preconditions.checkState(transaction != null,
      "No transaction exists for this thread");
  return transaction.take();
}
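The effect of keeping the current transaction in a ThreadLocal can be seen in isolation. The following sketch is not the Flume code: `SketchChannel` and `SketchTx` are hypothetical stand-ins that keep only the "reuse the open transaction of this thread, create a new one if none exists or it is closed" logic from the listing above.

```java
// Hypothetical stand-ins, not Flume's real classes.
class ThreadLocalTxSketch {

    /** Stand-in transaction that only remembers whether it was closed. */
    static class SketchTx {
        boolean closed = false;

        void close() {
            closed = true;
        }
    }

    /** Stand-in channel holding one current transaction per thread. */
    static class SketchChannel {
        private final ThreadLocal<SketchTx> current = new ThreadLocal<>();

        SketchTx getTransaction() {
            SketchTx tx = current.get();
            if (tx == null || tx.closed) {
                tx = new SketchTx(); // plays the role of createTransaction()
                current.set(tx);
            }
            return tx;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SketchChannel ch = new SketchChannel();
        SketchTx mine = ch.getTransaction();

        // Another thread asking the same channel gets its own transaction.
        SketchTx[] other = new SketchTx[1];
        Thread t = new Thread(() -> other[0] = ch.getTransaction());
        t.start();
        t.join();

        System.out.println("same thread reuses: " + (mine == ch.getTransaction()));
        System.out.println("other thread shares: " + (mine == other[0]));
    }
}
```

This is why put and take on the channel can look up "the" transaction without receiving it as an argument: each thread sees only its own.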
An implementation of basic Transaction semantics designed to work in concert with BasicChannelSemantics to simplify creation of robust Channel implementations. This class ensures that each transaction implementation method is called only while the transaction is in the correct state for that method, and only by the thread that created the transaction. Nested calls to begin() and close() are supported as long as they are balanced.
Subclasses need only implement doPut, doTake, doCommit, and doRollback, and the developer can rest assured that those methods are called only after transaction state preconditions have been properly met. doBegin and doClose may also be implemented if there is work to be done at those points.
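The division of labour described here is a template-method pattern: the base class checks thread and state, the subclass supplies the doXXX work. Below is a reduced sketch of that pattern, not the real class: `SketchSemantics` and `ListTx` are hypothetical, events are strings, and nested begin()/close() calls, doTake, doBegin, and doClose are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified illustration of the pattern; not Flume's real class.
class TemplateTxSketch {

    enum State { NEW, OPEN, COMPLETED, CLOSED }

    /** Base class: enforces ordering and thread ownership, then delegates. */
    abstract static class SketchSemantics {
        private State state = State.NEW;
        private final long ownerThreadId = Thread.currentThread().getId();

        protected abstract void doPut(String event);
        protected abstract void doCommit();
        protected abstract void doRollback();

        private void check(State expected, String op) {
            if (Thread.currentThread().getId() != ownerThreadId) {
                throw new IllegalStateException(op + "() called from a different thread");
            }
            if (state != expected) {
                throw new IllegalStateException(op + "() called when transaction is " + state);
            }
        }

        final void begin()           { check(State.NEW, "begin"); state = State.OPEN; }
        final void put(String event) { check(State.OPEN, "put"); doPut(event); }
        final void commit()          { check(State.OPEN, "commit"); state = State.COMPLETED; doCommit(); }
        final void rollback()        { check(State.OPEN, "rollback"); state = State.COMPLETED; doRollback(); }
        final void close()           { check(State.COMPLETED, "close"); state = State.CLOSED; }

        final State getState() { return state; }
    }

    /** Subclass: only the storage logic, no state checks of its own. */
    static class ListTx extends SketchSemantics {
        final List<String> store = new ArrayList<>();
        private final List<String> pending = new ArrayList<>();

        @Override protected void doPut(String event) { pending.add(event); }
        @Override protected void doCommit() { store.addAll(pending); pending.clear(); }
        @Override protected void doRollback() { pending.clear(); }
    }

    public static void main(String[] args) {
        ListTx tx = new ListTx();
        try {
            tx.put("too early"); // rejected: the transaction is still NEW
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        tx.begin();
        tx.put("e1");
        tx.commit();
        tx.close();
        System.out.println(tx.store + " " + tx.getState());
    }
}
```

Because every public method is final and checks state first, `ListTx` can assume its doXXX methods are never called out of order.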
All InterruptedException exceptions thrown from the implementations of the doXXX methods are automatically wrapped to become ChannelExceptions, but only after restoring the interrupted status of the thread so that any subsequent blocking method calls will themselves throw InterruptedException rather than blocking. The exception to this rule is doTake, which simply returns null instead of wrapping and propagating the InterruptedException, though it still first restores the interrupted status of the thread.
The main member functions are: begin, commit, close, doBegin, doCommit, doClose, doPut, doTake, doRollback, getState, toString, put, take, and rollback.
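Of these, put, take, begin, close, commit, and rollback all share a very similar structure: each first verifies that the Transaction is in the state required for the operation and then calls the corresponding doXXX method. If the current thread does not own the transaction, or the transaction is in the wrong state, an exception is thrown. If the doXXX method throws InterruptedException, the method restores the interrupted status of the current thread with Thread.currentThread().interrupt() and then wraps the caught InterruptedException in a ChannelException and throws it, as in rollback() below; take() is the one exception and returns null instead: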
@Override
public void rollback() {
  Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
      "rollback() called from different thread than getTransaction()!");
  Preconditions.checkState(state.equals(State.OPEN),
      "rollback() called when transaction is %s!", state);

  state = State.COMPLETED;
  try {
    doRollback();
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new ChannelException(e.toString(), e);
  }
}
protected Event take() {
  Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
      "take() called from different thread than getTransaction()!");
  Preconditions.checkState(state.equals(State.OPEN),
      "take() called when transaction is %s!", state);

  try {
    return doTake();
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return null;
  }
}
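Why the interrupted status is restored before returning can be shown with nothing but the JDK. In this sketch, `takeOrNull` is a hypothetical helper that mirrors the shape of take() above, with a `BlockingQueue` standing in for whatever blocking call a real doTake might make.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper mirroring the shape of take(); not Flume code.
class InterruptSketch {

    /** Returns the next element, or null if the thread was interrupted. */
    static String takeOrNull(BlockingQueue<String> queue) {
        try {
            return queue.take(); // blocking call, may throw InterruptedException
        } catch (InterruptedException e) {
            // Catching the exception cleared the flag, so set it again:
            // callers further up can still see that an interrupt happened.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> empty = new LinkedBlockingQueue<>();

        Thread.currentThread().interrupt(); // simulate an interrupt request
        String result = takeOrNull(empty);  // returns at once instead of blocking

        // Thread.interrupted() reports the flag and clears it again.
        System.out.println(result + " " + Thread.interrupted());
    }
}
```

Without the call to interrupt() in the catch block, the caller would receive null and have no way to tell an interrupt from an empty result.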