首页 > 其他 > 详细

日志类型转换Bzip转换为Parquet文件

时间:2021-06-06 00:33:26      阅读:19      评论:0      收藏:0      [点我收藏+]

日志中的字段默认都是String类型,但有的字段本身是数值类型(Int、Double),需要进行转换

//Converts numeric log fields, which arrive as strings, into numeric types.
object NumFormat {

  import scala.util.Try

  /**
    * Parses a log field as an Int.
    *
    * Surrounding whitespace is ignored so that this method treats padded
    * values the same way [[toDouble]] does.
    *
    * @param str raw field text; may be null or empty
    * @return the parsed value, or 0 when the text is null or not a valid Int
    */
  def toInt(str: String): Int =
    Option(str).flatMap(text => Try(text.trim.toInt).toOption).getOrElse(0)

  /**
    * Parses a log field as a Double.
    *
    * @param str raw field text; may be null or empty
    * @return the parsed value, or 0.0 when the text is null or not a valid Double
    */
  def toDouble(str: String): Double =
    Option(str).flatMap(text => Try(text.trim.toDouble).toOption).getOrElse(0.0)
}

定义Schema,将日志的字段一一对应;对于这样多的字段,尽量使用Notepad++以及Excel来进行处理

package com.dmp.utils

import org.apache.spark.sql.types._

object SchemaUtils {

  /**
    * Column names and Spark SQL types of one log record, listed in the
    * order in which the columns appear in the schema.
    */
  private val logColumns: Seq[(String, DataType)] = Seq(
    "sessionid" -> StringType,
    "advertisersid" -> IntegerType,
    "adorderid" -> IntegerType,
    "adcreativeid" -> IntegerType,
    "adplatformproviderid" -> IntegerType,
    "sdkversion" -> StringType,
    "adplatformkey" -> StringType,
    "putinmodeltype" -> IntegerType,
    "requestmode" -> IntegerType,
    "adprice" -> DoubleType,
    "adppprice" -> DoubleType,
    "requestdate" -> StringType,
    "ip" -> StringType,
    "appid" -> StringType,
    "appname" -> StringType,
    "uuid" -> StringType,
    "device" -> StringType,
    "client" -> IntegerType,
    "osversion" -> StringType,
    "density" -> StringType,
    "pw" -> IntegerType,
    "ph" -> IntegerType,
    "long" -> StringType,
    "lat" -> StringType,
    "provincename" -> StringType,
    "cityname" -> StringType,
    "ispid" -> IntegerType,
    "ispname" -> StringType,
    "networkmannerid" -> IntegerType,
    "networkmannername" -> StringType,
    "iseffective" -> IntegerType,
    "isbilling" -> IntegerType,
    "adspacetype" -> IntegerType,
    "adspacetypename" -> StringType,
    "devicetype" -> IntegerType,
    "processnode" -> IntegerType,
    "apptype" -> IntegerType,
    "district" -> StringType,
    "paymode" -> IntegerType,
    "isbid" -> IntegerType,
    "bidprice" -> DoubleType,
    "winprice" -> DoubleType,
    "iswin" -> IntegerType,
    "cur" -> StringType,
    "rate" -> DoubleType,
    "cnywinprice" -> DoubleType,
    "imei" -> StringType,
    "mac" -> StringType,
    "idfa" -> StringType,
    "openudid" -> StringType,
    "androidid" -> StringType,
    "rtbprovince" -> StringType,
    "rtbcity" -> StringType,
    "rtbdistrict" -> StringType,
    "rtbstreet" -> StringType,
    "storeurl" -> StringType,
    "realip" -> StringType,
    "isqualityapp" -> IntegerType,
    "bidfloor" -> DoubleType,
    "aw" -> IntegerType,
    "ah" -> IntegerType,
    "imeimd5" -> StringType,
    "macmd5" -> StringType,
    "idfamd5" -> StringType,
    "openudidmd5" -> StringType,
    "androididmd5" -> StringType,
    "imeisha1" -> StringType,
    "macsha1" -> StringType,
    "idfasha1" -> StringType,
    "openudidsha1" -> StringType,
    "androididsha1" -> StringType,
    "uuidunknow" -> StringType,
    "userid" -> StringType,
    "iptype" -> IntegerType,
    "initbidprice" -> DoubleType,
    "adpayment" -> DoubleType,
    "agentrate" -> DoubleType,
    "lomarkrate" -> DoubleType,
    "adxrate" -> DoubleType,
    "title" -> StringType,
    "keywords" -> StringType,
    "tagid" -> StringType,
    "callbackdate" -> StringType,
    "channelid" -> StringType,
    "mediatype" -> IntegerType
  )

