首页 > 其他 > 详细

spark streaming向RDD和DataFrame转换

时间:2020-06-11 00:31:04      阅读:75      评论:0      收藏:0      [点我收藏+]

Data streaming转为DataFrame,不能直接一步转到DF,需要先转为RDD,然后再转到DF,我们用流式处理数据后,再通过spark sql实时获取我们想要的结果。

1.首先老规矩,创建spark上下文对象,spark SQL和spark Streaming,再创建个socket在Linux端打入数据。

1 val conf = new SparkConf().setAppName("Demo04DSToRDDToDF").setMaster("local[2]")
2 
3     conf.set("spark.sql.shuffle.partitions", "1")
4     val sc = new SparkContext(conf)
5     val sqlContext = new SQLContext(sc)
6 
7     val ssc = new StreamingContext(sc,Durations.seconds(5))
8 
9     val lines = ssc.socketTextStream("master",8888)

2.首先用foreachRDD方法把spark streaming转为RDD

 1     /**
 2       *
 3       * DS  -> RDD -> DF
 4       *
 5       */
 6     lines.foreachRDD(rdd => {
 7      val stuRDD = rdd.map(line => {
 8         val split = line.split(",")
 9        (split(0),split(1),split(2).toInt,split(3),split(4))
10       })

3.导入sqlContext隐式转换,将RDD To成DF,同时传入column对应RDD返回值

 1       //导入隐式转换,将RDD转换为DF
 2       import sqlContext.implicits._
 3 
 4       val stuDF = stuRDD.toDF("id","name","age","gender","clazz")
 5       //注册成表
 6       stuDF.registerTempTable("student")
 7 
 8       val result = sqlContext.sql("select clazz,count(1) from student group by clazz")
 9 
10       result.show()
11       result.write.mode(SaveMode.Append).json("Spark/data/dsondf")

总结,这里的sqlContext必须自己创建好,原来我还以为是导包的时候,类都已经封装好了的,直接import就行了,报的数组下标越界,懵了半天。

spark streaming向RDD和DataFrame转换

原文:https://www.cnblogs.com/zzzzrrrr/p/13089534.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!