
Learning Flink from 0 to 1 (25): Reading Data from Redis as a Source


The data in Redis is a hash named areas that maps each region code to a comma-separated list of country codes, loaded with:

hset areas AREA_US US
hset areas AREA_CT TW,HK
hset areas AREA_AR PK,KW,SA
hset areas AREA_IN IN
We need to implement the SourceFunction interface and specify its type parameter <>, i.e. the type of the records the source emits after reading and processing the Redis data. Since we want to emit key-value pairs, a HashMap is the natural choice.
pom.xml

<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.3</version>
</dependency>
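
Besides Jedis, the project also needs the Flink streaming API on the classpath. A sketch of that extra dependency (the version and Scala suffix are assumptions; match them to your Flink installation):

<!-- Flink streaming API; version/Scala suffix are assumptions, match your cluster -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.1</version>
    <scope>provided</scope>
</dependency>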

Java code:

package ryx.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.util.HashMap;
import java.util.Map;


/**
 * Redis holds the mapping between regions and countries, loaded with:
 *   hset areas AREA_US US
 *   hset areas AREA_CT TW,HK
 *   hset areas AREA_AR PK,KW,SA
 *   hset areas AREA_IN IN
 *
 * ./bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic allDataClean --from-beginning
 *
 * Since we need to return key-value pairs, we use a HashMap as the output type.
 */
public class MyRedisSource implements SourceFunction<HashMap<String,String>> {
    private final Logger logger = LoggerFactory.getLogger(MyRedisSource.class);
    // volatile so the flag set by cancel() is visible to the source thread
    private volatile boolean isRunning = true;
    private Jedis jedis = null;
    // poll interval in milliseconds
    private static final long SLEEP_MILLIS = 5000;
    @Override
    public void run(SourceContext<HashMap<String, String>> ctx) throws Exception {
        this.jedis = new Jedis("hadoop01", 6379);
        while (isRunning) {
            try {
                // Build a fresh map on each poll so a map that has already been
                // emitted downstream is never mutated afterwards.
                HashMap<String, String> kVMap = new HashMap<String, String>();
                Map<String, String> areas = jedis.hgetAll("areas");
                for (Map.Entry<String, String> entry : areas.entrySet()) {
                    // key: region, value: comma-separated countries
                    String key = entry.getKey();
                    String value = entry.getValue();
                    System.out.println("key:" + key + ",--value:" + value);
                    for (String country : value.split(",")) {
                        // invert the mapping: country -> region
                        kVMap.put(country, key);
                    }
                }
                if (kVMap.size() > 0) {
                    ctx.collect(kVMap);
                } else {
                    logger.warn("Got no data from Redis");
                }
                Thread.sleep(SLEEP_MILLIS);
            } catch (JedisConnectionException e) {
                // reconnect and keep polling
                logger.warn("Lost the Redis connection, reconnecting", e);
                jedis = new Jedis("hadoop01", 6379);
            } catch (Exception e) {
                logger.warn("Source error", e);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        if (jedis != null) {
            jedis.close();
        }
    }
}
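
To try the source end to end, it can be wired into a streaming job. A minimal sketch (the job class and job name are illustrative, not from the original post):

package ryx.source;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;

public class MyRedisSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Emits the full country -> region map roughly every 5 seconds.
        DataStreamSource<HashMap<String, String>> areaMap = env.addSource(new MyRedisSource());

        areaMap.print();
        env.execute("redis-source-demo");
    }
}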

The printed output is:
key:AREA_US,--value:US
key:AREA_CT,--value:TW,HK
key:AREA_AR,--value:PK,KW,SA
key:AREA_IN,--value:IN

Each value is then split into individual country codes, each code is combined with its key into the HashMap (inverting the mapping to country -> region), and the map is emitted through the SourceContext passed to run(), which makes this class usable as a Flink source. See the sketch below for the inversion applied to a single entry.
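
A standalone sketch of that inversion for one entry (plain Java, outside Flink; the class name is illustrative):

import java.util.HashMap;

public class InvertExample {
    public static void main(String[] args) {
        // One hash entry from Redis: region AREA_CT holds the countries "TW,HK".
        HashMap<String, String> kVMap = new HashMap<>();
        String key = "AREA_CT";
        String value = "TW,HK";
        for (String country : value.split(",")) {
            kVMap.put(country, key); // invert: country -> region
        }
        System.out.println(kVMap); // {TW=AREA_CT, HK=AREA_CT} (order may vary)
    }
}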


Original post: https://www.cnblogs.com/huanghanyu/p/13633151.html
