// A Kafka utility object for the Spark Streaming consumer
import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object MyKafkaUtil {
  // PropertiesUtil is a project-specific helper that loads config.properties from the classpath
  private val properties: Properties = PropertiesUtil.load("config.properties")
  val broker_list = properties.getProperty("kafka.broker.list")

  // Kafka consumer configuration
  val kafkaParam = Map(
    "bootstrap.servers" -> broker_list, // addresses used to bootstrap the connection to the cluster
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    // identifies which consumer group this consumer belongs to
    "group.id" -> "gmall_consumer_group",
    // used when there is no initial offset, or the current offset no longer exists on any server;
    // "latest" automatically resets the offset to the latest available offset
    "auto.offset.reset" -> "latest",
    // if true, the consumer's offsets are committed automatically in the background,
    // but data is easily lost if Kafka goes down;
    // if false, the Kafka offsets have to be maintained manually
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )

  // Create a DStream and return the received input data.
  // LocationStrategies: create consumers for the given topics and cluster addresses.
  //   LocationStrategies.PreferConsistent: consistently distribute partitions across all executors.
  // ConsumerStrategies: choose how the Kafka consumer is created and configured on the driver and executors.
  //   ConsumerStrategies.Subscribe: subscribe to a collection of topics.
  def getKafkaStream(topic: String, ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
    val dStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam))
    dStream
  }
}

// Consume the data from Kafka
object RealtimeStartupApp {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("name")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val kafka_topic = "gmall_startup" // placeholder topic name; replace with the topic used in your project
    val startupStream: InputDStream[ConsumerRecord[String, String]] =
      MyKafkaUtil.getKafkaStream(kafka_topic, ssc)
    startupStream.map(_.value()).print() // print records so the job has an output operation
    ssc.start()
    ssc.awaitTermination()
  }
}
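The comments above note that with "enable.auto.commit" set to false the offsets have to be maintained by hand. A minimal sketch of how that can look with the kafka010 commitAsync API, reusing the MyKafkaUtil object above; the topic name "gmall_startup" and the per-batch count are only placeholders for illustration:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ManualCommitApp {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[*]", "manual-commit", Seconds(10))
    // assumes "enable.auto.commit" in kafkaParam has been switched to false
    val stream: InputDStream[ConsumerRecord[String, String]] =
      MyKafkaUtil.getKafkaStream("gmall_startup", ssc) // hypothetical topic name

    stream.foreachRDD { rdd =>
      // read the offset ranges of this batch before any transformation changes the partitioning
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // stand-in for real business logic: just count the records of the batch
      println(s"processed ${rdd.count()} records in this batch")
      // commit the offsets back to Kafka only after the batch has been processed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Committing after the processing step gives at-least-once delivery: if the job fails mid-batch, the uncommitted offsets are re-read on restart.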
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Kafka consumer configuration
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

// Set up the Flink execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Use event time as the time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Set the parallelism to 1
env.setParallelism(1)

val stream = env
  .addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))
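The snippet stops after creating the source, and because the environment uses EventTime, the elements still need timestamps and watermarks before any event-time windowing is possible. A minimal sketch of how the job can be finished, assuming (purely for illustration) that each record is a comma-separated line whose last field is an epoch-millisecond event timestamp; the job name is also made up:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

val withTimestamps = stream
  .assignTimestampsAndWatermarks(
    // tolerate records that arrive up to 1 second out of order
    new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(1)) {
      override def extractTimestamp(line: String): Long =
        line.split(",").last.trim.toLong // assumed: last CSV field is the event time in milliseconds
    })

withTimestamps.print()        // sink: print records to stdout for a quick check
env.execute("hotitems-kafka") // submit the job; the name is arbitrary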
Reading Kafka data with Spark Streaming / Flink
Original article: https://www.cnblogs.com/datacan/p/11008224.html