首页 > 其他 > 详细

获取kafka中的偏移

时间:2017-10-31 13:40:12      阅读:329      评论:0      收藏:0      [点我收藏+]
kafka0.10.0.0的版本:
for (i <- 0 to (offsetList.length - 1)) {
var topicPartition = new TopicPartition(topic, i)
topicArray.add(topicPartition)

kafkaConsumer.assign(util.Arrays.asList(topicPartition))
kafkaConsumer.seekToEnd(util.Arrays.asList(topicPartition))
val latestOffset = kafkaConsumer.position(topicPartition)
logInfo("partition"+i+"latestOffset"+latestOffset)
endOffsetMap.put(topicPartition, latestOffset)
}

kafka:0.11.0.0版本:
def test() = {
var endOffsetMap1 = kafkaConsumer.endOffsets(topicArray)
import scala.collection.JavaConversions._
for (entry <- endOffsetMap) {
var key: TopicPartition = entry._1
logInfo("test -----topic:"+this.topic +" partition:" + key.partition() +" endOffset:" + entry._2)
}

}

获取kafka中的偏移

原文:http://www.cnblogs.com/loveItLoveFaimly/p/7760856.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!