
Specifying a custom record delimiter (line separator) for Spark's sc.textFile()

Posted: 2018-09-15 23:42:04

Straight to the code:

package com.jason.spark23

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

object WriteTest {
  // Adds a delimiter-aware textFile overload to SparkContext via an implicit class
  implicit class ContextExtensions(val sc: SparkContext) extends AnyVal {
    def textFile(
                  path: String,
                  delimiter: String,
                  maxRecordLength: String = "1000000"
                ): RDD[String] = {

      val conf = new Configuration(sc.hadoopConfiguration)

      // This configuration sets the record delimiter:
      conf.set("textinputformat.record.delimiter", delimiter)
      // and this one limits the size of one record:
      conf.set("mapreduce.input.linerecordreader.line.maxlength", maxRecordLength)

      sc.newAPIHadoopFile(
        path,
        classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
        conf
      )
        .map { case (_, text) => text.toString }
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("readtest")
      .master("local")
      .getOrCreate()
    import spark.implicits._
    /*val pathjson = "C:\\notos\\code\\sparktest\\src\\main\\resources\\employees.json"
    println("====json df") // the JSON reader infers column types for the schema automatically
    val jsonDf = spark.read.json(pathjson)
    jsonDf.show()
    //jsonDf.write.format("text").save("C:\\notos\\code\\sparktest\\src\\main\\resources\\text")
    jsonDf.rdd.saveAsTextFile("")*/

    val pathtxt = "C:\\notos\\code\\sparktest\\src\\main\\resources\\people2.txt"
    // The DataFrame text reader does not honor this Hadoop property when it is
    // passed as a datasource option, so records here are still split on "\n":
    val dd = spark.read.option("textinputformat.record.delimiter", "||").format("text").load(pathtxt)
    dd.show()
    dd.rdd.collect.foreach(println)
    val sc = spark.sparkContext
    // Resolves to ContextExtensions.textFile through the implicit class above;
    // records are now split on "||" instead of "\n"
    val people2 = sc.textFile(pathtxt, "||")
    people2.collect().foreach(println)
    spark.stop()
  }
}
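To make the behavior concrete, suppose people2.txt held the following single physical line (this sample content is an assumption for illustration; the original file is not shown):

Michael, 29||Andy, 30||Justin, 19

Then spark.read.format("text") would return one row containing the whole line, while the custom sc.textFile(pathtxt, "||") would print three records:

Michael, 29
Andy, 30
Justin, 19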

This uses Scala's implicit conversion mechanism: when sc.textFile(path, delimiter) is called with two arguments, the compiler finds no matching method on SparkContext, so it automatically wraps sc in ContextExtensions and invokes that class's textFile method.
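For readers less familiar with this pattern, here is a minimal, self-contained sketch of how an implicit class adds extension methods (the names are illustrative only, not part of Spark):

object ImplicitClassDemo {
  implicit class StringExtensions(val s: String) extends AnyVal {
    // adds a shout method to every String in scope
    def shout: String = s.toUpperCase + "!"
  }

  def main(args: Array[String]): Unit = {
    // String has no shout method, so the compiler rewrites this call
    // to new StringExtensions("hello").shout
    println("hello".shout) // prints HELLO!
  }
}

Extending AnyVal makes the wrapper a value class, so in most call sites no wrapper object is actually allocated at runtime.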


Original: https://www.cnblogs.com/jason-dong/p/9653015.html
