Take a simple query as the running example:

SELECT a1, a2, a3 FROM tableA WHERE condition
/** From sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */
def sql(sqlText: String): SchemaRDD = {
  if (dialect == "sql") {
    new SchemaRDD(this, parseSql(sqlText))  // parseSql(sqlText) parses the SQL statement
  } else {
    sys.error(s"Unsupported SQL dialect: $dialect")
  }
}

sqlContext.sql returns a SchemaRDD: it calls new SchemaRDD(this, parseSql(sqlText)) to process the statement. Before anything else, catalyst.SqlParser parses the statement into an Unresolved LogicalPlan:

/** From sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */
protected[sql] val parser = new catalyst.SqlParser
protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)
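The flow so far can be exercised from user code. A hedged sketch (not from the original post): sc is an existing SparkContext, and tableA/condition are placeholders for a registered table and a boolean column.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// Parsing happens right here; analysis, optimization and execution are deferred
val rdd = sqlContext.sql("SELECT a1, a2, a3 FROM tableA WHERE condition")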
The class SchemaRDD extends SchemaRDDLike:

/** From sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala */
class SchemaRDD(
    @transient val sqlContext: SQLContext,
    @transient val baseLogicalPlan: LogicalPlan)
  extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike

SchemaRDDLike calls sqlContext.executePlan(baseLogicalPlan) to process the Unresolved LogicalPlan produced by catalyst.SqlParser; the baseLogicalPlan here is exactly that Unresolved LogicalPlan.

/** From sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala */
private[sql] trait SchemaRDDLike {
  @transient val sqlContext: SQLContext
  @transient val baseLogicalPlan: LogicalPlan

  private[sql] def baseSchemaRDD: SchemaRDD

  lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
  ......
}

/** From sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
  new this.QueryExecution { val logical = plan }

executePlan wraps the plan in a QueryExecution, which drives all the remaining steps:

/** From sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */
protected abstract class QueryExecution {
  def logical: LogicalPlan

  // Analyze the Unresolved LogicalPlan into a resolved LogicalPlan
  lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
  // Optimize the resolved LogicalPlan into an optimized LogicalPlan
  lazy val optimizedPlan = optimizer(analyzed)
  // Convert the optimized LogicalPlan into a physical plan
  lazy val sparkPlan = {
    SparkPlan.currentContext.set(self)
    planner(optimizedPlan).next()
  }
  // Preparation before execution: produce an executable physical plan
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
  // Execute the executable physical plan
  lazy val toRdd: RDD[Row] = executedPlan.execute()
  ......
}

To sum up, sqlContext runs a statement through: SqlParser → Unresolved LogicalPlan → analyzer → resolved LogicalPlan → optimizer → optimized LogicalPlan → planner → SparkPlan → prepareForExecution → executable physical plan → execute() → RDD[Row].
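Because every stage above is a lazy val, each plan is computed on first access and can be inspected individually. A hedged sketch, reusing the rdd from the earlier example:

val qe = rdd.queryExecution
println(qe.logical)        // Unresolved LogicalPlan from the parser
println(qe.analyzed)       // resolved LogicalPlan after the analyzer
println(qe.optimizedPlan)  // optimized LogicalPlan after the optimizer
println(qe.executedPlan)   // executable physical plan (SparkPlan)
val rows = qe.toRdd        // RDD[Row]; nothing runs until an action is called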
hiveContext overrides sql():

/** From sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
override def sql(sqlText: String): SchemaRDD = {
  // spark.sql.dialect selects the parser to use
  if (dialect == "sql") {
    super.sql(sqlText)  // with the sql parser, fall back to sqlContext's sql method
  } else if (dialect == "hiveql") {  // with the hiveql parser, use HiveQl.parseSql
    new SchemaRDD(this, HiveQl.parseSql(sqlText))
  } else {
    sys.error(s"Unsupported SQL dialect: $dialect. Try 'sql' or 'hiveql'")
  }
}

hiveContext.sql first chooses the concrete execution path according to the user's dialect setting (spark.sql.dialect): if dialect == "sql" it takes sqlContext's SQL path; if dialect == "hiveql" it takes the hiveql path. Here we look mainly at the hiveql path.
As the code above shows, under hiveql hiveContext.sql calls new SchemaRDD(this, HiveQl.parseSql(sqlText)); before any plan is built, the statement is parsed by HiveQl.parseSql:

/** From sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala */
/** Returns a LogicalPlan for a given HiveQL string. */
def parseSql(sql: String): LogicalPlan = {
  try {
    if (condition) {
      // Non-Hive commands such as set, cache table and add jar are turned
      // directly into command-type LogicalPlans
      .....
    } else {
      val tree = getAst(sql)
      if (nativeCommands contains tree.getText) {
        NativeCommand(sql)
      } else {
        nodeToPlan(tree) match {
          case NativePlaceholder => NativeCommand(sql)
          case other => other
        }
      }
    }
  } catch {
    // exception handling
    ......
  }
}

The reason for these branches: the hiveql that sparkSQL supports is compatible not only with Hive statements but also with some statements of sparkSQL's own, so HiveQl.parseSql distinguishes them while parsing. For genuine Hive statements, getAst builds the AST with Hive's own parser:

/** From sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala */
/**
 * Returns the AST for the given SQL string.
 */
def getAst(sql: String): ASTNode =
  ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql))
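A hedged sketch of the two branches (the jar path is hypothetical):

hiveContext.sql("ADD JAR /tmp/my-udfs.jar")  // command-type LogicalPlan, no AST built
hiveContext.sql("SELECT count(*) FROM src")  // parsed via getAst, then nodeToPlan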
As with sqlContext, the class SchemaRDD extends SchemaRDDLike, and SchemaRDDLike calls sqlContext.executePlan(baseLogicalPlan); hiveContext, however, overrides executePlan():

/** From sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
  new this.QueryExecution { val logical = plan }

and uses a new QueryExecution class, derived from sqlContext's QueryExecution:

/** From sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
protected[sql] abstract class QueryExecution extends super.QueryExecution {
  // TODO: Create mixin for the analyzer instead of overriding things here.
  override lazy val optimizedPlan =
    optimizer(ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))))

  override lazy val toRdd: RDD[Row] = executedPlan.execute().map(_.copy())
  ......
}

So hiveContext runs essentially the same process as sqlContext, apart from the overridden catalog, functionRegistry, analyzer, planner, optimizedPlan and toRdd. Note that toRdd copies every Row, presumably because the underlying Hive scan reuses mutable row objects.
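A hedged sketch (hrdd is the SchemaRDD from the earlier hiveContext example): because toRdd copies each Row, collected rows are independent objects that are safe to hold onto.

val rows = hrdd.queryExecution.toRdd.collect()  // Array[Row], each an independent copy
rows.foreach(r => println(r.mkString("\t")))    // Row extends Seq[Any], so mkString works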
The overridden catalog is a catalyst metadata catalog pointing to the Hive Metastore:

/** From sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
/* A catalyst metadata catalog that points to the Hive Metastore. */
@transient
override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog {
  override def lookupRelation(
      databaseName: Option[String],
      tableName: String,
      alias: Option[String] = None): LogicalPlan = {
    LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias))
  }
}

hiveContext's analyzer uses the new catalog and functionRegistry:

/** From sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
/* An analyzer that uses the Hive metastore. */
@transient
override protected[sql] lazy val analyzer =
  new Analyzer(catalog, functionRegistry, caseSensitive = false)

And hiveContext's planner is hivePlanner:

/** From sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
@transient
override protected[sql] val planner = hivePlanner
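Because the analyzer is built with caseSensitive = false and the catalog wraps every lookup in LowerCaseSchema, identifier case does not matter under hiveql. A hedged sketch (src is an assumed Hive table):

hiveContext.sql("SELECT KEY FROM SRC")   // resolves to the same plan as:
hiveContext.sql("select key from src")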
Original post: http://blog.csdn.net/book_mmicky/article/details/39956809