首页 > 其他 > 详细

Flink学习(十) Sink到Redis

时间:2020-05-20 00:22:57      阅读:145      评论:0      收藏:0      [点我收藏+]

添加依赖

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

编写代码

package com.wyh.streamingApi.sink

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object Sink2Redis {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //Source操作
    val inputStream = env.readTextFile("F:\\flink-study\\wyhFlinkSD\\data\\sensor.txt")

    //Transform操作
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })

    val conf = new FlinkJedisPoolConfig.Builder()
        .setHost("localhost")
        .setPort(6379)
        .build()



    //Sink操作
    dataStream.addSink(new RedisSink[SensorReading](conf,new MyRedisMapper()))


    env.execute("redis sink test")
  }

}

class MyRedisMapper() extends RedisMapper[SensorReading]{

  //定义保存数据到Redis的命令
  override def getCommandDescription: RedisCommandDescription = {
    //把传感器id和温度值保存成 Hash表 HSET key field value
    new RedisCommandDescription(RedisCommand.HSET,"sensor_temperature")
  }

  //定义保存到redis的key
  override def getKeyFromData(t: SensorReading): String = {
    t.id
  }

  //定义保存到redis的value
  override def getValueFromData(t: SensorReading): String = {
    t.temperature.toString
  }
}

技术分享图片

 

Flink学习(十) Sink到Redis

原文:https://www.cnblogs.com/wyh-study/p/12920415.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!