package com.nd.analysis
import java.util
import com.alibaba.fastjson. TypeReference
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.sql. SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.internal.Logging
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaManager
/**
 * Plain immutable record holding three string fields.
 *
 * NOTE(review): nothing in this file references this class; presumably it
 * mirrors fields of the JSON payload read from Kafka — confirm against callers.
 *
 * @param rowguid  value of the `rowguid` field
 * @param method   value of the `method` field
 * @param clientip value of the `clientip` field
 */
case class three(
  rowguid: String,
  method: String,
  clientip: String
)
/**
 * Spark Streaming entry point: reads string records from Kafka, parses each
 * value as a JSON object into a `java.util.Map`, and logs one field of every
 * successfully parsed record.
 */
object Consumer extends Logging {
  // Imported locally so the catch clause below can let fatal JVM errors
  // (and thread interruption) propagate instead of being swallowed.
  import scala.util.control.NonFatal

  /**
   * Builds the Spark session and streaming context, wires the Kafka stream
   * through JSON parsing, and blocks until the streaming job terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("Consumer")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()
    // One-second micro-batches.
    val ssc = new StreamingContext(session.sparkContext, Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "xxx.xx.xx.xx:9092,xxx.xx.xx.xx:9092,xxx.xx.xx.xx:9092",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "terminalExhaust",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "earliest"
    )

    // NOTE(review): the meaning of the second constructor argument is defined
    // by KafkaManager, which is not visible in this file — confirm.
    val kafkaManager = new KafkaManager(kafkaParams, false)
    val kafkaData: InputDStream[ConsumerRecord[String, String]] =
      createDirectAndCheckOffset(ssc, bool = true, manager = kafkaManager)

    // Build the parsing pipeline exactly once; the previous code created two
    // extra, never-consumed DStreams from the same input.
    val records: DStream[util.Map[String, Object]] = parseJsonToJMap(kafkaData.map(_.value()))

    records.foreachRDD { rdd =>
      rdd.foreach(record => logInfo(s"${record.get("***")}"))
    }

    try {
      ssc.start()
      ssc.awaitTermination()
    } finally {
      // Always release the session, even when the streaming job fails.
      session.stop()
    }
  }

  /**
   * Creates the Kafka input stream for the hard-coded topic list by delegating
   * to `manager.createDirectStreamWithCheck`.
   *
   * @param ssc     streaming context the stream is attached to
   * @param bool    flag forwarded unchanged as the third argument of
   *                `createDirectStreamWithCheck` (semantics defined there)
   * @param manager Kafka helper used to create the stream
   * @return the input stream of Kafka consumer records
   */
  def createDirectAndCheckOffset(ssc: StreamingContext, bool: Boolean, manager: KafkaManager): InputDStream[ConsumerRecord[String, String]] = {
    // Comma-separated topic names; currently a single topic.
    val topics = "ddTest3"
    manager.createDirectStreamWithCheck[String, String](ssc, topics.split(",").toSet, bool)
  }

  /**
   * Parses every JSON string of the stream into a `java.util.Map`.
   * Records that fail to parse are logged and dropped; records for which the
   * parser yields `null` are dropped as well.
   *
   * @param dStream stream of raw JSON strings
   * @return stream containing only the successfully parsed, non-null maps
   */
  protected def parseJsonToJMap(dStream: DStream[String]): DStream[java.util.Map[String, Object]] =
    dStream.flatMap[java.util.Map[String, Object]](json => parseRecord(json).toList)

  /**
   * Parses a single JSON string.
   *
   * @param json raw JSON text
   * @return `Some(map)` on success; `None` when parsing throws a non-fatal
   *         error (which is logged) or the parser returns `null`
   */
  private def parseRecord(json: String): Option[java.util.Map[String, Object]] =
    try {
      // Option(...) maps a null parser result to None.
      Option(
        com.alibaba.fastjson.JSON.parseObject(json,
          new TypeReference[java.util.Map[String, Object]]() {})
      )
    } catch {
      case NonFatal(e) =>
        logError(s"解析json串失败:\n\t ${json}", e)
        None
    }
}
// Original article: https://www.cnblogs.com/shuaidong/p/13141155.html