  /**
    * Schema of the log records: one field per entry of `logColumns`,
    * built with the default `StructField` settings.
    */
  val logStructType: StructType = StructType(
    logColumns.map { case (columnName, columnType) => StructField(columnName, columnType) }
  )
}

Bzip转换为Parquet格式

package com.dmp.tools

import com.dmp.utils.{NumFormat, SchemaUtils}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/*
F:\\牛牛学堂大数据24期\\09-实训实战-9天\\dmp&&移动项目\\dmp\\2016-10-01_06_p1_invalid.1475274123982.log.FINISH.bz2
snappy
C:\\Users\\admin\\Desktop\\result1
 */
//Converts the raw comma-separated log files into Parquet files.
//The compression codec is passed on the command line (per the original note:
//Spark 1.6 defaults to gzip, 2.x defaults to snappy).
object Bzip2Parquet {

  import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType}

  // Column types of the target schema, in field order. Row construction is
  // driven by this so it cannot drift from SchemaUtils.logStructType.
  private val columnTypes: Array[DataType] =
    SchemaUtils.logStructType.fields.map(_.dataType)

  /**
    * Builds one Row from the split fields of a log line.
    *
    * Integer and double columns are parsed with NumFormat; every other column
    * is passed through as the raw string. Fields beyond the schema width are
    * ignored.
    *
    * @param fields the split log line; the caller guarantees at least
    *               `columnTypes.length` entries
    */
  private def toRow(fields: Array[String]): Row = {
    val values: Array[Any] = columnTypes.zip(fields).map {
      case (IntegerType, raw) => NumFormat.toInt(raw)
      case (DoubleType, raw) => NumFormat.toDouble(raw)
      case (_, raw) => raw
    }
    Row.fromSeq(values.toSeq)
  }

  /**
    * Entry point.
    *
    * @param args exactly three values: the log input path, the Parquet
    *             compression codec and the result output path
    */
  def main(args: Array[String]): Unit = {
    // 0. Validate the number of arguments.
    if (args.length != 3) {
      println(
        """
          |com.dmp.tools.Bzip2Parquet
          |参数:
          | logInputPath
          | compressionCode <snappy, gzip, lzo>
          | resultOutputPath
        """.stripMargin // stripMargin removes the leading margin so the lines print aligned
      )
      // Exit with a non-zero status so callers can detect the usage error.
      sys.exit(1)
    }

    // 1. Read the program arguments: log input path, compression codec, output path.
    val Array(logInputPath, compressionCode, resultOutputPath) = args

    val conf = new SparkConf()
      // getSimpleName of a Scala object ends in '$'; drop it from the application name.
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      // Fall back to local mode only when no master was supplied (e.g. by spark-submit).
      .setIfMissing("spark.master", "local[*]")
      // Serializer used when RDDs are written to disk or shipped between workers;
      // redundant if the cluster already configures it.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    try {
      val sQLContext = new SQLContext(sc)
      sQLContext.setConf("spark.sql.parquet.compression.codec", compressionCode)

      // 3. Read the log data.
      val rawdata = sc.textFile(logInputPath)

      // 4. ETL: split each line, drop records with too few fields, convert to typed rows.
      val expectedFields = columnTypes.length
      val dataRow = rawdata
        // A limit of -1 keeps trailing empty fields regardless of the line length.
        .map(_.split(",", -1))
        .filter(_.length >= expectedFields)
        .map(toRow)

      // 5. Write the result as Parquet to the output path.
      val dataFrame = sQLContext.createDataFrame(dataRow, SchemaUtils.logStructType)
      dataFrame.write.parquet(resultOutputPath)
    } finally {
      // Always release the SparkContext, even when the job fails.
      sc.stop()
    }
  }
}

日志类型转换Bzip转换为Parquet文件

原文:https://blog.51cto.com/bigdata/2871323

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!