narrow: OneToOneDependency, PruneDependency, RangeDependency
wide: ShuffleDependency
Inspect dependencies:
.dependencies
.toDebugString
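A minimal sketch of inspecting both (the pair RDD and its contents are made up for illustration):
val wordPairs = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3)))  // hypothetical pair RDD
val grouped = wordPairs.groupByKey()                                // wide: introduces a ShuffleDependency
grouped.dependencies            // Seq[Dependency[_]] of this RDD -- here a ShuffleDependency
println(grouped.toDebugString)  // full lineage as an indented string, one indent level per shuffle stage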
--------------------------------------
Catalyst (Spark SQL's query optimizer):
reordering operations
reduce the amount of data we read
pruning unneeded partitions
---by analyzing DF and filter operations(RDD google paper)
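A hedged illustration of the idea (peopleDF and its columns are made up): even though the filter is written after the projection, Catalyst may reorder/push it down and, with a columnar source, skip data it doesn't need; explain shows the plans it produced.
import spark.implicits._
val adults = peopleDF.select($"name", $"age").filter($"age" > 17)  // hypothetical DataFrame
adults.explain(true)  // prints the parsed/analyzed/optimized logical plans and the physical plan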
Tungsten (Spark SQL's off-heap data encoder):
highly-specialized data encoders (more data fits in memory)
column-based (stores data in a columnar format)
off-heap (avoids GC; memory managed by Tungsten)
DF limitations (---> fall back to RDDs):
Even if a column "id" does not exist, the code still compiles; the error only appears at runtime (see the sketch after this list).
data types are limited to case classes/Products/SQL data types (what the Tungsten encoders support)
requires semi-structured/structured data (unstructured data won't work)
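A small sketch of the runtime-error point (peopleDF is assumed): the bad column name is just a string, so it compiles and only fails when Spark analyzes the query.
val broken = peopleDF.select("idd")  // compiles; fails at runtime with AnalysisException: cannot resolve 'idd'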
--------------------------------------
1 Rows don't carry type information, so results collected back to the master have to be cast by hand:
val avgPriceagain = avgPrice.map { row => (row(0).asInstanceOf[String], row(1).asInstanceOf[Int]) }  // wrong guess at the schema
avgPrice.head.schema.printTreeString()  // inspect the actual schema
val avgPriceagain = avgPrice.map {
  row => (row(0).asInstanceOf[Int], row(1).asInstanceOf[Double])  // casts that match the real schema
}
If the schema you assume is wrong you get a ClassCastException; use head.schema.printTreeString() to see the real schema, then redo the map.
====> Dataset: typed distributed collections of data; they must have a schema and encoders
2 More type information than DFs, more optimizations than RDDs
both typed and untyped relational operations; map/filter/flatMap are available again
import spark.implicits._
listingDF.toDS
listingDS.groupByKey(l => l.zip).agg(avg('price).as[Double])  // mixing the typed and untyped APIs
val listingDS = spark.read.json("people.json").as[Person]
myRDD.toDS
List("a","s").toDS
// compile error in the agg above: found: Column, required: TypedColumn -- convert with .as:
'price.as[Double]
3 groupByKey returns a KeyValueGroupedDataset. As with DFs, an aggregation follows groupByKey.
KeyValueGroupedDataset provides the aggregation operations and returns a Dataset.
4 Some aggregation operations on KeyValueGroupedDataset:
reduceGroups(f: (V, V) => V): Dataset[(K, V)]
agg[U](col: TypedColumn[V, U]): Dataset[(K, U)]
mapGroups/flatMapGroups
5 There is no reduceByKey on Datasets; alternatives:
keyvalueRDD.reduceByKey(_ + _)  // the RDD version

keyvalueDS.groupByKey(p => p._1)
          .mapValues(p => p._2)
          .reduceGroups((acc, str) => acc + str)  // DS alternative 1: reduceGroups

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
val myAgg = new Aggregator[(Int, String), String, String] {  // DS alternative 2: a custom Aggregator
  def zero: String = ""
  def reduce(b: String, a: (Int, String)): String = b + a._2
  def merge(b1: String, b2: String): String = b1 + b2
  def finish(r: String): String = r
  override def bufferEncoder: Encoder[String] = Encoders.STRING
  override def outputEncoder: Encoder[String] = Encoders.STRING
}.toColumn
keyvalueDS.groupByKey(p => p._1).agg(myAgg.as[String])

keyvalueDS.groupByKey(p => p._1)
          .mapGroups((k, v) => (k, v.foldLeft("")((acc, p) => acc + p._2)))
          .sort('_1)  // DS alternative 3: mapGroups (see the warning below)
But mapGroups does not support partial aggregation and requires shuffling all the data.
!!: Don't use mapGroups for aggregation; use reduceGroups or an Aggregator instead.
6 class Aggregator[-IN,BUF,OUT]
org.apache.spark.sql.expressions.Aggregator
Instantiation: see the example above.
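To make the three type parameters concrete, a hedged sketch where BUF and OUT differ (an average over hypothetical (zip, price) pairs; all names are made up):
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
// IN = one input record, BUF = (running sum, count), OUT = the average
val avgPriceAgg = new Aggregator[(Int, Double), (Double, Int), Double] {
  def zero: (Double, Int) = (0.0, 0)
  def reduce(b: (Double, Int), a: (Int, Double)): (Double, Int) = (b._1 + a._2, b._2 + 1)
  def merge(b1: (Double, Int), b2: (Double, Int)): (Double, Int) = (b1._1 + b2._1, b1._2 + b2._2)
  def finish(r: (Double, Int)): Double = r._1 / r._2
  override def bufferEncoder: Encoder[(Double, Int)] = Encoders.product[(Double, Int)]
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}.toColumn
// pricesDS: Dataset[(Int, Double)] is assumed (zip code -> price)
// pricesDS.groupByKey(_._1).agg(avgPriceAgg)   // Dataset[(Int, Double)]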
7 Encoder
Converts data between JVM objects and Spark SQL's tabular representation. Every Dataset needs them.
Compared with Java or Kryo serialization, Tungsten encoders are restricted to primitives, case classes, and Spark SQL data types, and they carry schema information; they use less memory and are much faster.
Two ways to bring in encoders:
1 (implicitly) import spark.implicits._  2 (explicitly) org.apache.spark.sql.Encoders
Encoders.scalaInt
Some of them:
INT LONG STRING etc. for nullable primitives
scalaInt, scalaLong, scalaByte etc. for Scala's primitives
product/tuple for Scala's Product and tuple types
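A quick sketch of the explicit route (the data is made up); normally import spark.implicits._ supplies these implicitly:
import org.apache.spark.sql.Encoders
val ints   = spark.createDataset(Seq(1, 2, 3))(Encoders.scalaInt)                         // nullable-free Scala Int encoder
val tuples = spark.createDataset(Seq(("a", 1), ("b", 2)))(Encoders.product[(String, Int)]) // Product/tuple encoder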
8 limitations of DS
Catalyst cannot optimize functional operations/lambdas: it can't look inside them, so it knows neither the schema information nor what the operation actually does.
Using higher-order functions loses some Catalyst optimizations, but all of the Tungsten benefits are kept.
Some classes used in Scala applications cannot be encoded by Tungsten; the supported data types are still limited.
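A hedged contrast for the first point (listingDS and its price field/column are assumed; relies on the earlier import spark.implicits._):
listingDS.filter(l => l.price > 100)  // lambda: opaque JVM code, Catalyst can't push down or reorder it
listingDS.filter($"price" > 100)      // relational form: Catalyst sees the predicate and can optimize it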
9 When to use which
RDD: data is unstructured, you need to fine-tune or manage low-level computation, or the data types are too complex to serialize with Encoders.
DS: data is at least semi-structured, you want type safety and the functional API, and raw performance is not the top priority.
DF: data is at least semi-structured and you want the best automatically optimized performance.
1 hash partitioners and range partitioners
Partitioning pays off around shuffles; data locality avoids moving data over the network.
With reduceByKey, using a range partitioner (plus persist) can avoid the shuffle entirely.
import org.apache.spark.RangePartitioner
val pairs = pRdd.map(p => (p.id, p.price))
val tunedPar = new RangePartitioner(8, pairs)            // 8 partitions
val partitioned = pairs.partitionBy(tunedPar).persist()  // partition once, then reuse
val purPer = partitioned.map(p => (p._1, (1, p._2)))     // (id, (count, price))
val purPerMon = purPer.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)).collect()
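For comparison, the hash-partitioner variant of the same setup (a sketch, using the same pairs RDD):
import org.apache.spark.HashPartitioner
val hashPartitioned = pairs.partitionBy(new HashPartitioner(8)).persist()  // keys spread by key.hashCode % 8 instead of sorted key ranges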
dependency & DF & Dataset & partitioner
Source: http://www.cnblogs.com/yumanman/p/7633184.html