注意:任何的聚合操作都有默认的分组,聚合是在分组的基础上进行的。比如,对整体进行求和,那么分组就是整体。所以,在做聚合操作之前,一定要明确是在哪个分组上进行聚合操作
注意:聚合操作,本质上是一个多对一(一对一是多对一的特殊情况)的操作。特别注意的是这个’一‘,可以是一个值(mean, sum等),同样也可以是一个对象(list, set等对象)
除了DataFrame的某些操作或者通过.stat访问方法,所有的聚合操作都是以函数的方式出现的。大多数聚合函数可以在org.apache.spark.sql.functions中找到
spark的聚合还可以将某列上的数值聚合到一个list中,或者将唯一值聚合到set集合中。
案例:将国家列直接生成list列和set列
val path="/Volumes/Data/BigData_code/data/retail-data/all/*.csv"
//读取数据
val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true")
.load(path).coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")
df.show()
//将Country聚合成set列和list列
df.agg(collect_set("Country").as("CountrySet"), collect_list("Country").as("CountryList")).show()
//使用表达式分组
df.groupBy("InvoiceNo").agg(
count("Quantity").as("quan"), //使用函数方式
expr("count(Quantity)") //使用字符串表达式
).show()
//使用Map进行分组
df.groupBy("InvoiceNo").agg("Quantity"->"count", "Quantity"->"stddev_pop").show()
使用UDAF来计算输入数据组(与单行相对)的自定义计算。
若要创建UDAF,必须继承UserDefinedAggregateFunction基类并实现以下方法:
class BoolAnd extends UserDefinedAggregateFunction{
//指定输入参数
override def inputSchema: StructType = StructType(
StructField("Value", BooleanType)::Nil
)
//用于指定UDAF中间结果,中间结果使用StructType
override def bufferSchema: StructType = StructType(
StructField("value", BooleanType)::Nil
)
//用于指定返回结果,返回结果为DataType
override def dataType: DataType = BooleanType
//此UDAF对某个输入是否会返回相同的结果
override def deterministic: Boolean = true
//初始化聚合缓冲区的初始值
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0)=true
}
//描述如何根据给定行更新内部缓冲区
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0)=buffer.getAs[Boolean](0)&&input.getAs[Boolean](0)
}
//描述如何聚合两个内部缓冲区
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0)=buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
}
//生成聚合的最终结果
override def evaluate(buffer: Row): Any = {
buffer(0)
}
}
实例化BoolAnd类,并将其注册为一个函数:
//准备数据
val df = spark.range(1).selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")
.selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t")
df.show()
//实例化类,注册为udaf
val ba = new BoolAnd
spark.udf.register("booland", ba)
df.select(ba(col("t")), expr("booland(f)")).show()
原文:https://www.cnblogs.com/ALINGMAOMAO/p/14452057.html