package com.it.baizhan.scalacode.Streaming

import java.util

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, TaskContext}

import scala.collection.mutable

/**
 * Use Redis to maintain the Kafka consumer offsets.
 */
object ManageOffsetUseRedis {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("manageoffsetuseredis")
    // Cap how many records are read per partition per second
    conf.set("spark.streaming.kafka.maxRatePerPartition", "10")
    val ssc = new StreamingContext(conf, Durations.seconds(5))
    // Set the log level
    ssc.sparkContext.setLogLevel("ERROR")

    val topic = "mytopic"
    // Redis database index that stores the consumer offsets
    val dbIndex = 3

    // Read the stored consumer offsets from Redis
    val currentTopicOffset: mutable.Map[String, String] = getOffSetFromRedis(dbIndex, topic)

    // Print the offsets read on startup
    currentTopicOffset.foreach(tp => println(s"initial offset read from Redis: $tp"))

    // Convert to the Map[TopicPartition, Long] form the Kafka API expects
    val fromOffsets: Map[TopicPartition, Long] = currentTopicOffset.map { resultSet =>
      new TopicPartition(topic, resultSet._1.toInt) -> resultSet._2.toLong
    }.toMap

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "mynode1:9092,mynode2:9092,mynode3:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "MyGroupId11",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean) // defaults to true
    )

    /**
     * Hand the offsets fetched from Redis to Spark Streaming
     */
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
    )
    stream.foreachRDD { (rdd: RDD[ConsumerRecord[String, String]]) =>

      println("**** business processing finished ****")

      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.foreachPartition { iter =>
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        println(s"topic:${o.topic} partition:${o.partition} fromOffset:${o.fromOffset} untilOffset:${o.untilOffset}")
      }

      // Save each partition's end offset for the current batch back to Redis
      saveOffsetToRedis(dbIndex, offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

  /**
   * Save the consumer offsets to Redis.
   */
  def saveOffsetToRedis(db: Int, offsetRanges: Array[OffsetRange]): Unit = {
    val jedis = RedisClient.pool.getResource
    jedis.select(db)
    offsetRanges.foreach(offsetRange => {
      // One hash per topic: field = partition, value = untilOffset
      jedis.hset(offsetRange.topic, offsetRange.partition.toString, offsetRange.untilOffset.toString)
    })
    println("offsets saved")
    RedisClient.pool.returnResource(jedis)
  }

  /**
   * Fetch the saved consumer offsets from Redis.
   *
   * @param db    Redis database index holding the offsets
   * @param topic Kafka topic whose offsets are stored as a hash under this key
   * @return partition -> offset, both as strings
   */
  def getOffSetFromRedis(db: Int, topic: String): mutable.Map[String, String] = {
    val jedis = RedisClient.pool.getResource
    jedis.select(db)
    val result: util.Map[String, String] = jedis.hgetAll(topic)
    RedisClient.pool.returnResource(jedis)
    if (result.size() == 0) {
      // First run: nothing stored yet, so start every partition at offset 0.
      // NOTE: this hardcodes the assumption that the topic has three partitions.
      result.put("0", "0")
      result.put("1", "0")
      result.put("2", "0")
    }
    import scala.collection.JavaConverters.mapAsScalaMapConverter
    val offsetMap: mutable.Map[String, String] = result.asScala
    offsetMap
  }
}
Source: https://www.cnblogs.com/zhouaimin/p/14680833.html