This chapter analyzes how Kafka topic data is processed when Spark 2.4 Structured Streaming reads a Kafka data source.
The analysis is based on the following example:
SparkSession sparkSession = SparkSession.builder().getOrCreate();

Dataset<Row> sourceDataset = sparkSession.readStream()
        .format("kafka")
        .option("", "")
        .load();
sourceDataset.createOrReplaceTempView("tv_test");

Dataset<Row> aggResultDataset = sparkSession.sql("select ....");

StreamingQuery query = aggResultDataset.writeStream()
        .format("kafka")
        .option("", "")
        .trigger(Trigger.Continuous(1000))
        .start();

try {
    query.awaitTermination();
} catch (StreamingQueryException e1) {
    e1.printStackTrace();
}
The business logic of this example: Structured Streaming reads a Kafka topic, performs an aggregation, and sinks the result to another Kafka topic.
To understand how the Dataset returned by DataStreamReader#load is processed, we need to analyze the load method itself; its core logic is discussed below.
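Since the original screenshot is not reproduced here, the following is an abridged reconstruction of the core of DataStreamReader#load, based on my reading of the Spark 2.4 source; names such as metadataPath are simplified placeholders and details (temp-dir creation, validation) are omitted:

// Abridged sketch of org.apache.spark.sql.streaming.DataStreamReader#load (Spark 2.4); simplified.
def load(): DataFrame = {
  // "kafka" resolves to KafkaSourceProvider, which is then instantiated.
  val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).newInstance()
  val options = new DataSourceOptions(extraOptions.asJava)

  // The V1 DataSource is built only as a shim; no data is read here.
  val v1DataSource = DataSource(
    sparkSession,
    userSpecifiedSchema = userSpecifiedSchema,
    className = source,
    options = extraOptions.toMap)
  val v1Relation = ds match {
    case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
    case _ => None
  }

  ds match {
    // KafkaSourceProvider implements both read-support interfaces; the first matching case wins.
    case s: MicroBatchReadSupport =>
      val tempReader = s.createMicroBatchReader(
        Optional.ofNullable(userSpecifiedSchema.orNull), metadataPath, options)  // metadataPath: a temp dir in the real code
      Dataset.ofRows(sparkSession, StreamingRelationV2(
        s, source, extraOptions.toMap,
        tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
    case s: ContinuousReadSupport =>
      val tempReader = s.createContinuousReader(
        Optional.ofNullable(userSpecifiedSchema.orNull), metadataPath, options)
      Dataset.ofRows(sparkSession, StreamingRelationV2(
        s, source, extraOptions.toMap,
        tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
    case _ =>
      // Pure V1 path: wrap the V1 DataSource directly.
      Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
  }
}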
1) As analyzed in the previous article, DataSource.lookupDataSource() returns the KafkaSourceProvider class, so ds is an instance of KafkaSourceProvider;
2) From the definition of KafkaSourceProvider (https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala) we can see that it implements the DataSourceRegister, StreamSourceProvider, StreamSinkProvider, RelationProvider, CreatableRelationProvider, StreamWriteSupport, ContinuousReadSupport and MicroBatchReadSupport interfaces, among others.
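For reference, since the screenshot is missing, the class declaration looks roughly like this in the Spark 2.4 branch (member bodies omitted):

// Roughly the declaration in KafkaSourceProvider.scala (Spark 2.4); bodies omitted.
private[kafka010] class KafkaSourceProvider extends DataSourceRegister
    with StreamSourceProvider
    with StreamSinkProvider
    with RelationProvider
    with CreatableRelationProvider
    with StreamWriteSupport
    with ContinuousReadSupport
    with MicroBatchReadSupport
    with Logging {
  // shortName() returns "kafka", which is how format("kafka") finds this provider
  override def shortName(): String = "kafka"
  // ...
}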
v1DataSource is an instance of the DataSource class, so let's look at what DataSource initialization actually does.
// We need to generate the V1 data source so we can pass it to the V2 relation as a shim.
// We can't be sure at this point whether we'll actually want to use V2, since we don't know the
// writer or whether the query is continuous.
val v1DataSource = DataSource(
  sparkSession,
  userSpecifiedSchema = userSpecifiedSchema,
  className = source,
  options = extraOptions.toMap)
DataSource initialization only does the following; it does not load any data (a sketch follows the list):
1) it calls DataSource.lookupDataSource (on the companion object) to load the provider class;
2) it resolves the schema of the Kafka topic;
3) it keeps the option parameters, i.e. the parameters passed via sparkSession.readStream().option;
4) it holds on to the sparkSession and its properties.
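A minimal sketch of the DataSource members involved, abridged from the Spark 2.4 source as I recall it (most constructor parameters and branches omitted):

// Abridged sketch of org.apache.spark.sql.execution.datasources.DataSource (Spark 2.4); simplified.
case class DataSource(
    sparkSession: SparkSession,                          // 4) the session and its properties
    className: String,                                   // "kafka" in this example
    userSpecifiedSchema: Option[StructType] = None,
    options: Map[String, String] = Map.empty) {          // 3) the readStream().option(...) values

  // 1) resolve the provider class: KafkaSourceProvider for "kafka"
  lazy val providingClass: Class[_] =
    DataSource.lookupDataSource(className, sparkSession.sessionState.conf)

  // 2) the source name and schema are resolved lazily; still no data is read
  lazy val sourceInfo: SourceInfo = sourceSchema()

  private def sourceSchema(): SourceInfo = providingClass.newInstance() match {
    case s: StreamSourceProvider =>
      val (name, schema) = s.sourceSchema(
        sparkSession.sqlContext, userSpecifiedSchema, className, caseInsensitiveOptions)
      SourceInfo(name, schema, Nil)
    // other provider types omitted
  }
}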
1) Internally, DataSource#sourceSchema calls KafkaSourceProvider#sourceSchema(...);
2) KafkaSourceProvider#sourceSchema returns a tuple of (shortName(), KafkaOffsetReader.kafkaSchema), i.e. the source name and the fixed Kafka schema (sketched below).
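The fixed schema that the Kafka source exposes (KafkaOffsetReader.kafkaSchema) consists, as far as I recall from the Spark 2.4 source, of the following seven columns:

// Roughly KafkaOffsetReader.kafkaSchema: the fixed columns of every Kafka source row.
def kafkaSchema: StructType = StructType(Seq(
  StructField("key", BinaryType),
  StructField("value", BinaryType),
  StructField("topic", StringType),
  StructField("partition", IntegerType),
  StructField("offset", LongType),
  StructField("timestamp", TimestampType),
  StructField("timestampType", IntegerType)
))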
At this point in the code, still no data has been loaded.
ds is the provider instance; v1DataSource is essentially a descriptor that bundles the source's provider, the source's options (the sparkSession.readStream().option parameters such as the topic and maxOffsetsPerTrigger), and the source's schema. It describes the data source rather than carrying any data.
The main difference between the two lies in how tempReader is created:
MicroBatchReadSupport: KafkaSourceProvider#createMicroBatchReader is used to build a KafkaMicroBatchReader;
ContinuousReadSupport: KafkaSourceProvider#createContinuousReader is used to build a KafkaContinuousReader (signatures sketched below).
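The corresponding factory methods in KafkaSourceProvider have roughly the following signatures in Spark 2.4 (parameter names approximate, bodies omitted):

// KafkaSourceProvider factory methods (Spark 2.4), signatures only.
override def createMicroBatchReader(
    schema: Optional[StructType],
    metadataPath: String,
    options: DataSourceOptions): KafkaMicroBatchReader

override def createContinuousReader(
    schema: Optional[StructType],
    metadataPath: String,
    options: DataSourceOptions): KafkaContinuousReader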
In the end, however, both are wrapped into a StreamingRelationV2 (which extends LeafNode, i.e. it is a logical plan) and passed to the Dataset; when the Dataset loads data, it is this logical plan that gets executed.
package org.apache.spark.sql.execution.streaming

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2}

object StreamingRelation {
  def apply(dataSource: DataSource): StreamingRelation = {
    StreamingRelation(
      dataSource, dataSource.sourceInfo.name, dataSource.sourceInfo.schema.toAttributes)
  }
}

/**
 * Used to link a streaming [[DataSource]] into a
 * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. This is only used for creating
 * a streaming [[org.apache.spark.sql.DataFrame]] from [[org.apache.spark.sql.DataFrameReader]].
 * It should be used to create [[Source]] and converted to [[StreamingExecutionRelation]] when
 * passing to [[StreamExecution]] to run a query.
 */
case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])
  extends LeafNode with MultiInstanceRelation {
  override def isStreaming: Boolean = true
  override def toString: String = sourceName

  // There's no sensible value here. On the execution path, this relation will be
  // swapped out with microbatches. But some dataframe operations (in particular explain) do lead
  // to this node surviving analysis. So we satisfy the LeafNode contract with the session default
  // value.
  override def computeStats(): Statistics = Statistics(
    sizeInBytes = BigInt(dataSource.sparkSession.sessionState.conf.defaultSizeInBytes)
  )

  override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))
}

...

// We have to pack in the V1 data source as a shim, for the case when a source implements
// continuous processing (which is always V2) but only has V1 microbatch support. We don't
// know at read time whether the query is continuous or not, so we need to be able to
// swap a V1 relation back in.
/**
 * Used to link a [[DataSourceV2]] into a streaming
 * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. This is only used for creating
 * a streaming [[org.apache.spark.sql.DataFrame]] from [[org.apache.spark.sql.DataFrameReader]],
 * and should be converted before passing to [[StreamExecution]].
 */
case class StreamingRelationV2(
    dataSource: DataSourceV2,
    sourceName: String,
    extraOptions: Map[String, String],
    output: Seq[Attribute],
    v1Relation: Option[StreamingRelation])(session: SparkSession)
  extends LeafNode with MultiInstanceRelation {
  override def otherCopyArgs: Seq[AnyRef] = session :: Nil
  override def isStreaming: Boolean = true
  override def toString: String = sourceName

  override def computeStats(): Statistics = Statistics(
    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
  )

  override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))(session)
}
Those two readers define the fundamental rules by which the micro-batch and continuous modes fetch data.
StreamingRelation and StreamingRelationV2 are merely wrappers around the data source: they extend catalyst.plans.logical.LeafNode and add no behaviour of their own.
In fact, everything so far is just logical-plan construction: it yields a Dataset carrying a logical plan, which will be executed to produce data once stream processing is triggered.
The start() method returns a StreamingQuery object; StreamingQuery is an interface defined in the org.apache.spark.sql.streaming package:
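Abridged, and based on my recollection of the Spark 2.4 API, its main members are:

// Main members of org.apache.spark.sql.streaming.StreamingQuery (Spark 2.4), abridged.
trait StreamingQuery {
  def name: String
  def id: java.util.UUID
  def runId: java.util.UUID
  def sparkSession: SparkSession
  def isActive: Boolean
  def exception: Option[StreamingQueryException]
  def status: StreamingQueryStatus
  def recentProgress: Array[StreamingQueryProgress]
  def lastProgress: StreamingQueryProgress
  def awaitTermination(): Unit                   // used in the example at the top of this chapter
  def awaitTermination(timeoutMs: Long): Boolean
  def processAllAvailable(): Unit
  def stop(): Unit
  def explain(): Unit
}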
The chain aggResultDataset.writeStream().format("kafka").option("", "").trigger(Trigger.Continuous(1000)) in the example returns a DataStreamWriter object.
DataStreamWriter defines a start method, and this start method is the entry point at which the whole streaming job begins to execute.
For this example, the code inside DataStreamWriter's start method takes the branch described below.
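Since the original screenshot is not reproduced here, the following is an abridged reconstruction of that branch based on the Spark 2.4 source (simplified):

// Abridged sketch of the tail of org.apache.spark.sql.streaming.DataStreamWriter#start (Spark 2.4).
val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
val sink = ds.newInstance() match {
  // "kafka" resolves to KafkaSourceProvider, which implements StreamWriteSupport (the V2 writer).
  case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) => w
  case _ =>
    // Fall back to the V1 sink.
    val ds = DataSource(
      df.sparkSession,
      className = source,
      options = extraOptions.toMap,
      partitionColumns = normalizedParCols.getOrElse(Nil))
    ds.createSink(outputMode)
}

df.sparkSession.sessionState.streamingQueryManager.startQuery(
  extraOptions.get("queryName"),
  extraOptions.get("checkpointLocation"),
  df,
  extraOptions.toMap,
  sink,
  outputMode,
  useTempCheckpointLocation = source == "console",
  recoverFromCheckpointLocation = true,
  trigger = trigger)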
The last line of DataStreamWriter#start() above calls StreamingQueryManager#startQuery().
Original article: https://www.cnblogs.com/yy3b2007com/p/11421345.html