目前 Spark SQL 不支持自定义UDF ,底层 SQL 引擎用的 catalyst 。
在SqlContext 中 有一个 Analyzer给的一个EmptyFunctionRegistry ,如果 SQL 引擎函数中找不到了,会到这个FunctionRegistry 中找
EmptyFunctionRegistry 中lookup 只是抛出一个异常。
所以自定义了一个 FunctionRegistry ,SqlContext
@transient
protected[sql]lazyval analyzer:Analyzer =
newAnalyzer(catalog, EmptyFunctionRegistry, caseSensitive =true)
class UDFRgistry extends FunctionRegistry {
def lookupFunction(name: String, children: Seq[Expression]): Expression = {
name.toLowerCase match {
case "col_set" =>Collect(children(0))
case "array" =>Array(children(0))
case "contains" =>Contains(children)
case _ => throw new UnsupportedOperationException
}
}
}
class SparkSqlContext(val spctx: SparkContext) extends SQLContext(spctx) {
@transient
override lazy val analyzer: Analyzer =
new Analyzer(catalog, new UDF.UDFRgistry, caseSensitive = true)
}原文:http://blog.csdn.net/tyj2788540/article/details/39256059