首页 > 其他 > 详细

flink的Kafka数据源代码样例

时间:2019-11-04 22:19:44      阅读:123      评论:0      收藏:0      [点我收藏+]
 1 val properties = new Properties()
 2 properties.setProperty("bootstrap.servers", "localhost:9092")
 3 properties.setProperty("group.id", "consumer-group")
 4 properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
 5 properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
 6 properties.setProperty("auto.offset.reset", "latest")
 7 val env = StreamExecutionEnvironment.getExecutionEnvironment
 8 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 9 env.setParallelism(1)
10 val stream = env
11   // source为来自Kafka的数据,这里我们实例化一个消费者,topic为hotitems
12   .addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))

 

flink的Kafka数据源代码样例

原文:https://www.cnblogs.com/sunpengblog/p/11794497.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!