Druid is a fast column-oriented distributed data store. http://druid.io/
Starting a Druid service launches a Java process. For example, run_example_server.sh
runs io.druid.cli.Main with the arguments example realtime.
Guice Inject
Main's builder class registers several service groups. The server group covers most Druid components: Coordinator, Historical, Broker, Realtime, Overlord, etc.
injectMembers and toInstance inject objects that are already instantiated.
```java
final Injector injector = GuiceInjectors.makeStartupInjector();
final Cli<Runnable> cli = builder.build();
final Runnable command = cli.parse(args);
injector.injectMembers(command);
```
Guice is a dependency-injection framework. The client workflow is: create an Injector, obtain instances from it, and call methods on those instances. The command the client parses out is a Runnable.
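That create-Injector / get-instance / call workflow can be sketched with a minimal, self-contained Guice example (the Greeter and ConsoleGreeter names are invented for illustration; only the Guice API calls are real):

```java
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;

public class GuiceDemo {
    // A service interface and its implementation (hypothetical names).
    interface Greeter { String greet(); }
    static class ConsoleGreeter implements Greeter {
        public String greet() { return "hello"; }
    }

    // A Module declares which implementation backs each interface.
    static class GreeterModule extends AbstractModule {
        @Override protected void configure() {
            bind(Greeter.class).to(ConsoleGreeter.class);
        }
    }

    public static void main(String[] args) {
        // 1. create the Injector, 2. ask it for an instance, 3. use it
        Injector injector = Guice.createInjector(new GreeterModule());
        Greeter greeter = injector.getInstance(Greeter.class);
        System.out.println(greeter.greet());
    }
}
```

Druid's Main follows the same shape: build the object graph once, then ask the Injector for a fully wired entry point.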
CliRealtime extends ServerRunnable (which in turn extends GuiceRunnable). The Initialization step invoked from makeInjector registers many Modules.

CliRealtime's getModules() mainly contributes RealtimeModule; every node type registers the Modules for its own responsibilities.
RealtimeModule binds SegmentPublisher, ChatHandlerProvider, RealtimeManager, and so on.
```java
JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);
binder.bind(new TypeLiteral<List<FireDepartment>>(){})
      .toProvider(FireDepartmentsProvider.class)
      .in(LazySingleton.class);
```
toInstance also binds to an instantiated object rather than an interface; NodeTypeConfig, for example, is a plain class, not an interface.
https://github.com/google/guice/wiki/Injections#on-demand-injection
Binding via the return value of a Provider's get method
The key class is RealtimeManager. Its constructor takes ① a List<FireDepartment> and ② a QueryRunnerFactoryConglomerate; the last field, chiefs, is initialized directly inside the constructor.
The first two must be injected via @Inject. Since ① the List is generic, it is injected through FireDepartmentsProvider using the TypeLiteral binding above.
```java
public class RealtimeManager implements QuerySegmentWalker
{
  private final List<FireDepartment> fireDepartments;
  // ...
}
```
Injection goes through a Provider, FireDepartmentsProvider: whatever its get method returns is used as the List implementation.
FireDepartmentsProvider's constructor in turn needs an ObjectMapper and a RealtimeManagerConfig injected; RealtimeManagerConfig was already bound before the Provider binding.
```java
public class FireDepartmentsProvider implements Provider<List<FireDepartment>>
{
  private final List<FireDepartment> fireDepartments = Lists.newArrayList();

  public List<FireDepartment> get()
  {
    return fireDepartments;
  }
}
```
Binding implementations with annotations and @Provides
ObjectMapper is a Jackson class; Druid's implementation is DefaultObjectMapper. ObjectMapper is likewise bound during initialization, in JacksonModule.
Here to uses the annotation form: because the annotation type is @Json, the binding resolves to the DefaultObjectMapper created by jsonMapper() (a @Provides method, which works like a Provider).
```java
public class JacksonModule implements Module
{
  public void configure(Binder binder)
  {
    binder.bind(ObjectMapper.class).to(Key.get(ObjectMapper.class, Json.class));
  }

  @Provides @LazySingleton @Json
  public ObjectMapper jsonMapper()
  {
    return new DefaultObjectMapper();
  }
}
```
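A stripped-down sketch of the same annotation trick, with a toy binding annotation standing in for @Json and a plain String standing in for ObjectMapper (assumes Guice on the classpath; all names are invented):

```java
import com.google.inject.AbstractModule;
import com.google.inject.BindingAnnotation;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Provides;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

public class ProvidesDemo {
    // A binding annotation playing the role of Druid's @Json.
    @BindingAnnotation @Retention(RetentionPolicy.RUNTIME)
    @interface Plain {}

    static class PlainModule extends AbstractModule {
        @Override protected void configure() {
            // Route the unannotated String request to the @Plain-annotated binding.
            bind(String.class).to(Key.get(String.class, Plain.class));
        }

        // A @Provides method is an inline provider; the annotation disambiguates it.
        @Provides @Plain String plainString() {
            return "default-mapper";
        }
    }

    public static void main(String[] args) {
        Injector injector = Guice.createInjector(new PlainModule());
        System.out.println(injector.getInstance(String.class));
    }
}
```

Requesting the bare type is forwarded to the annotated @Provides method, just as requesting ObjectMapper lands on jsonMapper().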
JSON Property
The JsonConfigProvider binding earlier mapped druid.realtime to RealtimeManagerConfig, which has a single property: @JsonProperty private File specFile.
In its constructor, FireDepartmentsProvider uses the DefaultObjectMapper to read the JSON file given by druid.realtime.specFile when the process was started.
FireDepartment's three fields (dataSchema, ioConfig, tuningConfig) map exactly onto the JSON properties in the specFile. The overall flow: specify the specFile, create a DefaultObjectMapper (JacksonModule), have it read the JSON file, construct the FireDepartmentsProvider, and return the List<FireDepartment>.
```java
@JsonCreator
public FireDepartment(
    @JsonProperty("dataSchema") DataSchema dataSchema,
    @JsonProperty("ioConfig") RealtimeIOConfig ioConfig,
    @JsonProperty("tuningConfig") RealtimeTuningConfig tuningConfig
)
```
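How @JsonCreator/@JsonProperty drive deserialization can be tried in a minimal Jackson sketch (MiniSpec and its fields are invented stand-ins, not Druid classes):

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonCreatorDemo {
    // Hypothetical spec class: each constructor argument maps to a JSON field.
    static class MiniSpec {
        final String dataSource;
        final String granularity;

        @JsonCreator
        MiniSpec(
            @JsonProperty("dataSource") String dataSource,
            @JsonProperty("granularity") String granularity
        ) {
            this.dataSource = dataSource;
            this.granularity = granularity;
        }
    }

    public static void main(String[] args) throws Exception {
        String json = "{\"dataSource\":\"wikipedia\",\"granularity\":\"DAY\"}";
        // readValue finds the @JsonCreator constructor and fills each
        // @JsonProperty parameter from the matching JSON field.
        MiniSpec spec = new ObjectMapper().readValue(json, MiniSpec.class);
        System.out.println(spec.dataSource + " / " + spec.granularity);
    }
}
```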
Injection contributed by other Modules
We never see ② QueryRunnerFactoryConglomerate injected here. So where does it come from? Open the interface and look at its main implementation, DefaultQueryRunnerFactoryConglomerate.
Cmd+click to view its usages: the only one is the configure method of StorageNodeModule, another Module that is registered during Initialization.
```java
public class StorageNodeModule implements Module
{
  public void configure(Binder binder)
  {
    JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class);
    JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class);

    binder.bind(NodeTypeConfig.class).toProvider(Providers.<NodeTypeConfig>of(null));
    binder.bind(QueryableIndexFactory.class).to(MMappedQueryableIndexFactory.class).in(LazySingleton.class);
    binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
    binder.bind(QueryRunnerFactoryConglomerate.class).to(DefaultQueryRunnerFactoryConglomerate.class).in(LazySingleton.class);
  }
}
```
DefaultQueryRunnerFactoryConglomerate's constructor also needs an injection: Map<Class<? extends Query>, QueryRunnerFactory> factories.
Follow the usages again: QueryRunnerFactory leads to an implementation such as TimeBoundaryQueryRunnerFactory, whose usage turns out to be QueryRunnerFactoryModule.
The pattern holds: whenever an interface is bound to an implementation, a Guice Module is ultimately doing the binding.
Injecting a Map with MapBinder
TimeBoundaryQueryRunnerFactory's constructor also depends on a QueryWatcher, which QueryRunnerFactoryModule conveniently binds as well.
The mappings map lists every query type Druid supports together with its query factory class. MapBinder is Guice's mechanism for injecting Map-typed objects (again relying on TypeLiteral).
```java
public class QueryRunnerFactoryModule extends QueryToolChestModule
{
  private static final Map<Class<? extends Query>, Class<? extends QueryRunnerFactory>> mappings =
      ImmutableMap.<Class<? extends Query>, Class<? extends QueryRunnerFactory>>builder()
                  .put(TimeseriesQuery.class, TimeseriesQueryRunnerFactory.class)
                  .put(TimeBoundaryQuery.class, TimeBoundaryQueryRunnerFactory.class)
                  ...
                  .build();

  public void configure(Binder binder)
  {
    super.configure(binder);
    binder.bind(QueryWatcher.class).to(QueryManager.class).in(LazySingleton.class);
    binder.bind(QueryManager.class).in(LazySingleton.class);

    final MapBinder<Class<? extends Query>, QueryRunnerFactory> queryFactoryBinder =
        DruidBinders.queryRunnerFactoryBinder(binder);
    for (Map.Entry<Class<? extends Query>, Class<? extends QueryRunnerFactory>> entry : mappings.entrySet()) {
      queryFactoryBinder.addBinding(entry.getKey()).to(entry.getValue());
    }
  }
}
```
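A minimal MapBinder sketch, with toy factory types standing in for Druid's query factories (assumes Guice on the classpath; all names are invented):

```java
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import java.util.Map;

public class MapBinderDemo {
    interface Factory { String name(); }
    static class TimeseriesFactory implements Factory { public String name() { return "timeseries"; } }
    static class TopNFactory implements Factory { public String name() { return "topN"; } }

    static class FactoryModule extends AbstractModule {
        @Override protected void configure() {
            // Each addBinding contributes one entry to an injectable Map<String, Factory>.
            MapBinder<String, Factory> factories =
                MapBinder.newMapBinder(binder(), String.class, Factory.class);
            factories.addBinding("timeseries").to(TimeseriesFactory.class);
            factories.addBinding("topN").to(TopNFactory.class);
        }
    }

    public static void main(String[] args) {
        Injector injector = Guice.createInjector(new FactoryModule());
        // TypeLiteral is needed because Map<String, Factory> is a generic type.
        Map<String, Factory> factories =
            injector.getInstance(Key.get(new TypeLiteral<Map<String, Factory>>() {}));
        System.out.println(factories.get("topN").name());
    }
}
```

Guice assembles the whole Map from the individual addBinding entries, which is how the conglomerate receives its factories map.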
The QueryRunnerFactory implementations contain the actual query logic; each query type has an XXXQuery, XXXQueryRunner, XXXQueryToolChest, XXXResultValue, and so on.
Take run_example_client.sh as an example: its query type is timeBoundary, which corresponds to TimeBoundaryQuery.
RealtimeManager
Once the List<FireDepartment> and the QueryRunnerFactoryConglomerate have been injected into RealtimeManager's constructor, the start method can get to work.
Each FireDepartment in fireDepartments is wrapped in a FireChief; every DataSource in a FireDepartment's DataSchema corresponds to one FireChief.
A FireChief ties together the FireDepartment (the data source definition), a Firehose (how to read: an iterator over the source), and a Plumber (the sink).
The FireChief thread calls initPlumber to set up the Plumber, lets the Plumber start its job, calls initFirehose to connect to the data source, and finally calls runFirehose to read data.
```java
public void run()
{
  plumber = initPlumber();
  ...
}
```

Firehose
A firehose connects to a water source: as data keeps flowing into the source (Kafka, for example), the hose keeps spraying out a stream, and what comes out is InputRows. A Firehose behaves like an iterator.
```java
private void runFirehose(Firehose firehose)
{
  final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
  while (firehose.hasMore()) {
    final InputRow inputRow = firehose.nextRow();
    lateEvent = plumber.add(inputRow, committerSupplier) == -1;
    if (indexLimitExceeded || lateEvent) {
      plumber.persist(committerSupplier.get());
    }
  }
}
```
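The hasMore/nextRow contract is just an iterator. A Druid-free sketch over an in-memory list (all names here are made up):

```java
import java.util.Iterator;
import java.util.List;

public class FirehoseSketch {
    // Minimal stand-in for Druid's Firehose contract.
    interface Firehose {
        boolean hasMore();
        String nextRow();
    }

    // Wrap a fixed list of "rows" in the iterator-style interface.
    static Firehose fromList(List<String> rows) {
        final Iterator<String> it = rows.iterator();
        return new Firehose() {
            public boolean hasMore() { return it.hasNext(); }
            public String nextRow() { return it.next(); }
        };
    }

    public static void main(String[] args) {
        Firehose firehose = fromList(List.of("row1", "row2"));
        // Same consumption loop shape as runFirehose above.
        while (firehose.hasMore()) {
            System.out.println(firehose.nextRow());
        }
    }
}
```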
The Firehose is built from the spec file: the ioConfig's firehose property ① yields the FirehoseFactory, and the dataSchema's parser ② yields the firehoseParser, from which the Firehose is created.
Why does the Firehose need the dataSchema? Because the output depends on the input format: the parser describes how to interpret the source data, and its parseSpec specifies the input format, the timestamp, and the dimension fields.
```java
public Firehose connect() throws IOException
{
  return ioConfig.getFirehoseFactory().connect(dataSchema.getParser());
}
```
With Kafka as the data source, ① the firehose type resolves to KafkaEightFirehoseFactory, which has three properties: consumerProps, feed, and the type inherited from FirehoseFactory.
```json
"ioConfig" : {
  "type" : "realtime",
  "firehose": {            // ① "kafka-0.8" maps to KafkaEightFirehoseFactory
    "type": "kafka-0.8",
    "consumerProps": {
      "zookeeper.connect": "localhost:2181",
      ...
    },
    "feed": "wikipedia"
  },
  "plumber": { "type": "realtime" }
},
```
DataSchema
DataSchema has four JSON properties; the ObjectMapper constructor parameter is dependency-injected. dataSchema.getParser() returns the InputRowParser.
Since the parser itself contains several nested properties, it is deserialized into a Map when the spec file is read.
```java
@JsonCreator
public DataSchema(
    @JsonProperty("dataSource") String dataSource,
    @JsonProperty("parser") Map<String, Object> parser,
    @JsonProperty("metricsSpec") AggregatorFactory[] aggregators,
    @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
    @JacksonInject ObjectMapper jsonMapper
)
```
Below is the dataSchema section of the wikipedia spec file.
```json
"dataSchema" : {
  "dataSource" : "wikipedia",
  "parser" : {             // ② "string" maps to StringInputRowParser
    "type" : "string",
    "parseSpec" : {
      "format" : "json",
      "timestampSpec" : {
        "column" : "timestamp",
        "format" : "auto"
      },
      "dimensionsSpec" : {
        "dimensions": ["page","language","user","unpatrolled","newPage","robot",
                       "anonymous","namespace","continent","country","region","city"],
        "dimensionExclusions" : [],
        "spatialDimensions" : []
      }
    }
  },
  "metricsSpec" : [{
    "type" : "count",
    "name" : "count"
  }],
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "DAY",
    "queryGranularity" : "NONE"
  }
}
```
How does the Map parsed from JSON become an InputRowParser? The format is fixed, so after obtaining the parser it simply extracts the timestampSpec and dimensionsSpec. A spec is literally a specification: follow the instructions and you cannot go wrong.
```java
public InputRowParser getParser()
{
  final InputRowParser inputRowParser = jsonMapper.convertValue(this.parser, InputRowParser.class);
  final DimensionsSpec dimensionsSpec = inputRowParser.getParseSpec().getDimensionsSpec();
  final TimestampSpec timestampSpec = inputRowParser.getParseSpec().getTimestampSpec();
  return inputRowParser.withParseSpec(
      ...
  );
}
```
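The convertValue step, turning an already-parsed Map into a typed object, can be tried in isolation with Jackson (ParserStub is an invented stand-in for InputRowParser):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class ConvertValueDemo {
    // Hypothetical target type for the "parser" subtree of the spec.
    public static class ParserStub {
        public String type;
        public String format;
    }

    public static void main(String[] args) {
        Map<String, Object> parserMap = Map.of("type", "string", "format", "json");
        // convertValue re-binds the Map through Jackson's databinding,
        // just as DataSchema.getParser() turns its Map into a parser object.
        ParserStub parser = new ObjectMapper().convertValue(parserMap, ParserStub.class);
        System.out.println(parser.type + "/" + parser.format);
    }
}
```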
KafkaEightFirehoseFactory
KafkaEightFirehoseFactory's connect method returns an anonymous Firehose whose nextRow method parses Kafka's input data using the parser.
The configured consumerProps and feed determine the ZooKeeper ensemble and topic to connect to. A consumer is then created, and data is delivered through a ConsumerIterator.
```java
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
final Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
final List<KafkaStream<byte[], byte[]>> streamList = streams.get(feed);
final KafkaStream<byte[], byte[]> stream = streamList.get(0);
final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
```
stream is the Kafka message stream. Iterating over its messages and parsing each one with the InputRowParser yields InputRows.
Hooking back into RealtimeManager, runFirehose calls the Firehose's nextRow to read data, which completes the pipeline: DataSchema definition, InputRowParser parsing, InputRow output.
```java
return new Firehose()
{
  public boolean hasMore()
  {
    return iter.hasNext();
  }

  public InputRow nextRow()
  {
    final byte[] message = iter.next().message();
    return theParser.parse(ByteBuffer.wrap(message));
  }
};
```

RealtimePlumber
Every InputRow read from the Firehose is added to the Plumber. Each row carries a timestamp; truncatedTime is that timestamp truncated by the segmentGranularity.
Every record ultimately lives in a Segment, and Segments are stored by Interval. A one-hour Segment, for example, covers 20151011-100000 to 20151011-110000.
sinks maps each truncated timestamp to a Sink, which holds all events for that period; once the Sink is found, the row is added to it. Under the hood a Sink uses an IncrementalIndex.
If the Sink cannot accept another row (for instance, the Segment size hits its threshold) or the flush interval has elapsed (intermediatePersistPeriod, 10 minutes by default), the Sink's data is persisted.
```java
public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
{
  final Sink sink = getSink(row.getTimestampFromEpoch());
  final int numRows = sink.add(row);
  if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {
    persist(committerSupplier.get());
  }
  return numRows;
}

private Sink getSink(long timestamp)
{
  final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
  final VersioningPolicy versioningPolicy = config.getVersioningPolicy();
  final long truncatedTime = segmentGranularity.truncate(new DateTime(timestamp)).getMillis();

  Sink retVal = sinks.get(truncatedTime);
  if (retVal == null) {
    final Interval sinkInterval = new Interval(
        new DateTime(truncatedTime),
        segmentGranularity.increment(new DateTime(truncatedTime))
    );
    retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval));
    segmentAnnouncer.announceSegment(retVal.getSegment());
    sinks.put(truncatedTime, retVal);
    sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), new SingleElementPartitionChunk<Sink>(retVal));
  }
  return retVal;
}
```
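The truncation step, mapping an event timestamp to the start of its bucket, can be sketched with java.time (hour granularity assumed here; Druid itself goes through its Granularity classes and Joda-Time):

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class TruncateSketch {
    // Truncate an event timestamp (epoch millis) to the start of its hour
    // bucket: the key under which a Plumber-style map would look up its Sink.
    static long truncateToHour(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis)
                      .truncatedTo(ChronoUnit.HOURS)
                      .toEpochMilli();
    }

    public static void main(String[] args) {
        long t1 = Instant.parse("2015-10-11T10:15:30Z").toEpochMilli();
        long t2 = Instant.parse("2015-10-11T10:59:59Z").toEpochMilli();
        // Both events fall into the same 10:00-11:00 bucket, so they share a Sink.
        System.out.println(truncateToHour(t1) == truncateToHour(t2));
    }
}
```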
When a Sink is first created, segmentAnnouncer announces a new Segment, which in practice creates the corresponding ephemeral node in ZooKeeper.
The truncated-timestamp-to-Sink mapping is then stored in sinks, so later events with the same truncated timestamp (say, within the same hour) reuse the existing Sink.
sinkTimeline is the timeline of Sinks; besides the Interval it carries version information and a partition number. If one hour's worth of a Segment grows too large, it is split into multiple partitions.
A Sink is built from FireHydrants and an IncrementalIndex. Recall that Druid does not store raw data, but the results of Roll-up.
When getSink creates a Sink for the first time, it also creates a FireHydrant, an OnheapIncrementalIndex (an on-heap incremental index), and a DataSegment.
Realtime data written to a realtime node must be queryable immediately after indexing, so the Roll-up results live in the realtime node's memory.
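Roll-up itself is just pre-aggregation keyed by (truncated timestamp, dimension values). A minimal in-memory sketch with a count metric (toy types, not Druid's):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollupSketch {
    // Key for roll-up: truncated timestamp plus the dimension values,
    // playing the role of Druid's TimeAndDims.
    record TimeAndDims(long truncatedTime, List<String> dims) {}

    // Aggregate a "count" metric per unique (time, dims) key, mimicking
    // what the incremental index does for rows held in memory.
    static Map<TimeAndDims, Long> rollup(List<TimeAndDims> rows) {
        Map<TimeAndDims, Long> facts = new HashMap<>();
        for (TimeAndDims row : rows) {
            facts.merge(row, 1L, Long::sum);
        }
        return facts;
    }

    public static void main(String[] args) {
        TimeAndDims a = new TimeAndDims(1000L, List.of("page1", "en"));
        TimeAndDims b = new TimeAndDims(1000L, List.of("page2", "en"));
        // Two identical rows are rolled up into a single fact with count 2.
        Map<TimeAndDims, Long> facts = rollup(List.of(a, a, b));
        System.out.println(facts.get(a) + " " + facts.get(b));
    }
}
```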

IncrementalIndex
Adding an InputRow fetches the OnheapIncrementalIndex from the FireHydrant and appends a record to the incremental index.
- dimensions: all column names of a row, taken from dimensionsSpec/dimensions in the dataSchema
- dimensionValues: the values of one dimension; can be an array with multiple values
- dimensionOrder: <dimension, order>, the position of each dimension name
- dimValues: DimensionHolder<dimension, DimDim>; the canonical form allows a fast check for whether a value already exists
- dims[][]: the index of dims[index] comes from the order in dimensionOrder; the value is getDimVals(DimDim, dimensionValues)
- TimeAndDims: the timestamp plus the dims
- metrics AggregatorFactory[]: factories for the aggregators; factorize constructs the actual aggregator
- aggs AggregatorType[]: one Aggregator per metric column
With all of the above prepared, IncrementalIndex calls addToFacts to add facts; the OnheapIncrementalIndex implementation uses the constructed aggregators to start aggregating.
```java
public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
{
  private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>();
  private final ConcurrentNavigableMap<TimeAndDims, Integer> facts = new ConcurrentSkipListMap<>();

  protected Integer addToFacts(
      AggregatorFactory[] metrics,
      boolean deserializeComplexMetrics,
      InputRow row,
      AtomicInteger numEntries,
      TimeAndDims key,
      ThreadLocal<InputRow> rowContainer,
      Supplier<InputRow> rowSupplier
  )
  {
    Aggregator[] aggs = new Aggregator[metrics.length];
    for (int i = 0; i < metrics.length; i++) {
      final AggregatorFactory agg = metrics[i];
      aggs[i] = agg.factorize(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics));
    }
    rowContainer.set(row);
    ...
  }
}
```
An InputRow added to the IncrementalIndex lands in the index's facts; the TimeAndDims key holds the timestamp and the dimensions.
The Roll-up aggregators then do their aggregation, and the results can be read back through IncrementalIndex's getXXXValue accessors.
persistHydrant
At the very start of persist, a swap happens: a new FireHydrant is created and the old one is returned.
```java
public void persist(final Committer committer)
{
  final List<Pair<FireHydrant, Interval>> indexesToPersist = Lists.newArrayList();
  for (Sink sink : sinks.values()) {
    ...
  }
}
```
To persist the incremental index, IndexMerger.persist goes on to call merge, wrapping the index in an IncrementalIndexAdapter, since indexToPersist is an OnheapIncrementalIndex.
```java
return merge(
    Arrays.<IndexableAdapter>asList(
        new IncrementalIndexAdapter(dataInterval, index, indexSpec.getBitmapSerdeFactory().getBitmapFactory())
    ),
    index.getMetricAggs(), outDir, segmentMetadata, indexSpec, progress
);
```
IndexMerger merges the dimensions and metrics into mergedDimensions and mergedMetrics, with a rowMergerFn to merge individual rows; finally, makeIndexFiles writes the index files.