Code logic and analysis
1. First, read the data from Kafka (a sketch follows).
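A minimal sketch of this step, mirroring the KafkaUtils.createDirectStream call from the full listing further down (broker list, topic and group id are the values used there):
val kafkaParams = Map(
  ConsumerConfig.GROUP_ID_CONFIG -> "bigdata",
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "master:9092,slave1:9092,slave2:9092",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"
)
// direct stream: Spark tracks the Kafka offsets itself, no receiver needed
val kafkaDS = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Array("first"), kafkaParams)
)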
2. Then wrap each raw Kafka record in a case class (sketch below).
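A sketch of the wrapping step, again lifted from the full listing; the sample record in the comment is hypothetical and only illustrates the expected space-separated layout (timestamp, area, city, user id, ad id):
// hypothetical record value: "1621822332123 east beijing 1001 5"
val adChickdata = kafkaDS.map { kafkaData =>
  val fields = kafkaData.value().split(" ")
  Ad_ChichDate(fields(0), fields(1), fields(2), fields(3), fields(4))
}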
3. Perform the windowed aggregation:
val reduceDs = adChickdata.map(
data => {
val ts1 = data.ts.toLong
// the timestamp is in milliseconds; integer division truncates it to the start of its 10-second bucket (10000 ms)
val newTS = ts1 / 10000 * 10000
(newTS, 1)
}
).reduceByKeyAndWindow((x: Int, y: Int) => {
x + y
}, Seconds(60), Seconds(10))
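The integer division drops everything below 10 seconds (10000 ms), so every event timestamp collapses onto the start of its 10-second bucket; for example (the timestamp value is only illustrative):
// e.g. in the Scala REPL
1621822332123L / 10000 * 10000  // == 1621822330000L, the start of the bucket containing 1621822332123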
4. Process each RDD in the result stream and write it out as JSON:
reduceDs.foreachRDD(
rdd => {
val list = new ListBuffer[String]()
// the data coming out of the reduce has been shuffled, so it arrives out of order and must be sorted by key first
val datas = rdd.sortByKey(ascending = true).collect()
// serialize each (time, count) pair as a JSON object
datas.foreach {
case (time, cnt) => {
// format the bucket timestamp as a 24-hour time string
val sdf = new SimpleDateFormat("HH:mm:ss")
val ts = sdf.format(time)
list.append(s"""{"xtime":"${ts}","yval":"${cnt}"}""")
}
}
// overwrite the JSON file with the latest window result
val out = new PrintWriter(new FileWriter(new File("datas/adclick/adclick.json")))
out.println("[" + list.mkString(",") + "]")
out.flush()
out.close()
}
)
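With the string template above, every batch overwrites datas/adclick/adclick.json with a single JSON array; the values below are hypothetical, only the shape is determined by the code:
[{"xtime":"10:12:10","yval":"23"},{"xtime":"10:12:20","yval":"31"},{"xtime":"10:12:30","yval":"18"}]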
Complete code
package com.project
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import java.io.{File, FileWriter, PrintWriter}
import java.text.SimpleDateFormat
import scala.collection.mutable.ListBuffer
/**
* @ObjectName req3
* @Description TODO Compute the ad click count over the most recent hour
* @Author
* @Date 2021/5/24 8:38
* @Version 1.0
*/
object req3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("req3")
val ssc = new StreamingContext(conf, Seconds(10))
// Kafka connection parameters
val brokers = "master:9092,slave1:9092,slave2:9092"
val topic = "first"
val group = "bigdata"
val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
val topics = Array(topic)
val kafkaParams = Map(
ConsumerConfig.GROUP_ID_CONFIG -> group,
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization
)
//TODO Read the data from Kafka
val kafkaDS = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](
topics, kafkaParams
)
)
//TODO Wrap each Kafka record in a case class
val adChickdata = kafkaDS.map {
kafkaData => {
val datas = kafkaData.value()
val data = datas.split(" ")
Ad_ChichDate(data(0), data(1), data(2), data(3), data(4))
}
}
// sliding window over the most recent data, recomputed every 10 s (the window length used below is 60 s)
// 12:12:01 ==> 12:12:00
// 12:12:02 ==> 12:12:00
// 12:12:04 ==> 12:12:00
// 12:12:12 ==> 12:12:10
// aggregate per 10-second bucket
val reduceDs = adChickdata.map(
data => {
val ts1 = data.ts.toLong
// the timestamp is in milliseconds; integer division truncates it to the start of its 10-second bucket (10000 ms)
val newTS = ts1 / 10000 * 10000
(newTS, 1)
}
).reduceByKeyAndWindow((x: Int, y: Int) => {
x + y
}, Seconds(60), Seconds(10))
// reduceDs.print()
// process each RDD in the result stream
reduceDs.foreachRDD(
rdd => {
val list = new ListBuffer[String]()
// the data coming out of the reduce has been shuffled, so it arrives out of order and must be sorted by key first
val datas = rdd.sortByKey(ascending = true).collect()
// serialize each (time, count) pair as a JSON object
datas.foreach {
case (time, cnt) => {
// format the bucket timestamp as a 24-hour time string
val sdf = new SimpleDateFormat("HH:mm:ss")
val ts = sdf.format(time)
list.append(s"""{"xtime":"${ts}","yval":"${cnt}"}""")
}
}
// overwrite the JSON file with the latest window result
val out = new PrintWriter(new FileWriter(new File("datas/adclick/adclick.json")))
out.println("[" + list.mkString(",") + "]")
out.flush()
out.close()
}
)
ssc.start()
ssc.awaitTermination()
}
case class Ad_ChichDate(ts: String, area: String, city: String, userid: String, adid: String)
}
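For a quick local test, records can be pushed into the "first" topic with a small producer such as the sketch below (the object name TestProducer is hypothetical, this assumes the kafka-clients dependency is on the classpath, and the field values only follow the space-separated layout the map step expects):
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // hypothetical record: timestamp area city userid adid
    producer.send(new ProducerRecord[String, String]("first", s"${System.currentTimeMillis()} east beijing 1001 5"))
    producer.close()
  }
}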
Original post: https://www.cnblogs.com/life-oss/p/14805751.html