scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json") df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json") df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.createOrReplaceTempView("people")
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> sqlDF.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
scala> df.createGlobalTempView("people")
scala> spark.sql("SELECT * FROM global_temp.people").show() +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| scala> spark.newSession().sql("SELECT * FROM global_temp.people").show() +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
scala> df.select("name").show() +-------+ | name| +-------+ |Michael| | Andy| | Justin| +-------+
scala> df.select($"name", $"age" + 1).show() +-------+---------+ | name|(age + 1)| +-------+---------+ |Michael| null| | Andy| 31| | Justin| 20| +-------+---------+
scala> df.filter($"age" > 21).show() +---+----+ |age|name| +---+----+ | 30|Andy| +---+----+
scala> df.groupBy("age").count().show() +----+-----+ | age|count| +----+-----+ | 19| 1| |null| 1| | 30| 1| +----+-----+
Test:
scala> spark.read.
csv      jdbc     load      options   parquet   table    textFile
format   json     option    orc       schema    text

scala> spark.read.json("./examples/src/main/resources/people.json")
res0: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> res0.collect
res1: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])

scala> res0.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> res0.select("name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

scala> res0.select("age","name").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> res0.select($"age"+1).show
+---------+
|(age + 1)|
+---------+
|     null|
|       31|
|       20|
+---------+

scala> res0.select($"age"+1,$"name").show
+---------+-------+
|(age + 1)|   name|
+---------+-------+
|     null|Michael|
|       31|   Andy|
|       20| Justin|
+---------+-------+

scala> res0.select($"name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

scala> res0.first
res9: org.apache.spark.sql.Row = [null,Michael]

scala> res0.filter($"age">20).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

scala> res0.create
createGlobalTempView   createOrReplaceTempView   createTempView

scala> res0.createTempView("people")

scala> spark.sql("select * from people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> spark.sql("select * from people where age > 20").show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> import spark.implicits._
import spark.implicits._
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
scala> peopleRDD.map{ x => val para = x.split(","); (para(0), para(1).trim.toInt)}.toDF("name","age")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> case class People(name:String, age:Int)
scala> peopleRDD.map{ x => val para = x.split(","); People(para(0), para(1).trim.toInt)}.toDF
res2: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)
structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> val data = peopleRDD.map{ x => val para = x.split(","); Row(para(0), para(1).trim.toInt)}
data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:33
scala> val dataFrame = spark.createDataFrame(data, structType)
dataFrame: org.apache.spark.sql.DataFrame = [name: string, age: int]
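For reference, the programmatic-schema steps above can be collected into one self-contained snippet. A minimal sketch, assuming an active SparkSession named spark and the same people.txt file:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Build the schema explicitly, then pair it with an RDD[Row] of matching shape.
val schema = StructType(
  StructField("name", StringType) ::
  StructField("age", IntegerType) :: Nil)

val rowRDD = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map { line =>
    val fields = line.split(",")
    Row(fields(0), fields(1).trim.toInt)
  }

val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.show()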
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> val dfToRDD = df.rdd
dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[19] at rdd at <console>:29
scala> dfToRDD.collect
res13: Array[org.apache.spark.sql.Row] = Array([Michael, 29], [Andy, 30], [Justin, 19])
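The rows coming back from df.rdd are untyped Row objects, so individual fields are pulled out by name or position. A small sketch using getAs, assuming the df read above:

// Extract the name column from each Row; getAs looks the field up by name.
val names = dfToRDD.map(row => row.getAs[String]("name"))
names.collect().foreach(println)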
scala> case class Person(name: String, age: Long)
defined class Person
scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
scala> case class Person(name: String, age: Long)
defined class Person
scala> peopleRDD.map(line => {val para = line.split(","); Person(para(0), para(1).trim.toInt)}).toDS
res8: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> val DS = Seq(Person("Andy", 32)).toDS()
DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> DS.rdd
res11: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[15] at rdd at <console>:28
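Because the Dataset is strongly typed, the RDD produced by .rdd is an RDD[Person], so its fields can be used directly without unpacking a Row. A small sketch:

// Each element is already a Person; no Row unpacking is needed.
DS.rdd.map(p => s"${p.name} is ${p.age}").collect().foreach(println)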
scala> val df = spark.read.json("examples/src/main/resources/people.json") df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> case class Person(name: String, age: Long)
defined class Person
scala> df.as[Person]
res14: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
scala> case class Person(name: String, age: Long)
defined class Person
scala> val ds = Seq(Person("Andy", 32)).toDS()
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> df.show
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
testDF.map {
  case Row(col1: String, col2: Int) =>
    println(col1); println(col2)
    col1
  case _ =>
    ""
}
case class Coltest(col1: String, col2: Int) extends Serializable // define the field names and types

testDS.map {
  case Coltest(col1: String, col2: Int) =>
    println(col1); println(col2)
    col1
  case _ =>
    ""
}
testDF.foreach { line =>
  val col1 = line.getAs[String]("col1")
  val col2 = line.getAs[Int]("col2")
}
dataDF.createOrReplaceTempView("tmp") spark.sql("select ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)
// Save
val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://hadoop102:9000/test")
datawDF.write.format("com.atguigu.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()
// Read
val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://hadoop102:9000/test")
val datarDF = spark.read.options(options).format("com.atguigu.spark.csv").load()
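com.atguigu.spark.csv refers to the csv data source used in the course material; for reference, Spark 2.x also ships a built-in csv source that understands the same header and delimiter options. A hedged sketch, assuming the same HDFS input path (the output path test_out is made up for illustration):

import org.apache.spark.sql.SaveMode

// Read with the built-in csv data source.
val builtinDF = spark.read
  .option("header", "true")
  .option("delimiter", "\t")
  .csv("hdfs://hadoop102:9000/test")

// Write the data back out with the same options.
builtinDF.write
  .mode(SaveMode.Overwrite)
  .option("header", "true")
  .option("delimiter", "\t")
  .csv("hdfs://hadoop102:9000/test_out")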
case class Coltest(col1: String, col2: Int) extends Serializable // define the field names and types

/**
  rdd
  ("a", 1)
  ("b", 1)
  ("a", 1)
**/
val test: Dataset[Coltest] = rdd.map { line =>
  Coltest(line._1, line._2)
}.toDS

test.map { line =>
  println(line.col1)
  println(line.col2)
}
As you can see, a Dataset is very convenient when a specific field of a row needs to be accessed. However, when writing highly generic code that must work for arbitrary row types, the concrete case class is not known in advance, and a DataFrame (that is, Dataset[Row]) addressed by column name is the better fit.
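A minimal sketch of such a generic helper; the function name keepNonNull and the example column name are made up for illustration, and nothing beyond a standard DataFrame is assumed:

import org.apache.spark.sql.DataFrame

// Works for any DataFrame, because the column is addressed by name at runtime
// rather than through a compile-time case class.
def keepNonNull(df: DataFrame, colName: String): DataFrame =
  df.filter(df(colName).isNotNull)

// Hypothetical usage with the testDF from the comparison above:
// keepNonNull(testDF, "col1").show()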
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
package com.lxl.sparksql
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
object HelloWorld {

  def main(args: Array[String]) {
    // Create the SparkSession and set the application name
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      //.config("spark.some.config.option", "some-value")
      .getOrCreate()
    // Import implicit conversions
    import spark.implicits._
    // Read a local file and create a DataFrame
    val df = spark.read.json("examples/src/main/resources/people.json")
    // Print the DataFrame
    df.show()
    // DSL style: query people older than 21
    df.filter($"age" > 21).show()
    // Create a temporary view
    df.createOrReplaceTempView("persons")
    // SQL style: query people older than 21
    spark.sql("SELECT * FROM persons where age > 21").show()
    // Stop the SparkSession
    spark.stop()
  }
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object HelloWorld {

  def main(args: Array[String]): Unit = {
    // Create the SparkConf
    val conf = new SparkConf().setAppName("HelloWorld").setMaster("local[*]")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Get the SparkSession
    // val spark = new SparkSession(sc)
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // Create the DataFrame
    val df = spark.read.json("D:\\Data\\Spark\\课堂\\people.json")
    // Show all rows
    df.show()
    // DSL style
    df.select("name").show()
    // SQL style
    df.createTempView("people")
    spark.sql("select * from people").show()
    // Release resources
    spark.close()
    sc.stop()
  }
}
Result:
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
Original source: https://www.cnblogs.com/LXL616/p/11149053.html