Dataset是一个分布式的数据集。Dataset是Spark 1.6开始新引入的一个接口,它结合了RDD API的很多优点(包括强类型,支持lambda表达式等),以及Spark SQL的优点(优化后的执行引擎)。Dataset可以通过JVM对象来构造,然后通过transformation类算子(map,flatMap,filter等)来进行操作。Scala和Java的API中支持Dataset,但是Python不支持Dataset API。不过因为Python语言本身的天然动态特性,Dataset API的不少feature本身就已经具备了(比如可以通过row.columnName来直接获取某一行的某个字段)。R语言的情况跟Python也很类似。
Dataframe就是按列组织的Dataset。在逻辑概念上,可以大概认为Dataframe等同于关系型数据库中的表,或者是Python/R语言中的data frame,但是在底层做了大量的优化。Dataframe可以通过很多方式来构造:比如结构化的数据文件,Hive表,数据库,已有的RDD。Scala,Java,Python,R等语言都支持Dataframe。在Scala API中,Dataframe就是Dataset[Row]的类型别名。在Java中,需要使用Dataset<Row>来代表一个Dataframe。
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Long) val employeeDS=employee.as[Employee]
1 val employeeDistinct=employeeDS.distinct() 2 employeeDistinct.show() 3 val employeeDropDup=employeeDS.dropDuplicates(Seq("name")) 4 employeeDropDup.show()
employeeDS.except(employeeDS2).show()
employeeDS.intersect(employeeDS2).show()
employeeDS.filter(employee=>employee.age>35).show()
1 employeeDS.map(employee=>( 2 employee.name,employee.salary,employee.salary+1000 3 )).show() 4 employeeDS.flatMap(employee=>Seq( 5 (employee.name,employee.salary,employee.salary+1000), 6 (employee.name,employee.salary,employee.salary+2000) 7 )).show() 8 employeeDS.mapPartitions(employee=>{ 9 val result=scala.collection.mutable.ArrayBuffer[(String,Long,Long)]() 10 while(employee.hasNext){ 11 var temp=employee.next() 12 result += ((temp.name,temp.salary,temp.salary+5000)) 13 } 14 result.iterator 15 }).show()
1 employee.joinWith(department, $"deptId" === $"id").show()
1 employeeDS.sort($"salary".desc).show()
1 val employeeDSArr=employeeDS.randomSplit(Array(3,10,20)) 2 employeeDSArr.foreach(ds=>ds.show()) 3 employeeDS.sample(false, 0.3).show()
1 employee 2 .join(department, $"depId" === $"id") 3 .groupBy(department("name")) 4 .agg(avg(employee("salary")), sum(employee("salary")), max(employee("salary")), min(employee("salary")), count(employee("name")), countDistinct(employee("name"))) 5 .show()
1 /** 2 [1,WrappedArray(Leo, Jack),WrappedArray(Jack, Leo)] 3 [3,WrappedArray(Tom, Kattie),WrappedArray(Tom, Kattie)] 4 [2,WrappedArray(Marry, Jen, Jen),WrappedArray(Marry, Jen)] 5 */ 6 employee.groupBy(employee("depId")) 7 .agg(collect_list(employee("name")),collect_set(employee("name"))) 8 .collect() 9 .foreach(println)
链接:https://www.jianshu.com/p/f017716187b3
原文:https://www.cnblogs.com/iber/p/12984682.html