
Spark Streaming hands-on case study


Requirement 3: ad click counts for the last hour

Code logic and analysis

1. Read the data from Kafka.
2. Wrap each Kafka record in a case class, as shown in the excerpt below.
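For reference, steps 1 and 2 come down to the snippet below (a minimal excerpt of the full listing in the code section further down):

//step 1: consume the Kafka topic as a direct stream
val kafkaDS = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
//step 2: each record value is "ts area city userid adid"; split it and wrap it in a case class
val adChickdata = kafkaDS.map {
      kafkaData => {
        val data = kafkaData.value().split(" ")
        Ad_ChichDate(data(0), data(1), data(2), data(3), data(4))
      }
    }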
3. Aggregate within a sliding window; the code below uses a 60-second window recomputed every 10 seconds (a full hour would use Minutes(60)):
val reduceDs = adChickdata.map(
      data => {
        val ts1 = data.ts.toLong

        //timestamps are in milliseconds; round down to a 10-second bucket
        val newTS = ts1 / 10000 * 10000
        (newTS, 1)
      }
    ).reduceByKeyAndWindow((x: Int, y: Int) => {
      x + y
    }, Seconds(60), Seconds(10))
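As a side note, Spark Streaming also provides an invertible form of reduceByKeyAndWindow that subtracts the batches sliding out of the window instead of re-reducing the whole window on every slide; it is worth considering when the window is much longer than the slide interval (for example a real one-hour window). A minimal sketch, assuming a checkpoint directory of your choice and that Minutes is imported from org.apache.spark.streaming:

//the invertible variant requires checkpointing (the path here is only an example)
ssc.checkpoint("datas/checkpoint")
val hourDs = adChickdata.map(data => (data.ts.toLong / 10000 * 10000, 1))
  .reduceByKeyAndWindow(
    (x: Int, y: Int) => x + y, //add the batches entering the window
    (x: Int, y: Int) => x - y, //subtract the batches leaving the window
    Minutes(60),               //window length: one hour
    Seconds(10)                //slide interval: 10 seconds
  )

An overload of this method also accepts a filter function, which can be used to drop keys whose count has fallen back to zero.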
4. Process each RDD of the aggregated stream, sort it, and write the result out as JSON:
reduceDs.foreachRDD(
      rdd => {
        val list = new ListBuffer[String]()
        //the data comes out of a reduce, so the shuffle has scrambled the order; sort by key (timestamp) before collecting
        val datas = rdd.sortByKey(ascending = true).collect()
        //turn each (time, count) pair into a JSON string
        datas.foreach {
          case (time, cnt) => {
            //format the bucket timestamp as a clock time (HH:mm:ss = 24-hour clock)
            val sdf = new SimpleDateFormat("HH:mm:ss")
            val ts = sdf.format(time)
            list.append(s"""{"xtime":"${ts}","yval":"${cnt}"}""")
          }
        }
        //write (overwrite) the JSON array to a file on every batch
        val out = new PrintWriter(new FileWriter(new File("datas/adclick/adclick.json")))
        out.println("[" + list.mkString(",") + "]")
        out.flush()
        out.close()
      }
    )

Code

package com.project


import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

import java.io.{File, FileWriter, PrintWriter}
import java.text.SimpleDateFormat
import scala.collection.mutable.ListBuffer

/**
 * @ObjectName req3
 * @Description TODO compute the ad click count for the last hour
 * @Author
 * @Date 2021/5/24 8:38
 * @Version 1.0
 * */
object req3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("req3")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Kafka connection parameters
    val brokers = "master:9092,slave1:9092,slave2:9092"
    val topic = "first"
    val group = "bigdata"
    val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
    val topics = Array(topic)
    val kafkaParams = Map(
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization
    )

    //TODO read the data from Kafka
    val kafkaDS = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](
        topics, kafkaParams
      )
    )

    //TODO wrap each Kafka record in a case class
    //each record value is "ts area city userid adid", separated by spaces
    val adChickdata = kafkaDS.map {
      kafkaData => {
        val datas = kafkaData.value()
        val data = datas.split(" ")

        Ad_ChichDate(data(0), data(1), data(2), data(3), data(4))
      }
    }
    //clicks over the recent window, recomputed every 10 s (note: the window passed below is 60 s; a full hour would be Minutes(60))
    //12:12:01 ==> 12:12:00
    //12:12:02 ==> 12:12:00
    //12:12:04 ==> 12:12:00
    //12:12:12 ==> 12:12:10
    //aggregate over the window
    val reduceDs = adChickdata.map(
      data => {
        val ts1 = data.ts.toLong

        //timestamps are in milliseconds; round down to a 10-second bucket
        val newTS = ts1 / 10000 * 10000
        (newTS, 1)
      }
    ).reduceByKeyAndWindow((x: Int, y: Int) => {
      x + y
    }, Seconds(60), Seconds(10))
//    reduceDs.print()
    //process each RDD of the aggregated stream
    reduceDs.foreachRDD(
      rdd => {
        val list = new ListBuffer[String]()
        //the data comes out of a reduce, so the shuffle has scrambled the order; sort by key (timestamp) before collecting
        val datas = rdd.sortByKey(ascending = true).collect()
        //turn each (time, count) pair into a JSON string
        datas.foreach {
          case (time, cnt) => {
            val sdf = new SimpleDateFormat("HH:mm:ss")
            val ts = sdf.format(time)
            list.append(s"""{"xtime":"${ts}","yval":"${cnt}"}""")
          }
        }
        //write (overwrite) the JSON array to a file on every batch
        val out = new PrintWriter(new FileWriter(new File("datas/adclick/adclick.json")))
        out.println("[" + list.mkString(",") + "]")
        out.flush()
        out.close()
      }
    )

    ssc.start()
    ssc.awaitTermination()
  }

  case class Ad_ChichDate(ts: String, area: String, city: String, userid: String, adid: String)

}
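To drive the job during testing, any producer that writes space-separated lines in the form req3 expects ("ts area city userid adid") to the first topic will work. Below is a minimal sketch using the plain Kafka producer API; the object name MockProducer, the 500 ms send interval, and the field values are illustrative assumptions, not part of the original project:

package com.project

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.Properties

object MockProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092,slave2:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    while (true) {
      //ts area city userid adid, matching the split(" ") in req3 (values are mock placeholders)
      val msg = s"${System.currentTimeMillis()} area-1 city-1 user-1 ad-1"
      producer.send(new ProducerRecord[String, String]("first", msg))
      Thread.sleep(500)
    }
  }
}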


Source: https://www.cnblogs.com/life-oss/p/14805751.html
