All log fields arrive as String by default, but some of them are inherently numeric (Int or Double) and need to be converted.
package com.dmp.utils

// Converts numeric log fields that arrive as String
object NumFormat {

  def toInt(str: String): Int = {
    try {
      str.toInt
    } catch {
      // anything that is not a valid integer falls back to 0
      case _: Exception => 0
    }
  }

  def toDouble(str: String): Double = {
    try {
      str.toDouble
    } catch {
      // anything that is not a valid double falls back to 0.0
      case _: Exception => 0.0
    }
  }
}
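A quick way to sanity-check these helpers, e.g. in a Scala REPL (a minimal sketch; the fallback values follow directly from the catch-all above):

NumFormat.toInt("42")       // 42
NumFormat.toDouble("3.14")  // 3.14
NumFormat.toInt("abc")      // 0   (unparseable input falls back to 0)
NumFormat.toInt(null)       // 0   (the NullPointerException is also caught)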
Define a Schema that maps the log fields one-to-one. With this many fields, it is easiest to generate the boilerplate with Notepad++ and Excel.
package com.dmp.utils

import org.apache.spark.sql.types._

object SchemaUtils {

  /**
   * Schema definition for the log records
   */
  val logStructType = StructType(Seq(
    StructField("sessionid", StringType),
    StructField("advertisersid", IntegerType),
    StructField("adorderid", IntegerType),
    StructField("adcreativeid", IntegerType),
    StructField("adplatformproviderid", IntegerType),
    StructField("sdkversion", StringType),
    StructField("adplatformkey", StringType),
    StructField("putinmodeltype", IntegerType),
    StructField("requestmode", IntegerType),
    StructField("adprice", DoubleType),
    StructField("adppprice", DoubleType),
    StructField("requestdate", StringType),
    StructField("ip", StringType),
    StructField("appid", StringType),
    StructField("appname", StringType),
    StructField("uuid", StringType),
    StructField("device", StringType),
    StructField("client", IntegerType),
    StructField("osversion", StringType),
    StructField("density", StringType),
    StructField("pw", IntegerType),
    StructField("ph", IntegerType),
    StructField("long", StringType),
    StructField("lat", StringType),
    StructField("provincename", StringType),
    StructField("cityname", StringType),
    StructField("ispid", IntegerType),
    StructField("ispname", StringType),
    StructField("networkmannerid", IntegerType),
    StructField("networkmannername", StringType),
    StructField("iseffective", IntegerType),
    StructField("isbilling", IntegerType),
    StructField("adspacetype", IntegerType),
    StructField("adspacetypename", StringType),
    StructField("devicetype", IntegerType),
    StructField("processnode", IntegerType),
    StructField("apptype", IntegerType),
    StructField("district", StringType),
    StructField("paymode", IntegerType),
    StructField("isbid", IntegerType),
    StructField("bidprice", DoubleType),
    StructField("winprice", DoubleType),
    StructField("iswin", IntegerType),
    StructField("cur", StringType),
    StructField("rate", DoubleType),
    StructField("cnywinprice", DoubleType),
    StructField("imei", StringType),
    StructField("mac", StringType),
    StructField("idfa", StringType),
    StructField("openudid", StringType),
    StructField("androidid", StringType),
    StructField("rtbprovince", StringType),
    StructField("rtbcity", StringType),
    StructField("rtbdistrict", StringType),
    StructField("rtbstreet", StringType),
    StructField("storeurl", StringType),
    StructField("realip", StringType),
    StructField("isqualityapp", IntegerType),
    StructField("bidfloor", DoubleType),
    StructField("aw", IntegerType),
    StructField("ah", IntegerType),
    StructField("imeimd5", StringType),
    StructField("macmd5", StringType),
    StructField("idfamd5", StringType),
    StructField("openudidmd5", StringType),
    StructField("androididmd5", StringType),
    StructField("imeisha1", StringType),
    StructField("macsha1", StringType),
    StructField("idfasha1", StringType),
    StructField("openudidsha1", StringType),
    StructField("androididsha1", StringType),
    StructField("uuidunknow", StringType),
    StructField("userid", StringType),
    StructField("iptype", IntegerType),
    StructField("initbidprice", DoubleType),
    StructField("adpayment", DoubleType),
    StructField("agentrate", DoubleType),
    StructField("lomarkrate", DoubleType),
    StructField("adxrate", DoubleType),
    StructField("title", StringType),
    StructField("keywords", StringType),
    StructField("tagid", StringType),
    StructField("callbackdate", StringType),
    StructField("channelid", StringType),
    StructField("mediatype", IntegerType)
  ))
}
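The ETL job below indexes arr(0) through arr(84) and filters out lines with fewer than 85 fields, so the schema must define exactly 85 fields. A minimal, hypothetical sanity check (not part of the original code):

// logStructType.fields is an Array[StructField]; its length must match
// the 85 columns parsed by Bzip2Parquet below
assert(SchemaUtils.logStructType.fields.length == 85)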
Converting the Bzip2 logs to Parquet format
package com.dmp.tools

import com.dmp.utils.{NumFormat, SchemaUtils}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/*
 Example arguments:
   F:\\牛牛学堂大数据24期\\09-实训实战-9天\\dmp&&移动项目\\dmp\\2016-10-01_06_p1_invalid.1475274123982.log.FINISH.bz2
   snappy
   C:\\Users\\admin\\Desktop\\result1
 */
// Convert the raw log files to Parquet. Spark 1.6 compresses Parquet with gzip by default; 2.x defaults to snappy.
object Bzip2Parquet {

  def main(args: Array[String]): Unit = {
    // 0. validate the argument count
    if (args.length != 3) {
      println(
        """
          |com.dmp.tools.Bzip2Parquet
          |Arguments:
          |  logInputPath
          |  compressionCode <snappy, gzip, lzo>
          |  resultOutputPath
        """.stripMargin // stripMargin strips the leading '|' so the usage text stays aligned
      )
      sys.exit()
    }

    // 1. read the program arguments: log input path, compression codec, result output path
    val Array(logInputPath, compressionCode, resultOutputPath) = args

    val conf = new SparkConf()
    conf.setAppName(s"${this.getClass.getSimpleName}")
      .setMaster("local[*]")
      // Kryo serialization for RDDs spilled to disk and shuffled between workers;
      // this can be omitted if the cluster is already configured with it
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val sQLContext = new SQLContext(sc)
    sQLContext.setConf("spark.sql.parquet.compression.codec", compressionCode)

    // 2. read the log data
    val rawdata = sc.textFile(logInputPath)

    // 3. ETL according to the business rules: split with a limit so empty fields
    //    are preserved, and drop malformed lines with fewer than 85 fields
    val dataRow = rawdata.map(line => line.split(",", line.length))
      .filter(_.length >= 85)
      .map(arr => Row(
        arr(0),
        NumFormat.toInt(arr(1)),
        NumFormat.toInt(arr(2)),
        NumFormat.toInt(arr(3)),
        NumFormat.toInt(arr(4)),
        arr(5),
        arr(6),
        NumFormat.toInt(arr(7)),
        NumFormat.toInt(arr(8)),
        NumFormat.toDouble(arr(9)),
        NumFormat.toDouble(arr(10)),
        arr(11),
        arr(12),
        arr(13),
        arr(14),
        arr(15),
        arr(16),
        NumFormat.toInt(arr(17)),
        arr(18),
        arr(19),
        NumFormat.toInt(arr(20)),
        NumFormat.toInt(arr(21)),
        arr(22),
        arr(23),
        arr(24),
        arr(25),
        NumFormat.toInt(arr(26)),
        arr(27),
        NumFormat.toInt(arr(28)),
        arr(29),
        NumFormat.toInt(arr(30)),
        NumFormat.toInt(arr(31)),
        NumFormat.toInt(arr(32)),
        arr(33),
        NumFormat.toInt(arr(34)),
        NumFormat.toInt(arr(35)),
        NumFormat.toInt(arr(36)),
        arr(37),
        NumFormat.toInt(arr(38)),
        NumFormat.toInt(arr(39)),
        NumFormat.toDouble(arr(40)),
        NumFormat.toDouble(arr(41)),
        NumFormat.toInt(arr(42)),
        arr(43),
        NumFormat.toDouble(arr(44)),
        NumFormat.toDouble(arr(45)),
        arr(46),
        arr(47),
        arr(48),
        arr(49),
        arr(50),
        arr(51),
        arr(52),
        arr(53),
        arr(54),
        arr(55),
        arr(56),
        NumFormat.toInt(arr(57)),
        NumFormat.toDouble(arr(58)),
        NumFormat.toInt(arr(59)),
        NumFormat.toInt(arr(60)),
        arr(61),
        arr(62),
        arr(63),
        arr(64),
        arr(65),
        arr(66),
        arr(67),
        arr(68),
        arr(69),
        arr(70),
        arr(71),
        arr(72),
        NumFormat.toInt(arr(73)),
        NumFormat.toDouble(arr(74)),
        NumFormat.toDouble(arr(75)),
        NumFormat.toDouble(arr(76)),
        NumFormat.toDouble(arr(77)),
        NumFormat.toDouble(arr(78)),
        arr(79),
        arr(80),
        arr(81),
        arr(82),
        arr(83),
        NumFormat.toInt(arr(84))
      ))
    // 4. save the result to disk as Parquet
    val dataFrame = sQLContext.createDataFrame(dataRow, SchemaUtils.logStructType)
    dataFrame.write.parquet(resultOutputPath)

    sc.stop()
  }
}
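To verify the conversion, the Parquet output can be read back with the same SQLContext. A minimal sketch, assuming the job above has already run with the given resultOutputPath:

// read the Parquet files back and inspect the schema and a few rows
val parquetDF = sQLContext.read.parquet(resultOutputPath)
parquetDF.printSchema() // should match SchemaUtils.logStructType
parquetDF.show(10)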
Source: https://blog.51cto.com/bigdata/2871323