Today I finished the second question of Experiment 5. I didn't know how to do it at first, so I searched for the relevant background, got a rough understanding, and then followed an approach from the web to solve it. A few problems came up along the way, but it got done. Tomorrow: finish the third question and move on to Experiment 6.

The first approach converts the RDD to a DataFrame by reflection, inferring the schema from a case class:
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._

object RDDtoDF {
  def main(args: Array[String]) {
    // Schema is inferred by reflection from the case class fields
    case class Employee(id: Long, name: String, age: Long)
    val employeeDF = spark.sparkContext.textFile("file:///usr/local/spark/employee.txt")
      .map(_.split(","))
      .map(attributes => Employee(attributes(0).trim.toInt, attributes(1), attributes(2).trim.toInt))
      .toDF()
    // Register as a temporary view so it can be queried with SQL
    employeeDF.createOrReplaceTempView("employee")
    val employeeRDD = spark.sql("select id,name,age from employee")
    employeeRDD.map(t => "id:" + t(0) + "," + "name:" + t(1) + "," + "age:" + t(2)).show()
  }
}
But when I save this as a Scala file and run it, it throws an error. I haven't solved that yet. However, if each line is treated as a separate Spark command and executed in spark-shell, it runs successfully.
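My best guess at the cause (not verified yet): spark-shell predefines a SparkSession named spark, so import spark.implicits._ and the spark.xxx calls resolve there, while a compiled Scala file has no such value in scope; the case class should probably also be defined outside main, or toDF() cannot find an encoder for it by reflection. A minimal sketch of a standalone version under those assumptions (the app name "RDDtoDF" is just a placeholder):

import org.apache.spark.sql.SparkSession

// Defined at the top level (not inside main) so Spark's reflection-based
// encoder can find it when toDF() is called.
case class Employee(id: Long, name: String, age: Long)

object RDDtoDF {
  def main(args: Array[String]): Unit = {
    // A standalone app must build its own SparkSession;
    // spark-shell does this for us (the master is supplied by spark-submit).
    val spark = SparkSession.builder().appName("RDDtoDF").getOrCreate()
    import spark.implicits._ // now legal: `spark` is a value in scope

    val employeeDF = spark.sparkContext
      .textFile("file:///usr/local/spark/employee.txt")
      .map(_.split(","))
      .map(a => Employee(a(0).trim.toLong, a(1), a(2).trim.toLong))
      .toDF()

    employeeDF.createOrReplaceTempView("employee")
    spark.sql("select id, name, age from employee")
      .map(t => "id:" + t(0) + ",name:" + t(1) + ",age:" + t(2))
      .show()

    spark.stop()
  }
}

The same idea should apply to the second version below; only the session creation and the import change.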
The second approach builds the schema programmatically instead of inferring it from a case class:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Row

object RDDtoDF {
  def main(args: Array[String]) {
    val employeeRDD = spark.sparkContext.textFile("file:///usr/local/spark/employee.txt")
    // Build the schema from a string of field names; every column is typed as String
    val schemaString = "id name age"
    val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    // Convert each text line into a Row matching the schema
    val rowRDD = employeeRDD.map(_.split(","))
      .map(attributes => Row(attributes(0).trim, attributes(1), attributes(2).trim))
    val employeeDF = spark.createDataFrame(rowRDD, schema)
    employeeDF.createOrReplaceTempView("employee")
    val results = spark.sql("SELECT id,name,age FROM employee")
    results.map(t => "id:" + t(0) + "," + "name:" + t(1) + "," + "age:" + t(2)).show()
  }
}
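For reference, both snippets assume that employee.txt holds one comma-separated id,name,age record per line; the rows below are made up purely to illustrate the expected format:

1,Ella,36
2,Bob,29

With input like that, the final show() prints one formatted string per employee, e.g. id:1,name:Ella,age:36.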
Source: https://www.cnblogs.com/quyangzhangsiyuan/p/12285387.html