import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming integration with Kafka
 */
object KafkaDirectStream {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[2]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, Object](
      /*
       * List of broker addresses. It does not need to include every broker in the
       * cluster; the client discovers the remaining brokers from the ones given.
       * Providing at least two brokers is recommended for fault tolerance.
       */
      "bootstrap.servers" -> "test:9092",
      /* Deserializer for message keys */
      "key.deserializer" -> classOf[StringDeserializer],
      /* Deserializer for message values */
      "value.deserializer" -> classOf[StringDeserializer],
      /* ID of the consumer group this consumer belongs to */
      "group.id" -> "spark-streaming-group",
      /*
       * What to do when a partition has no committed offset, or the committed
       * offset is invalid:
       *   latest:   start from the newest records (those produced after the consumer started)
       *   earliest: start from the beginning of the partition
       */
      "auto.offset.reset" -> "latest",
      /* Whether offsets are committed automatically */
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )

    /* Multiple topics can be subscribed to at the same time */
    val topics = Array("spark-streaming-topic")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      /* Location strategy */
      PreferConsistent,
      /* Topic subscription */
      Subscribe[String, String](topics, kafkaParams)
    )

    /* Print the input stream */
    stream.map(record => (record.key, record.value)).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
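Because this example sets enable.auto.commit to true, the Kafka client commits offsets on its own schedule, regardless of whether the batch was actually processed, which can drop or re-process records on failure. For at-least-once semantics, offsets can instead be committed back to Kafka manually after each batch succeeds, using the HasOffsetRanges and CanCommitOffsets interfaces that spark-streaming-kafka-0-10 provides. Below is a minimal sketch, assuming the same stream as above and "enable.auto.commit" set to false:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Offset ranges are only available on the RDD produced directly by the
  // direct stream, before any transformation, hence the cast here.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Process the batch first (here, simply print each record)...
  rdd.map(record => (record.key, record.value)).foreach(println)

  // ...then commit the offsets asynchronously, only after processing succeeded.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}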
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.produce</groupId>
    <artifactId>produce</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- Spark Streaming + Kafka 0.10 integration -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
Original article: https://www.cnblogs.com/tangsonghuai/p/11204247.html