The version gap between Spark and Kafka integrations is significant, so this post records how to produce and consume messages with one working version combination.
In the artifact name spark-streaming-kafka-0-10_2.11, 0-10 is the Kafka version line and 2.11 is the Scala version the artifact is built against. The official docs say this integration works with Kafka 0.10.0 and higher, and the code below follows the official 0-10 example. The Spark version used here is 2.3.0, because a matching jar for 2.3.2 could not be found.
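For reference, here is a minimal sketch of the matching build dependencies, assuming sbt as the build tool (the version numbers simply mirror the combination described above):

// build.sbt -- a sketch; versions mirror the combination above
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                  % "2.3.0",
  "org.apache.spark" %% "spark-streaming"             % "2.3.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10"  % "2.3.0"
)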
The 0-8 and 0-10 integrations differ: 0-8 uses createStream, which tracks offsets through ZooKeeper, while 0-10 uses createDirectStream, which reads from the brokers directly and does not depend on ZooKeeper. The two have different execution models, and createDirectStream is the more efficient of the two.
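For contrast, the receiver-based 0-8 call looked roughly like the sketch below. This assumes the separate spark-streaming-kafka-0-8 artifact and a StreamingContext ssc like the one built further down; it is not used in the code that follows, and the addresses are placeholders:

// 0-8 receiver-based API: offsets are tracked through ZooKeeper.
// import org.apache.spark.streaming.kafka.KafkaUtils  // from the 0-8 artifact
val zkQuorum = "192.168.0.103:2181"            // a ZooKeeper address, not a broker
val topicMap = Map("wordsender" -> 1)          // topic -> number of receiver threads
val lines = KafkaUtils.createStream(ssc, zkQuorum, "1", topicMap).map(_._2)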
package spark_streamers

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
object spark_kafka {

  val topic = "wordsender"

  // Build a Kafka producer that serializes keys and values as strings.
  def client(): KafkaProducer[String, String] = {
    val brokers = "192.168.0.103:9092,192.168.0.104:9092"
    val stringSerializer = "org.apache.kafka.common.serialization.StringSerializer"
    val props = new java.util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer)
    new KafkaProducer[String, String](props)
  }
  // Continuously publish batches of random-digit messages to the topic.
  def send_message(): Unit = {
    val messagesPerSec = 3
    val wordsPerMessage = 5
    val producer = client()
    while (true) {
      (1 to messagesPerSec).foreach { _ =>
        val str = (1 to wordsPerMessage).map(_ => scala.util.Random.nextInt(10).toString).mkString(" ")
        println(str)
        producer.send(new ProducerRecord[String, String](topic, null, str))
      }
      // Sleep once per batch (not once per message) so the rate is messagesPerSec.
      Thread.sleep(1000)
    }
  }
  // Consume with the receiver-less 0-10 direct API and count words per batch.
  def get_message(): Unit = {
    // The 0-10 API connects to the brokers directly; no ZooKeeper address is needed.
    val brokers = "192.168.0.103:9092"
    val group = "1"
    val conf = new SparkConf().setMaster("local[2]").setAppName("streamkafka")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/spark/sparkstream/checkpoint")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> group,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = topic.split(",").toSet
    val lineMap = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    // Alternative: a windowed count over the last 2 minutes, sliding every 10s
    // (relies on the checkpoint directory set above). Note the records carry
    // null keys, so a windowed variant should count values, not keys:
    // val pairs = lineMap.flatMap(_.value.split(" ")).map((_, 1))
    // val windowedCounts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Minutes(2), Seconds(10), 2)
    // windowedCounts.print()
    val lines = lineMap.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }

  def main(args: Array[String]): Unit = {
    get_message()
  }
}
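Note that main only starts the consumer; send_message is never invoked. One way to exercise both sides, sketched here as a hypothetical replacement for main (the "produce" argument is made up for illustration), is to dispatch on a command-line argument:

  // Hypothetical main: run the producer when passed "produce", else the consumer.
  def main(args: Array[String]): Unit = {
    if (args.headOption.contains("produce")) send_message()
    else get_message()
  }

Run the producer in one process and the consumer in another; the consumer prints updated word counts for every 10-second batch.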
Source: https://www.cnblogs.com/Cshare66/p/13150665.html