首页 > 数据库技术 > 详细

sparksql系列(二) sql常规操作

时间:2019-10-13 22:45:24      阅读:107      评论:0      收藏:0      [点我收藏+]

import java.util.Arrays

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions}
import org.apache.spark.sql.functions.{col, desc, length, row_number, trim, when}
import org.apache.spark.sql.functions.{countDistinct,sum,count,avg}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SaveMode

object WordCount {

  def initSparkAndData() : DataFrame = {

    val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)
    val nameRDD = javasc.parallelize(Arrays.asList("{‘name‘:‘wangwu‘,‘age‘:‘18‘,‘vip‘:‘t‘}",
      "{‘name‘:‘sunliu‘,‘age‘:‘19‘,‘vip‘:‘t‘}","{‘name‘:‘zhangsan‘,‘age‘:‘20‘,‘vip‘:‘f‘}"));
    val namedf = sparkSession.read.json(nameRDD)

    namedf
  }

  def main(args: Array[String]): Unit = {
    val data = initSparkAndData()


  }
}

上面讲sparksession初始化和数据的加载定义为一个方法,方便后续叙述

select,filter,groupby,sum

  val data = initSparkAndData()

  val simpleoption = data.select(col("name"),col("age"),col("vip"))
    .filter(col("name") =!= "zhangsan" && col("vip") === "t")             //其实是zhangsan的过滤,主要是展示===和=!=的写法
    .groupBy(col("vip"))                                                                        //sql理解就是 group by 语句
    .agg(sum(col("age")) as "sumage")                                             //sql理解就是 sum语句
    .show(100)                                                                                     //显示vip、sumage两列,sparksql自动补齐 

  val simpleoption = data.select(col("name"),col("age"),col("vip"))
    .filter(col("name") =!= "zhangsan" && col("vip") === "t")
    .groupBy(col("vip"))
    .agg(avg(col("age")) as "avgage")                                               //求平均值
    .show(100)

  val simpleoption = data.select(col("name"),col("age"),col("vip"))
    .filter(col("name") =!= "zhangsan" && col("vip") === "t")
    .groupBy(col("vip"))
    .agg(count(col("vip")) as "vipnumber")                                       //count个数
    .show(100)

  val simpleoption = data.select(col("name"),col("age"),col("vip"))
    .filter(col("name") =!= "zhangsan" && col("vip") === "t")
    .groupBy(col("vip"))
    .agg(countDistinct(col("vip")) as "vipnumber")                            //count个数的时候去重
    .show(100)

DataFrame中sum值转换为数字

                DataFrame中没有将sum的值转换为数字的直接方法,所以需要自己手动写

    

    val data = initSparkAndData()
    val simpleoption = data.select(col("name"),col("age"),col("vip"))
      .filter(col("name") === "zhangsan" && col("vip") =!= "t")
      .groupBy(col("vip"))
.      agg(sum(col("age")) as "sumage")

    val collection = simpleoption.select(col("sumage")).rdd.collect()
    val value = if(collection.length > 0) collection.apply(0).toString().replace("[", "").replace("]", "").toString() else "0"
    println(collection.apply(0).toString())
    println(value)

DataFrame中列in使用

     val data = initSparkAndData()

    val nameList1 = List("wangwu","zhangsan")
    val nameList2 = data.select(col("name")).rdd.map(r => r(0).toString).collect().toList
    println(nameList1)
    println(nameList2)
    val simpleoption = data.select(col("name"),col("age"),col("vip"))
      .filter(col("name").isin(nameList1:_*)).show(100)

 

sparksql系列(二) sql常规操作

原文:https://www.cnblogs.com/wuxiaolong4/p/11668437.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!