1. 自定义Sink写入hbase?
使用的是原生的hbase客户端,可以自己控制每多少条记录刷新一次。遇到了几个坑导致数据写不到hbase里边去:
例如protobuf-java版本冲突,常见的是两个关键错误,java.io.IOException: java.lang.reflect.InvocationTargetException 和 Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hbase.protobuf.ProtobufUtil。
Flink读写Kafka,如果使用Consumer08的话,偏移量会提交Zk,下边这个配置可以写在Conf文件中,提交偏移量的Zk可以直接指定。Consumer09以后版本就不向Zk提交了,Kafka自己会单独搞一个Topic存储消费状态。
1 xxxx08 { 2 bootstrap.servers = "ip:9092" 3 zookeeper.connect = "ip1:2181,ip2/vio" 4 group.id = "group1" 5 auto.commit.enable = true 6 auto.commit.interval.ms = 30000 7 zookeeper.session.timeout.ms = 60000 8 zookeeper.connection.timeout.ms = 30000 9 }
1 final Properties consumerProps = ConfigUtil 2 .getProperties(config, “xxxx08");// 使用自己编写的Util函数读取配置 3 4 final FlinkKafkaConsumer08<String> source = 5 new FlinkKafkaConsumer08<String>(topic, new SimpleStringSchema(), consumerProps);
原文:https://www.cnblogs.com/lcmichelle/p/11204362.html