首页 > 其他 > 详细

streaming

时间:2020-06-30 18:14:22      阅读:59      评论:0      收藏:0      [点我收藏+]

流处理,对kafka产生的数据流进行处理:

val lines = kafkaStream.flatMap { batch =>
      batch.value().split("\n")
    }

    // 用正则匹配将日志格式化,并同时完成日期时间转时间戳
    val simpleDateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

    val logRecords = lines.map { row =>
      val pattern: Regex = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$""".r
      val options = pattern.findFirstMatchIn(row)
      if (options.isDefined) {
        // 若匹配,返回格式化日志结构
        val matched = options.get
        log(matched.group(1), matched.group(2), matched.group(3),
          String.valueOf(simpleDateFormat.parse(matched.group(4)).getTime),
          matched.group(5), matched.group(6), matched.group(7), matched.group(8), matched.group(9))
      }
      else {
        // 若不匹配,用空值跳过
        log("foo", "foo", "foo", "0", "foo", "foo", "foo", "foo", "0")
      }
    }

  上面就是对kafka产生的数据流处理,kafka的数据流是直接产生到log文件里面的。根据换行符分隔成一个日志数组。经过正则匹配得到日志的其他部分。

接下来就是对获得的信息进行一个统计了。

技术分享图片

 

 可以将统计的信息放到redis里面进行存储。

接下来就是利用逻辑回归模型对日志分类了。根据learning文件里面提供的接口进行调用。

技术分享图片

 

 model.transform是模型开始处理数据的方法,括号中的是对数据进行了一定的处理,将url为空值的数据替换为了""

streaming

原文:https://www.cnblogs.com/blog-lmk/p/13215036.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!