Part 3 summarized how the Unresolved Plan is generated; the next step is to turn it into an Analyzed Plan. This step mainly involves QueryExecution, the Analyzer and the catalog.
spark.sql() -> Dataset.ofRows:
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
  val qe = sparkSession.sessionState.executePlan(logicalPlan)
  qe.assertAnalyzed()
  new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
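A quick way to see this in practice is through the QueryExecution attached to the returned Dataset, which exposes both the parsed and the analyzed plans. A minimal sketch (the table and column names are made up for illustration; it assumes a SparkSession named spark and a registered table people):

// df has already gone through ofRows(), so qe.assertAnalyzed() has run
val df = spark.sql("SELECT id, name FROM people WHERE id > 10")

println(df.queryExecution.logical)   // the Unresolved Plan produced by the parser (part 3)
println(df.queryExecution.analyzed)  // the Analyzed Plan discussed in this post
df.explain(true)                     // prints parsed, analyzed, optimized and physical plans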
Inside ofRows, the call qe.assertAnalyzed() triggers analysis of the plan tree. Doing this eagerly is partly so that the plan is bound against the information in the catalog right away, surfacing errors such as unresolvable function or column names as early as possible.
The actual analysis is performed when QueryExecution's lazy val analyzed is first evaluated:
def assertAnalyzed(): Unit = analyzed

lazy val analyzed: LogicalPlan = {
  SparkSession.setActiveSession(sparkSession)
  sparkSession.sessionState.analyzer.executeAndCheck(logical)
}
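Because assertAnalyzed() is called eagerly in ofRows, analysis errors surface at spark.sql() time rather than when an action runs. A small sketch (again assuming a SparkSession named spark and a table people that has no column called not_a_column):

try {
  // fails here, before any action is executed: assertAnalyzed() forces the
  // lazy analyzed val and the check rejects the unknown column
  spark.sql("SELECT not_a_column FROM people")
} catch {
  case e: org.apache.spark.sql.AnalysisException =>
    println(s"caught at analysis time: ${e.getMessage}")
}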
So when are QueryExecution, the analyzer and the catalog instantiated?
This happens in SparkSession:
lazy val sessionState: SessionState = {
  parentSessionState
    .map(_.clone(this))
    .getOrElse {
      val state = SparkSession.instantiateSessionState(
        SparkSession.sessionStateClassName(sparkContext.conf),
        self)
      initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v) }
      state
    }
}
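Note the initialSessionOptions.foreach(...) line: options set on the SparkSession builder are copied into the new SessionState's SQLConf the first time sessionState is accessed. A small sketch of that behaviour (app name and option value are arbitrary; it assumes a fresh session is created here rather than reused):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("session-state-demo")
  .config("spark.sql.shuffle.partitions", "4")  // carried over as an initial session option
  .getOrCreate()

// the value was applied to state.conf via setConfString when sessionState was built
println(spark.conf.get("spark.sql.shuffle.partitions"))  // 4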
The SessionState itself is built in instantiateSessionState() -> BaseSessionStateBuilder.build():
/**
 * Build the [[SessionState]].
 */
def build(): SessionState = {
  new SessionState(
    session.sharedState,
    conf,
    experimentalMethods,
    functionRegistry,
    udfRegistration,
    () => catalog,
    sqlParser,
    () => analyzer,
    () => optimizer,
    planner,
    streamingQueryManager,
    listenerManager,
    () => resourceLoader,
    createQueryExecution,
    createClone)
}
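Which concrete builder gets used is decided by sessionStateClassName(), which reads spark.sql.catalogImplementation: "in-memory" maps to SessionStateBuilder and "hive" to HiveSessionStateBuilder. enableHiveSupport() is simply a shortcut that sets this option, as in the sketch below (app name arbitrary; Hive classes must be on the classpath):

import org.apache.spark.sql.SparkSession

// equivalent to .config("spark.sql.catalogImplementation", "hive")
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("hive-builder-demo")
  .enableHiveSupport()
  .getOrCreate()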
Analyzer:
protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
  override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
    new FindDataSourceTable(session) +:
      new ResolveSQLOnFile(session) +:
      customResolutionRules

  override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
    PreprocessTableCreation(session) +:
      PreprocessTableInsertion(conf) +:
      DataSourceAnalysis(conf) +:
      customPostHocResolutionRules

  override val extendedCheckRules: Seq[LogicalPlan => Unit] =
    PreWriteCheck +:
      PreReadCheck +:
      HiveOnlyCheck +:
      customCheckRules
}
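The customResolutionRules / customPostHocResolutionRules / customCheckRules hooks are filled in from SparkSessionExtensions by BaseSessionStateBuilder, which is the usual way to plug your own rules into this Analyzer. A hedged sketch (LogPlanRule is a made-up rule that only logs the plan it is given):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// a do-nothing rule that just logs whatever plan the Analyzer hands it
case class LogPlanRule(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    logInfo(s"analyzing:\n${plan.treeString}")
    plan
  }
}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("extensions-demo")
  // ends up in customResolutionRules, i.e. in extendedResolutionRules above
  .withExtensions(ext => ext.injectResolutionRule(session => LogPlanRule(session)))
  .getOrCreate()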
The Analyzer definition shows that it holds references to the catalog and the SQLConf, so the bulk of the analysis work happens inside it. The entry point is:
sparkSession.sessionState.analyzer.executeAndCheck(logical)
executeAndCheck in that line is defined in Analyzer:
def executeAndCheck(plan: LogicalPlan): LogicalPlan = {
  val analyzed = execute(plan)
  try {
    checkAnalysis(analyzed)
    EliminateBarriers(analyzed)
  } catch {
    case e: AnalysisException =>
      val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
      ae.setStackTrace(e.getStackTrace)
      throw ae
  }
}
The line val analyzed = execute(plan) actually calls RuleExecutor.execute:
def execute(plan: TreeType): TreeType = {
  var curPlan = plan
  val queryExecutionMetrics = RuleExecutor.queryExecutionMeter

  batches.foreach { batch =>
    val batchStartPlan = curPlan
    var iteration = 1
    var lastPlan = curPlan
    var continue = true

    // Run until fix point (or the max number of iterations as specified in the strategy).
    while (continue) {
      curPlan = batch.rules.foldLeft(curPlan) {
        case (plan, rule) =>
          val startTime = System.nanoTime()
          val result = rule(plan)
          val runTime = System.nanoTime() - startTime

          if (!result.fastEquals(plan)) {
            queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
            queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
            logTrace(
              s"""
                |=== Applying Rule ${rule.ruleName} ===
                |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
              """.stripMargin)
          }
          queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
          queryExecutionMetrics.incNumExecution(rule.ruleName)

          // Run the structural integrity checker against the plan after each rule.
          if (!isPlanIntegral(result)) {
            val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
              "the structural integrity of the plan is broken."
            throw new TreeNodeException(result, message, null)
          }

          result
      }
      iteration += 1
      if (iteration > batch.strategy.maxIterations) {
        // Only log if this is a rule that is supposed to run more than once.
        if (iteration != 2) {
          val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
          if (Utils.isTesting) {
            throw new TreeNodeException(curPlan, message, null)
          } else {
            logWarning(message)
          }
        }
        continue = false
      }

      if (curPlan.fastEquals(lastPlan)) {
        logTrace(
          s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
        continue = false
      }
      lastPlan = curPlan
    }

    if (!batchStartPlan.fastEquals(curPlan)) {
      logDebug(
        s"""
          |=== Result of Batch ${batch.name} ===
          |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
        """.stripMargin)
    } else {
      logTrace(s"Batch ${batch.name} has no effect.")
    }
  }

  curPlan
}
As the code shows, the loop that applies the rules exits either when the iteration count exceeds the batch strategy's maximum number of iterations, or when a full pass leaves the plan identical to the previous one, i.e. a fixed point is reached. RuleExecutor itself only supplies the Seq[Batch] abstraction and walks through the batches, applying each rule in turn; the concrete batches, strategies and rules are defined by subclasses.
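To make the exit conditions concrete, here is a stripped-down, self-contained sketch of the same fixed-point loop, using plain Int => Int functions in place of Rule[LogicalPlan]; it only illustrates the mechanism and is not Spark code:

case class SimpleBatch(name: String, maxIterations: Int, rules: Seq[Int => Int])

def runBatches(batches: Seq[SimpleBatch], start: Int): Int = {
  var cur = start
  batches.foreach { batch =>
    var iteration = 1
    var last = cur
    var continue = true
    while (continue) {
      cur = batch.rules.foldLeft(cur)((v, rule) => rule(v))  // apply every rule once
      iteration += 1
      if (iteration > batch.maxIterations) continue = false  // strategy limit reached
      if (cur == last) continue = false                      // fixed point: nothing changed
      last = cur
    }
  }
  cur
}

// a "rule" that caps its input at 10; the batch stops as soon as an iteration changes nothing
println(runBatches(Seq(SimpleBatch("cap", 100, Seq(v => math.min(v + 3, 10)))), 0))  // 10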
With the executor mechanics covered, what we mainly care about is the batches that Analyzer overrides; let's see exactly which strategies and rules it defines:
lazy val batches: Seq[Batch] = Seq(
  Batch("Hints", fixedPoint,
    new ResolveHints.ResolveBroadcastHints(conf),
    ResolveHints.RemoveAllHints),
  Batch("Simple Sanity Check", Once,
    LookupFunctions),
  Batch("Substitution", fixedPoint,
    CTESubstitution,
    WindowsSubstitution,
    EliminateUnions,
    new SubstituteUnresolvedOrdinals(conf)),
  Batch("Resolution", fixedPoint,
    ResolveTableValuedFunctions ::
    ResolveRelations ::                // bind table (relation) names to entries in the catalog
    ResolveReferences ::               // bind column names to nodes in the plan tree
    ResolveCreateNamedStruct ::
    ResolveDeserializer ::
    ResolveNewInstance ::
    ResolveUpCast ::
    ResolveGroupingAnalytics ::
    ResolvePivot ::
    ResolveOrdinalInOrderByAndGroupBy ::
    ResolveAggAliasInGroupBy ::
    ResolveMissingReferences ::
    ExtractGenerator ::
    ResolveGenerate ::
    ResolveFunctions ::                // resolve function names
    ResolveAliases ::
    ResolveSubquery ::
    ResolveSubqueryColumnAliases ::
    ResolveWindowOrder ::
    ResolveWindowFrame ::
    ResolveNaturalAndUsingJoin ::
    ExtractWindowExpressions ::
    GlobalAggregates ::
    ResolveAggregateFunctions ::
    TimeWindowing ::
    ResolveInlineTables(conf) ::
    ResolveTimeZone(conf) ::
    ResolvedUuidExpressions ::
    TypeCoercion.typeCoercionRules(conf) ++
    extendedResolutionRules : _*),
  Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
  Batch("View", Once,
    AliasViewChild(conf)),
  Batch("Nondeterministic", Once,
    PullOutNondeterministic),
  Batch("UDF", Once,
    HandleNullInputsForUDF),
  Batch("FixNullability", Once,
    FixNullability),
  Batch("Subquery", Once,
    UpdateOuterReferences),
  Batch("Cleanup", fixedPoint,
    CleanupAliases)
)
Here we focus on ResolveRelations and ResolveFunctions to see how tables and functions are bound against the catalog. ResolveRelations first:
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
    EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
      case v: View =>
        u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
      case other => i.copy(table = other)
    }
  case u: UnresolvedRelation => resolveRelation(u)
}

// Look up the table with the given name from catalog. The database we used is decided by the
// precedence:
// 1. Use the database part of the table identifier, if it is defined;
// 2. Use defaultDatabase, if it is defined(In this case, no temporary objects can be used,
//    and the default database is only used to look up a view);
// 3. Use the currentDb of the SessionCatalog.
private def lookupTableFromCatalog(
    u: UnresolvedRelation,
    defaultDatabase: Option[String] = None): LogicalPlan = {
  val tableIdentWithDb = u.tableIdentifier.copy(
    database = u.tableIdentifier.database.orElse(defaultDatabase))
  try {
    catalog.lookupRelation(tableIdentWithDb)
  } catch {
    case e: NoSuchTableException =>
      u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}", e)
    // If the database is defined and that database is not found, throw an AnalysisException.
    // Note that if the database is not defined, it is possible we are looking up a temp view.
    case e: NoSuchDatabaseException =>
      u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}, the " +
        s"database ${e.db} doesn't exist.", e)
  }
}
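A small end-to-end illustration of this lookup (assuming a SparkSession named spark, e.g. in spark-shell; the view name is arbitrary): a temp view registered through the DataFrame API is found by catalog.lookupRelation, while an unknown name hits exactly the "Table or view not found" branch above.

import spark.implicits._

Seq((1, "a"), (2, "b")).toDF("id", "name").createOrReplaceTempView("people")

spark.sql("SELECT name FROM people WHERE id = 1").show()  // UnresolvedRelation("people") resolved via the catalog

// spark.sql("SELECT name FROM no_such_table")  // would fail analysis: Table or view not found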
That is how table information is bound: ResolveRelations calls lookupTableFromCatalog to find a matching relation in the catalog. Function binding works in much the same way, in ResolveFunctions together with SessionCatalog.lookupFunction:
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
  case q: LogicalPlan =>
    q transformExpressions {
      case u if !u.childrenResolved => u // Skip until children are resolved.
      case u: UnresolvedAttribute if resolver(u.name, VirtualColumn.hiveGroupingIdName) =>
        withPosition(u) {
          Alias(GroupingID(Nil), VirtualColumn.hiveGroupingIdName)()
        }
      case u @ UnresolvedGenerator(name, children) =>
        withPosition(u) {
          catalog.lookupFunction(name, children) match {
            case generator: Generator => generator
            case other =>
              failAnalysis(s"$name is expected to be a generator. However, " +
                s"its class is ${other.getClass.getCanonicalName}, which is not a generator.")
          }
        }
      case u @ UnresolvedFunction(funcId, children, isDistinct) =>
        withPosition(u) {
          catalog.lookupFunction(funcId, children) match {
            // AggregateWindowFunctions are AggregateFunctions that can only be evaluated within
            // the context of a Window clause. They do not need to be wrapped in an
            // AggregateExpression.
            case wf: AggregateWindowFunction =>
              if (isDistinct) {
                failAnalysis(s"${wf.prettyName} does not support the modifier DISTINCT")
              } else {
                wf
              }
            // We get an aggregate function, we need to wrap it in an AggregateExpression.
            case agg: AggregateFunction => AggregateExpression(agg, Complete, isDistinct)
            // This function is not an aggregate function, just return the resolved one.
            case other =>
              if (isDistinct) {
                failAnalysis(s"${other.prettyName} does not support the modifier DISTINCT")
              } else {
                other
              }
          }
        }
    }
}

def lookupFunction(
    name: FunctionIdentifier,
    children: Seq[Expression]): Expression = synchronized {
  // Note: the implementation of this function is a little bit convoluted.
  // We probably shouldn't use a single FunctionRegistry to register all three kinds of functions
  // (built-in, temp, and external).
  if (name.database.isEmpty && functionRegistry.functionExists(name)) {
    // This function has been already loaded into the function registry.
    return functionRegistry.lookupFunction(name, children)
  }

  // If the name itself is not qualified, add the current database to it.
  val database = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
  val qualifiedName = name.copy(database = Some(database))

  if (functionRegistry.functionExists(qualifiedName)) {
    // This function has been already loaded into the function registry.
    // Unlike the above block, we find this function by using the qualified name.
    return functionRegistry.lookupFunction(qualifiedName, children)
  }

  // The function has not been loaded to the function registry, which means
  // that the function is a permanent function (if it actually has been registered
  // in the metastore). We need to first put the function in the FunctionRegistry.
  // TODO: why not just check whether the function exists first?
  val catalogFunction = try {
    externalCatalog.getFunction(database, name.funcName)
  } catch {
    case _: AnalysisException => failFunctionLookup(name)
    case _: NoSuchPermanentFunctionException => failFunctionLookup(name)
  }
  loadFunctionResources(catalogFunction.resources)
  // Please note that qualifiedName is provided by the user. However,
  // catalogFunction.identifier.unquotedString is returned by the underlying
  // catalog. So, it is possible that qualifiedName is not exactly the same as
  // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
  // At here, we preserve the input from the user.
  registerFunction(catalogFunction.copy(identifier = qualifiedName), overrideIfExists = false)
  // Now, we need to create the Expression.
  functionRegistry.lookupFunction(qualifiedName, children)
}
Function binding, then, is a lookup in the FunctionRegistry; for a permanent function that has not been loaded yet, lookupFunction first fetches it from the external catalog, registers it, and only then resolves it through the registry.
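A quick sketch of that from the user's side (assuming a SparkSession named spark; the function name is arbitrary): a function registered through spark.udf lands in the session's FunctionRegistry, so the first branch of lookupFunction finds it without touching the external catalog.

// registered into the session's FunctionRegistry
spark.udf.register("plus_one", (x: Int) => x + 1)

// ResolveFunctions turns UnresolvedFunction("plus_one") into the registered expression
spark.sql("SELECT plus_one(41)").show()  // 42

// an unregistered name would instead fail analysis with an "Undefined function" style error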
Readers interested in the other rules and strategies can study the source code. After this step, the Analyzed Plan has been generated.
Original post: Spark SQL (4) - Unresolved Plan to Analyzed Plan, https://www.cnblogs.com/ldsggv/p/13380523.html