Apache Drill可用于大数据的实时分析,引用一段介绍:
受到Google Dremel启发,Apache的Drill项目是对大数据集进行交互式分析的分布式系统。Drill并不会试图取代已有的大数据批处理框架(Big Data batch processing framework),如Hadoop MapReduce或流处理框架(stream processing framework),如S4和Storm。相反,它是要填充现有空白的——对大数据集的实时交互式处理
简单来说,Drill可接收SQL查询语句,然后后端从多个数据源例如HDFS、MongoDB等获取数据并分析产出分析结果。在一次分析中,它可以汇集多个数据源的数据。而且基于分布式的架构,可以支持秒级查询。
Drill在架构上是比较灵活的,它的前端可以不一定是SQL查询语言,后端数据源也可以接入Storage plugin来支持其他数据来源。这里我就实现了一个从HTTP服务获取数据的Storage plugin demo。这个demo可以接入基于GET请求,返回JSON格式的HTTP服务。源码可从我的Github获取:drill-storage-http
例子包括:
select name, length from http.`/e/api:search` where $p=2 and $q='avi' select name, length from http.`/e/api:search?q=avi&p=2` where length > 0
实现
要实现一个自己的storage plugin,目前Drill这方面文档几乎没有,只能从已有的其他storage plugin源码入手,例如mongodb的,参考Drill子项目drill-mongo-storage
。实现的storage plugin打包为jar放到jars
目录,Drill启动时会自动载入,然后web上配置指定类型即可。
主要需要实现的类包括:
AbstractStoragePlugin StoragePluginConfig SchemaFactory BatchCreator AbstractRecordReader AbstractGroupScan
AbstraceStoragePlugin
StoragePluginConfig
用于配置plugin,例如:
{ "type" : "http", "connection" : "http://xxx.com:8000", "resultKey" : "results", "enabled" : true }
它必须是可JSON序列化/反序列化的,Drill会把storage配置存储到/tmp/drill/sys.storage_plugins
中,例如windows下D:\tmp\drill\sys.storage_plugins
。
AbstractStoragePlugin
是plugin的主类,它必须配合StoragePluginConfig
,实现这个类时,构造函数必须遵循参数约定,例如:
public HttpStoragePlugin(HttpStoragePluginConfig httpConfig, DrillbitContext context, String name)
Drill启动时会自动扫描AbstractStoragePlugin
实现类(StoragePluginRegistry
),并建立StoragePluginConfig.class
到AbstractStoragePlugin constructor
的映射。AbstractStoragePlugin
需要实现的接口包括:
// 相应地需要实现AbstraceGroupScan
// selection包含了database name和table name,可用可不用
public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection)
// 注册schema
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException
// StoragePluginOptimizerRule 用于优化Drill生成的plan,可实现也可不实现
public Set<StoragePluginOptimizerRule> getOptimizerRules()
Drill中的schema用于描述一个database,以及处理table之类的事务,必须要实现,否则任意一个SQL查询都会被认为是找不到对应的table。AbstraceGroupScan
用于一次查询中提供信息,例如查询哪些columns。
Drill在查询时,有一种中间数据结构(基于JSON)叫Plan,其中又分为Logic Plan和Physical Plan。Logic Plan是第一层中间结构,用于完整表达一次查询,是SQL或其他前端查询语言转换后的中间结构。完了后还要被转换为Physical Plan,又称为Exectuion Plan,这个Plan是被优化后的Plan,可用于与数据源交互进行真正的查询。StoragePluginOptimizerRule
就是用于优化Physical Plan的。这些Plan最终对应的结构有点类似于语法树,毕竟SQL也可以被认为是一种程序语言。StoragePluginOptimizerRule
可以被理解为改写这些语法树的。例如Mongo
storage plugin就实现了这个类,它会把where
中的filter转换为mongodb自己的filter(如{‘$gt’: 2}),从而优化查询。
这里又牵扯出Apache的另一个项目:calcite,前身就是OptiQ。Drill中整个关于SQL的执行,主要是依靠这个项目。要玩转Plan的优化是比较难的,也是因为文档欠缺,相关代码较多。
SchemaFactory
registerSchemas
主要还是调用SchemaFactory.registerSchemas
接口。Drill中的Schema是一种树状结构,所以可以看到registerSchemas
实际就是往parent中添加child:
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
HttpSchema schema = new HttpSchema(schemaName);
parent.add(schema.getName(), schema);
}
HttpSchema
派生于AbstractSchema
,主要需要实现接口getTable
,因为我这个http storage plugin中的table实际就是传给HTTP service的query,所以table是动态的,所以getTable
的实现比较简单:
public Table getTable(String tableName) { // table name can be any of string
HttpScanSpec spec = new HttpScanSpec(tableName); // will be pass to getPhysicalScan
return new DynamicDrillTable(plugin, schemaName, null, spec);
}
这里的HttpScanSpec
用于保存查询中的一些参数,例如这里保存了table name,也就是HTTP service的query,例如/e/api:search?q=avi&p=2
。它会被传到AbstraceStoragePlugin.getPhysicalScan
中的JSONOptions
:
public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
HttpScanSpec spec = selection.getListWith(new ObjectMapper(), new TypeReference<HttpScanSpec>() {});
return new HttpGroupScan(userName, httpConfig, spec);
}
HttpGroupScan
后面会看到用处。
AbstractRecordReader
AbstractRecordReader
负责真正地读取数据并返回给Drill。BatchCreator
则是用于创建AbstractRecordReader
。
public class HttpScanBatchCreator implements BatchCreator<HttpSubScan> {
@Override
public CloseableRecordBatch getBatch(FragmentContext context,
HttpSubScan config, List<RecordBatch> children)
throws ExecutionSetupException {
List<RecordReader> readers = Lists.newArrayList();
readers.add(new HttpRecordReader(context, config));
return new ScanBatch(config, context, readers.iterator());
}
}
既然AbstractRecordReader
负责真正读取数据,那么它肯定是需要知道传给HTTP service的query的,但这个query最早是在HttpScanSpec
中,然后传给了HttpGroupScan
,所以马上会看到HttpGroupScan
又把参数信息传给了HttpSubScan
。
Drill也会自动扫描BatchCreator
的实现类,所以这里就不用关心HttpScanBatchCreator
的来历了。
HttpSubScan
的实现比较简单,主要是用来存储HttpScanSpec
的:
public class HttpSubScan extends AbstractBase implements SubScan // 需要实现SubScan
回到HttpGroupScan
,必须实现的接口:
public SubScan getSpecificScan(int minorFragmentId) { // pass to HttpScanBatchCreator
return new HttpSubScan(config, scanSpec); // 最终会被传递到HttpScanBatchCreator.getBatch接口
}
最终query被传递到HttpRecordReader
,该类需要实现的接口包括:setup
和next
,有点类似于迭代器。setup
中查询出数据,然后next
中转换数据给Drill。转换给Drill时可以使用到VectorContainerWriter
和JsonReader
。这里也就是Drill中传说的vector数据格式,也就是列存储数据。
总结
以上,就包含了plugin本身的创建,及查询中query的传递。查询中类似select titile, name
中的columns会被传递到HttpGroupScan.clone
接口,只不过我这里并不关注。实现了这些,就可以通过Drill查询HTTP service中的数据了。
而select * from xx where xx
中的where
filter,Drill自己会对查询出来的数据做过滤。如果要像mongo plugin中构造mongodb的filter,则需要实现StoragePluginOptimizerRule
。
我这里实现的HTTP storage plugin,本意是觉得传给HTTP service的query可能会动态构建,例如:
select name, length from http.`/e/api:search` where $p=2 and $q='avi' # p=2&q=avi 就是动态构建,其值可以来源于其他查询结果 select name, length from http.`/e/api:search?q=avi&p=2` where length > 0 # 这里就是静态的
第一条查询就需要借助StoragePluginOptimizerRule
,它会收集所有where中的filter,最终作为HTTP serivce的query。但这里的实现还不完善。
总体而言,由于Drill项目相对较新,要进行扩展还是比较困难的。尤其是Plan优化部分。