
Flink: Kafka as Source and Sink


This example implements a Kafka-in, Kafka-out pipeline: Flink consumes records from one Kafka topic, transforms each record, and writes the result to another Kafka topic.

Code:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

// One sensor record: id, event timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object KafkaTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop102:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    // Source: consume the "sensor" topic as a stream of strings
    val kafkaDataStream: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    // Parse each CSV line into a SensorReading and emit its string form
    val dataStream: DataStream[String] = kafkaDataStream.map(d => {
      val arr = d.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble).toString
    })

    // Sink: write the results to the "flink-sink-test" topic
    dataStream.addSink(
      new FlinkKafkaProducer011[String]("hadoop102:9092", "flink-sink-test", new SimpleStringSchema()))

    env.execute("kafka")
  }

}
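The example relies on the Kafka 0.11 connector classes (FlinkKafkaConsumer011 / FlinkKafkaProducer011), so that connector has to be on the classpath. A minimal build.sbt sketch, assuming Flink 1.10.0 on Scala 2.11 (both versions are assumptions; match them to your cluster):

libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.10.0" % Provided  // assumed Flink version
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.10.0"        // Kafka 0.11 connector

Note that recent Flink releases dropped the version-specific Kafka connectors; there the universal flink-connector-kafka with FlinkKafkaConsumer / FlinkKafkaProducer takes their place.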

Start a console producer and send some data for Flink to consume:

[atguigu@hadoop102 bin]$ ./kafka-console-producer.sh --broker-list hadoop102:9092 --topic sensor
>sensor_1, 1547718199, 35.80018327300259
>sensor_1, 1547718201, 40.8
>sensor_1, 1547718202, 998                 
>

Use a console consumer to check the data Flink wrote back (the third field is parsed as a Double, so 998 comes back as 998.0):

[atguigu@hadoop102 bin]$ ./kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic flink-sink-test
SensorReading(sensor_1,1547718199,35.80018327300259)
SensorReading(sensor_1,1547718201,40.8)
SensorReading(sensor_1,1547718202,998.0)
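One detail the post does not mention: with only the properties above, offset commits go through Kafka's auto-commit mechanism. Once checkpointing is enabled, FlinkKafkaConsumer011 instead commits offsets back to Kafka when a checkpoint completes, tying the committed position to Flink's own fault tolerance. A one-line sketch to add to main() (the 5-second interval is just an example):

// Checkpoint every 5 s; the Kafka source then snapshots its offsets
// and commits them to Kafka on checkpoint completion.
env.enableCheckpointing(5000L)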


Original: https://www.cnblogs.com/noyouth/p/12741509.html
