首页 > 其他 > 详细

spark直连kafka消费者

时间:2020-06-16 16:16:13      阅读:54      评论:0      收藏:0      [点我收藏+]
package com.nd.analysis


import java.util

import com.alibaba.fastjson. TypeReference
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.sql. SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.internal.Logging
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaManager



// Record shape for a parsed log entry (rowguid / method / clientip).
// Unused within this file — presumably consumed by other code in the project.
// NOTE(review): name violates Scala UpperCamelCase convention (`Three`);
// left unchanged to avoid breaking external callers.
case class three(rowguid: String, method: String, clientip: String)


object Consumer extends Logging {

  /**
   * Entry point: consumes JSON strings from Kafka via a direct stream,
   * parses each record into a `java.util.Map`, and logs one field per record.
   */
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("Consumer")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()
    val ssc = new StreamingContext(session.sparkContext, Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "xxx.xx.xx.xx:9092,xxx.xx.xx.xx:9092,xxx.xx.xx.xx:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "terminalExhaust",
      "auto.offset.reset" -> "earliest"
    )

    val kafkaManager = new KafkaManager(kafkaParams, false)

    val kafkaData: InputDStream[ConsumerRecord[String, String]] =
      createDirectAndCheckOffset(ssc, true, kafkaManager)

    // FIX: the original built the parsed stream twice — `kDStream`/`dStream`
    // were unused duplicates of `value`. Parse once and reuse.
    val parsed: DStream[util.Map[String, Object]] = parseJsonToJMap(kafkaData.map(_.value()))

    parsed.foreachRDD { rdd =>
      rdd.foreach { record =>
        logInfo(s"${record.get("***")}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
    // NOTE(review): unreachable in normal operation — awaitTermination() blocks
    // until the context is stopped externally; kept for parity with the original.
    session.stop()
  }

  /**
   * Creates a direct Kafka stream through [[KafkaManager]], which presumably
   * validates/restores consumer offsets (confirm against KafkaManager's docs).
   *
   * @param ssc     streaming context to attach the stream to
   * @param bool    flag passed through to `createDirectStreamWithCheck`
   *                (presumably enables the offset check — TODO confirm)
   * @param manager the configured KafkaManager
   * @param topics  comma-separated topic list; defaults to the original
   *                hard-coded "ddTest3" so existing callers are unaffected
   * @return the direct input stream of Kafka records
   */
  def createDirectAndCheckOffset(
      ssc: StreamingContext,
      bool: Boolean,
      manager: KafkaManager,
      topics: String = "ddTest3"
  ): InputDStream[ConsumerRecord[String, String]] =
    manager.createDirectStreamWithCheck[String, String](ssc, topics.split(",").toSet, bool)

  /**
   * Parses each JSON string in the stream into a `java.util.Map`.
   * Records that fail to parse are logged and dropped from the stream.
   *
   * @param dStream stream of raw JSON strings
   * @return stream of successfully parsed maps (unparseable records filtered out)
   */
  protected def parseJsonToJMap(dStream: DStream[String]): DStream[java.util.Map[String, Object]] =
    dStream.map { json =>
      // Expression-oriented try/catch instead of the original `var record = null`
      // mutation; behavior is identical.
      try {
        com.alibaba.fastjson.JSON.parseObject(
          json,
          new TypeReference[java.util.Map[String, Object]]() {})
      } catch {
        case e: Exception =>
          logError(s"解析json串失败:\n\t ${json}", e)
          null // removed below by the filter
      }
    }.filter(_ != null)

}

spark直连kafka消费者

原文:https://www.cnblogs.com/shuaidong/p/13141155.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!