import java.util

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig, Protocol}

import scala.collection.JavaConverters._

/**
 * Consumes a Kafka topic with the Spark Streaming direct API and stores the consumer
 * offsets in Redis, so a restarted job resumes from the last processed offsets.
 *
 * Offsets live in a Redis hash keyed by `groupId/topic`: each field is a partition number and
 * each value is the `untilOffset` of the last batch processed for that partition.
 * Offsets are saved after the batch's action has run, which gives at-least-once processing.
 */
object KafkaDricteRedis {

  // Redis connection settings.
  // NOTE(review): host and password are hard-coded; consider moving them to configuration.
  private val RedisHost = "192.168.121.12"
  private val RedisPort = 6379
  private val RedisPassword = "test123"

  /**
   * Single shared connection pool, created on first use.
   *
   * The pool copies its settings when it is constructed, so they must be applied before
   * `new JedisPool`. The password is handed to the pool so every pooled connection is
   * authenticated once when it is created.
   */
  private lazy val pool: JedisPool = {
    val poolConfig = new JedisPoolConfig()
    poolConfig.setMaxTotal(20) // maximum number of connections
    poolConfig.setMaxIdle(20)  // maximum number of idle connections
    new JedisPool(poolConfig, RedisHost, RedisPort, Protocol.DEFAULT_TIMEOUT, RedisPassword)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("redis").setMaster("local[*]")
    val ssc = new StreamingContext(conf, new Duration(5000)) // 5000 ms batch interval

    val groupid = "GB01"   // consumer group name
    val topic = "topic_bc" // topic name
    // Redis hash key that uniquely identifies this group/topic pair.
    val gtKey = groupid + "/" + topic
    val topics = Set(topic)

    val brokerList = "hadoop04:9092,hadoop05:9092,hadoop06:9092"

    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> groupid,
      // When no offsets are supplied, start from the earliest available message.
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )

    // Partition -> next offset to read, as previously saved in Redis (empty on first run).
    val fromOffset: Map[TopicAndPartition, Long] = readOffsets(gtKey, topic)

    val kafkaDStream: InputDStream[(String, String)] =
      if (fromOffset.nonEmpty) {
        // Emit (key, value) pairs so both branches produce the same element type.
        // The Kafka message key is null unless the producer set one.
        val messageHandler =
          (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
          ssc, kafkaParams, fromOffset, messageHandler)
      } else {
        // Nothing saved yet: subscribe to the topic and let auto.offset.reset pick the start.
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topics)
      }

    // The foreachRDD body runs on the driver, so the Redis work in saveOffsets never ships a
    // (non-serializable) Jedis connection to the executors.
    kafkaDStream.foreachRDD { kafkaRDD =>
      // The HasOffsetRanges cast only works on the RDD produced directly by the direct stream,
      // before any transformation is applied.
      val offsetRanges: Array[OffsetRange] = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges

      // Keep only the message values; the keys are not used.
      val values: RDD[String] = kafkaRDD.map(_._2)
      values.foreach(x => print("")) // placeholder action: real per-record processing goes here

      // Persist offsets only after the batch's action has completed (at-least-once).
      saveOffsets(gtKey, offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /** Loads the saved partition -> offset map stored under `gtKey`, or an empty map if none. */
  private def readOffsets(gtKey: String, topic: String): Map[TopicAndPartition, Long] = {
    val jedis = getConnection()
    try {
      // HGETALL returns an empty map when the key does not exist, which means "never consumed".
      jedis.hgetAll(gtKey).asScala.map { case (partition, offset) =>
        println("分区" + partition + "偏移量为" + offset)
        new TopicAndPartition(topic, partition.toInt) -> offset.toLong
      }.toMap
    } finally {
      jedis.close() // return the connection to the pool
    }
  }

  /** Writes each partition's `untilOffset` to the Redis hash `gtKey` in a single HMSET. */
  private def saveOffsets(gtKey: String, offsetRanges: Array[OffsetRange]): Unit = {
    // HMSET rejects an empty field list, so skip the write when there is nothing to store.
    if (offsetRanges.nonEmpty) {
      offsetRanges.foreach { o =>
        println("partition: " + o.partition)
        println("offset: " + o.untilOffset)
      }
      val fields = offsetRanges.map(o => o.partition.toString -> o.untilOffset.toString).toMap
      val jedis = getConnection()
      try {
        // One command, so all partition offsets of the batch are written together.
        jedis.hmset(gtKey, fields.asJava)
      } finally {
        jedis.close() // return the connection to the pool
      }
    }
  }

  /**
   * Borrows an authenticated connection from the shared Jedis pool.
   *
   * @return a pooled [[Jedis]] connection; callers should `close()` it to return it to the pool
   */
  def getConnection(): Jedis = pool.getResource()
}
Spark Streaming 以 Kafka 直连（Direct）方式消费，并将消费者偏移量保存到 Redis 中，以避免代码变更或节点重启后的数据丢失以及序列化问题
原文:https://www.cnblogs.com/hejunhong/p/10493411.html