
Spark Streaming reading from Kafka


The version differences between Spark and Kafka integrations are significant, so this post records how to produce to and consume from Kafka with this particular version combination.

Software environment

  • spark-2.3.2
  • kafka_2.11-0.11.0.2

Dependency jars

  • jar:spark-streaming-kafka-0-10_2.11
  • sbt:libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.3.0"

In spark-streaming-kafka-0-10_2.11, "0-10" is the Kafka broker version the connector targets and "2.11" is the Scala version it is built against. The official docs say the 0-10 integration works with any broker at 0.10.0 or higher, and their 0-10 example is what is followed here; the connector version is 2.3.0 rather than 2.3.2 because a 2.3.2 jar could not be found. A minimal build.sbt for this setup is sketched below.
The 0-8 and 0-10 connectors also behave differently: 0.8 uses the receiver-based createStream, which depends on ZooKeeper, while 0.10 uses createDirectStream, which reads from the brokers directly and does not need ZooKeeper. The two run on different mechanisms, and createDirectStream is the more efficient of the two; the old receiver-based call is shown for contrast after the build file sketch.
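For reference, a build file along these lines should pull in everything the code below needs. This is only a minimal sketch: the project name and the "provided" scopes are assumptions, chosen because spark-core and spark-streaming are already shipped by a cluster's spark-submit, while the Kafka connector is not bundled with Spark. The %% operator appends the _2.11 suffix automatically based on scalaVersion.

// build.sbt (sketch)
name := "spark-kafka-demo"
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming"            % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"  // also pulls in kafka-clients transitively
)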

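For contrast, the old receiver-based call from the 0-8 connector looks roughly like the sketch below. It is not used in this post and requires the spark-streaming-kafka-0-8_2.11 artifact instead; the ZooKeeper address and the per-topic receiver thread count are assumptions.

// spark-streaming-kafka-0-8: receiver-based, offsets tracked in ZooKeeper
import org.apache.spark.streaming.kafka.KafkaUtils

val lines = KafkaUtils.createStream(
  ssc,                          // an existing StreamingContext
  "192.168.0.103:2181",         // ZooKeeper quorum (assumed address and port)
  "1",                          // consumer group id
  Map("wordsender" -> 1)        // topic -> number of receiver threads
).map(_._2)                     // createStream yields (key, value) pairs; keep the value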
Code example

package spark_streamers

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object spark_kafka {
  val topic = "wordsender"

  // Build a Kafka producer that writes String keys and values to the brokers below
  def client(): KafkaProducer[String, String] = {
    val brokers = "192.168.0.103:9092,192.168.0.104:9092"
    val serializer = "org.apache.kafka.common.serialization.StringSerializer"
    val props = new java.util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializer)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer)
    new KafkaProducer[String, String](props)
  }

  // Publish messagesPerSec messages every second, each holding wordsPerMessage random digits
  def send_message(): Unit = {
    val messagesPerSec = 3
    val wordsPerMessage = 5
    val producer = client()
    while (true) {
      (1 to messagesPerSec).foreach { _ =>
        val str = (1 to wordsPerMessage).map(_ => scala.util.Random.nextInt(10).toString).mkString(" ")
        println(str)
        producer.send(new ProducerRecord[String, String](topic, null, str))
      }
      Thread.sleep(1000)
    }
  }

  // Consume the topic with the direct (receiver-less) 0.10 API and print a word count per batch
  def get_message(): Unit = {
    // This is the broker address, not ZooKeeper: the 0.10 direct API talks to the brokers
    val brokers = "192.168.0.103:9092"
    val group = "1"
    val conf = new SparkConf().setMaster("local[2]").setAppName("streamkafka")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/spark/sparkstream/checkpoint")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> group,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = topic.split(",").toSet
    val lineMap = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    // Alternative: a sliding-window count via reduceByKeyAndWindow (needs the checkpoint set above)
    // val pairs = lineMap.map(record => (record.value, 1L))
    // val windowed = pairs.reduceByKeyAndWindow(_ + _, _ - _, Minutes(2), Seconds(10), 2)
    // windowed.print()
    val lines = lineMap.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }

  def main(args: Array[String]): Unit = {
    get_message()
  }
}
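Note that main() above only starts the consumer; send_message() is never invoked. One way to drive both ends is to pick the role from a command-line argument and launch the producer and the Spark job as two separate processes. A minimal sketch, not part of the original code (the argument name is an assumption):

// Alternative main(): choose producer or consumer at launch time
def main(args: Array[String]): Unit = {
  args.headOption match {
    case Some("producer") => send_message()   // keep publishing random word lines
    case _                => get_message()    // default: run the streaming word count
  }
}

With that in place, one JVM can run the producer while another runs the streaming word count against the same brokers and topic.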

Original: https://www.cnblogs.com/Cshare66/p/13150665.html
