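When Spark Streaming consumes data from Kafka, it has to manage Kafka offsets. The built-in answer is checkpointing: with checkpointing enabled, Spark stores the offsets in the checkpoint, and a restarted job resumes from them.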
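The biggest drawback of checkpointing, however, appears whenever the streaming code or its configuration changes, or a new feature is released. You stop the old Spark Streaming job, then package and launch the new one, and one of two things happens: (1) the new job fails to start with a deserialization error; (2) the new job starts, but it is still running the code of the previous version.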
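Why? When the checkpoint is first written, Spark serializes the streaming application (its DStream graph together with the functions it applies) into the checkpoint directory, and every restart restores the application from that serialized data. The newly packaged program therefore still loads the old serialized state, which either fails to deserialize or keeps executing the old logic.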
To address this, the Spark documentation offers two approaches:
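(1) Keep the old job running, start the new one, and let both consume in parallel for a while. Assessment: messages may still be lost or consumed twice.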
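(2) When stopping, record the last offsets; the restarted job reads them and continues from there, so no messages are lost. Assessment: the documentation does not say how to do this concretely; it only sketches the idea of storing the offsets yourself: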
For data stores that support transactions, saving offsets in the same transaction as the results can keep the two in sync, even in failure situations. If you’re careful about detecting repeated or skipped offset ranges, rolling back the transaction prevents duplicated or lost messages from affecting results. This gives the equivalent of exactly-once semantics. It is also possible to use this tactic even for outputs that result from aggregations, which are typically hard to make idempotent.
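The Java example from the documentation:
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.*;

// The details depend on your data store, but the general idea looks like this.
// Begin from the offsets committed to the database.
// selectOffsetsFromYourDatabase() is a placeholder that returns a java.sql.ResultSet
// with the columns topic, partition and offset.
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
try (ResultSet resultSet = selectOffsetsFromYourDatabase()) {
  while (resultSet.next()) {
    fromOffsets.put(new TopicPartition(resultSet.getString("topic"), resultSet.getInt("partition")),
        resultSet.getLong("offset"));
  }
}
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent(),
  ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);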
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
Object results = yourCalculation(rdd);
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
});
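The idea is all in this pseudocode: with a data store that supports transactions, update the results and the offsets in the same transaction, advance the stored offsets only when their end matches the start of the current batch, and verify that the offsets were updated correctly.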
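To make the "end of existing offsets matches the beginning of this batch" check concrete, here is a minimal, self-contained Java sketch. It is an assumption-laden stand-in: an in-memory map replaces the database, and working on copies then swapping them in imitates a transaction. The names `TransactionalOffsetStore`, `Range`, `seed` and `commitBatch` are hypothetical and are not part of Spark or Kafka.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a transactional store that holds both
// the results and the offsets. A real job would use a database transaction.
final class TransactionalOffsetStore {

    // One partition's slice of a batch: [fromOffset, untilOffset), like Spark's OffsetRange.
    static final class Range {
        final String topic;
        final int partition;
        final long fromOffset;
        final long untilOffset;

        Range(String topic, int partition, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }
    }

    private Map<String, Long> offsets = new HashMap<>();  // "topic/partition" -> next offset to read
    private List<String> results = new ArrayList<>();

    private static String key(String topic, int partition) {
        return topic + "/" + partition;
    }

    // Records the starting position of a partition (e.g. loaded at startup).
    void seed(String topic, int partition, long offset) {
        offsets.put(key(topic, partition), offset);
    }

    // "Begin transaction": work on copies. If any range does not start exactly
    // where the stored offset ends (a repeated or skipped batch), discard the
    // copies ("roll back") and return false; otherwise swap both in ("commit").
    boolean commitBatch(List<String> batchResults, List<Range> ranges) {
        Map<String, Long> newOffsets = new HashMap<>(offsets);
        for (Range r : ranges) {
            Long stored = newOffsets.get(key(r.topic, r.partition));
            if (stored == null || stored != r.fromOffset) {
                return false;
            }
            newOffsets.put(key(r.topic, r.partition), r.untilOffset);
        }
        List<String> newResults = new ArrayList<>(results);
        newResults.addAll(batchResults);
        offsets = newOffsets;
        results = newResults;
        return true;
    }

    long offsetOf(String topic, int partition) {
        return offsets.get(key(topic, partition));
    }

    int resultCount() {
        return results.size();
    }
}
```

With the partition seeded at 100, committing a batch covering [100, 150) advances the offset to 150; replaying that batch, or submitting one that starts at 160, is rejected and leaves both the results and the offset untouched. That is the "detect repeated or skipped offset ranges, then roll back" behavior the documentation describes.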
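Managing Kafka offsets in Spark Streaming means storing them, in some format, somewhere. The common options, covered in turn below, are committing them back to Kafka, storing them in ZooKeeper, and storing them in HBase.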
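Apache Spark 2.1.x with spark-streaming-kafka-0-10 uses the new Kafka consumer API, which supports committing offsets asynchronously. Once you have made sure your processed output has been stored safely, you can call the commitAsync API to commit the offsets to Kafka. The new consumer API commits offsets under the consumer group id, which identifies them uniquely.
Committing offsets to Kafka: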
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
Note: commitAsync() belongs to the Spark Streaming kafka-0-10 integration, and the Spark documentation warns that it is still an experimental API that may change.
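Because the offsets are committed only after the output has been written, a crash between the two steps makes the restarted job re-read the uncommitted batch. The Spark documentation notes that Kafka is not transactional, so outputs must still be idempotent. The sketch below simulates this ordering with plain Java collections; `AtLeastOnceDemo`, `runBatch` and the `crashBeforeCommit` flag are hypothetical and model only the ordering, not the Kafka API. The committed value is the exclusive end of the range (Spark's `untilOffset`), that is, the next offset to read.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of "write output first, then commit offsets".
final class AtLeastOnceDemo {
    final List<Integer> log = new ArrayList<>();   // stands in for one Kafka partition
    final List<Integer> sink = new ArrayList<>();  // stands in for the output store
    long committed = 0;                            // next offset to read (like untilOffset)

    // Processes every record from the committed offset up to (excluding) untilOffset.
    // If crashBeforeCommit is true, the output is written but the offset is not committed.
    void runBatch(long untilOffset, boolean crashBeforeCommit) {
        for (long off = committed; off < untilOffset; off++) {
            sink.add(log.get((int) off));          // 1. write output
        }
        if (!crashBeforeCommit) {
            committed = untilOffset;               // 2. commit only after the output succeeded
        }
    }
}
```

Running a batch that "crashes" before committing and then the same batch normally leaves the first three records in the sink twice: nothing is lost, but duplicates are possible, which is why this approach gives at-least-once rather than exactly-once results.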
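With Kafka's old ZooKeeper-based consumer, consumer offsets are stored in ZooKeeper, so a Spark Streaming job can keep its offsets there as well. The new consumer used by spark-streaming-kafka-0-10 does not read these nodes by itself, so the job must explicitly read the offsets from ZooKeeper at startup and write them back after each batch. Reference code: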
Step 1: initialize a ZooKeeper connection to read the offsets from ZooKeeper.
val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
// Retrieves the last offsets stored in ZooKeeper for the given consumer group and topic list.
def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {
  val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
  val partitionMap = zkUtils.getPartitionsForTopics(topics)
  // /consumers/<groupId>/offsets/<topic>/
  partitionMap.foreach(topicPartitions => {
    val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
    topicPartitions._2.foreach(partition => {
      val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
      try {
        val offsetStatTuple = zkUtils.readData(offsetPath)
        if (offsetStatTuple != null) {
          LOGGER.info("retrieving offset details - topic: {}, partition: {}, offset: {}, node path: {}",
            Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)
          topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, partition),
            offsetStatTuple._1.toLong)
        }
      } catch {
        case e: Exception =>
          // No node exists yet for this partition: start from offset 0
          LOGGER.warn("retrieving offset details - no previous node exists: {}, topic: {}, partition: {}, node path: {}",
            Seq[AnyRef](e.getMessage, topicPartitions._1, partition.toString, offsetPath): _*)
          topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, partition), 0L)
      }
    })
  })
  topicPartOffsetMap.toMap
}
Step 2: use the retrieved offsets to initialize the Kafka direct DStream.
val fromOffsets = readOffsets(topics, groupId)
val inputDStream = KafkaUtils.createDirectStream(ssc, PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets))
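Note: in ZooKeeper, Kafka offsets are stored at /consumers/[groupId]/offsets/[topic]/[partitionId], and the node's value is the offset. After each batch, persistOffsets writes the offsets back to that path: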
import org.apache.zookeeper.ZooDefs

def persistOffsets(offsets: Seq[OffsetRange], groupId: String, storeEndOffset: Boolean): Unit = {
  offsets.foreach(or => {
    val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic)
    val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
    // untilOffset is the next offset to read; fromOffset would replay this batch after a restart
    val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
    // OPEN_ACL_UNSAFE grants all permissions to anyone (ANYONE_ID_UNSAFE with Perms.ALL)
    zkUtils.updatePersistentPath(offsetPath, offsetVal.toString, ZooDefs.Ids.OPEN_ACL_UNSAFE)
    LOGGER.debug("persisting offset details - topic: {}, partition: {}, offset: {}, node path: {}",
      Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
  })
}
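The path layout and the `storeEndOffset` flag can be illustrated without a ZooKeeper server. In this hypothetical Java sketch a `HashMap` stands in for the ZooKeeper tree; `ZkOffsetPaths`, `offsetPath`, `persist` and `read` are illustrative names, not Kafka or ZooKeeper APIs. Storing `untilOffset` resumes after the batch, storing `fromOffset` replays it, and a missing node falls back to 0, as `readOffsets` does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the /consumers/<groupId>/offsets/<topic>/<partition> nodes.
final class ZkOffsetPaths {
    final Map<String, String> nodes = new HashMap<>();   // path -> node data (offset as text)

    static String offsetPath(String groupId, String topic, int partition) {
        return "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
    }

    // Mirrors persistOffsets: untilOffset resumes after the batch, fromOffset replays it.
    void persist(String groupId, String topic, int partition,
                 long fromOffset, long untilOffset, boolean storeEndOffset) {
        long value = storeEndOffset ? untilOffset : fromOffset;
        nodes.put(offsetPath(groupId, topic, partition), Long.toString(value));
    }

    // Mirrors readOffsets: a missing node means "start from offset 0".
    long read(String groupId, String topic, int partition) {
        String data = nodes.get(offsetPath(groupId, topic, partition));
        return data == null ? 0L : Long.parseLong(data);
    }
}
```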
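Storing offsets in HBase. First create a table in the HBase shell (TTL=>2592000 seconds keeps each row for 30 days):
create 'stream_kafka_offsets', {NAME=>'offsets', TTL=>2592000}
Each row is laid out as follows: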
row: <TOPIC_NAME>:<GROUP_ID>:<EPOCH_BATCHTIME_MS>
column family: offsets
qualifier: <PARTITION_ID>
value: <OFFSET_ID>
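Because HBase keeps rows sorted by key bytes, the reversed scan used later in getLastCommittedOffsets (start row `topic:group:<now>`, stop row `topic:group:0`, stop row exclusive) returns the row with the largest batch time first, i.e. the latest saved offsets. The Java sketch below imitates that with a `TreeMap`; `HBaseRowKeyDemo`, `rowKey`, `save` and `latest` are hypothetical names. It assumes all batch timestamps have the same number of digits (13 for current epoch milliseconds); otherwise lexicographic order would not match numeric order.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the HBase table: rows kept sorted by key, as in HBase.
final class HBaseRowKeyDemo {
    // rowKey -> (partition -> offset), mirroring the "offsets" column family
    final TreeMap<String, Map<Integer, Long>> rows = new TreeMap<>();

    static String rowKey(String topic, String groupId, long batchTimeMs) {
        return topic + ":" + groupId + ":" + batchTimeMs;
    }

    void save(String topic, String groupId, long batchTimeMs, Map<Integer, Long> offsets) {
        rows.put(rowKey(topic, groupId, batchTimeMs), offsets);
    }

    // Like a reversed scan from "topic:group:<now>" down to "topic:group:0" (exclusive):
    // the first row found is the most recent batch for this topic and group.
    Map<Integer, Long> latest(String topic, String groupId, long nowMs) {
        String start = rowKey(topic, groupId, nowMs);
        String stop = rowKey(topic, groupId, 0L);
        Map.Entry<String, Map<Integer, Long>> e = rows.floorEntry(start);
        if (e == null || e.getKey().compareTo(stop) <= 0) {
            return null;   // no saved offsets for this topic and group
        }
        return e.getValue();
    }
}
```

Rows of other topics or groups never fall between the two bounds, because every key between `topic:group:0` and `topic:group:<now>` must share the `topic:group:` prefix.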
For each batch of messages, the saveOffsets() function persists the last read offsets for the given Kafka topic in HBase:
/*
 Save offsets for each batch into HBase
*/
def saveOffsets(TOPIC_NAME: String, GROUP_ID: String, offsetRanges: Array[OffsetRange],
    hbaseTableName: String, batchTime: org.apache.spark.streaming.Time) = {
  // HBaseConfiguration.create() already loads hbase-site.xml from the classpath;
  // addResource(String) would also look the name up on the classpath, not as a file path
  val hbaseConf = HBaseConfiguration.create()
  val conn = ConnectionFactory.createConnection(hbaseConf)
  val table = conn.getTable(TableName.valueOf(hbaseTableName))
  val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(batchTime.milliseconds)
  val put = new Put(Bytes.toBytes(rowKey))
  for (offset <- offsetRanges) {
    // qualifier = partition id, value = untilOffset (the next offset to read)
    put.addColumn(Bytes.toBytes("offsets"), Bytes.toBytes(offset.partition.toString),
      Bytes.toBytes(offset.untilOffset.toString))
  }
  try table.put(put) finally {
    table.close()
    conn.close()
  }
}
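Before the streaming job starts, getLastCommittedOffsets() reads the offsets that were saved in HBase when the previous run ended. It covers the following common cases and returns an offset for every partition of the Kafka topic:
Case 1: the streaming job starts for the first time. Get the topic's partition count from ZooKeeper, set every partition's offset to 0, and return them.
Case 2: a long-running streaming job was stopped and partitions were added to the topic in the meantime. Get the partition count from ZooKeeper; the old partitions keep the offsets saved in HBase, and the new partitions start at offset 0.
Case 3: a long-running streaming job was stopped and the topic's partitions did not change. Use the offsets saved in HBase directly.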
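If partitions are added to the topic while the application is running, a stream created with the Assign strategy (as below) keeps reading only the partitions it was given, so the new partitions are not consumed until the Spark Streaming application is restarted. According to the Spark documentation, streams created with Subscribe or SubscribePattern in the kafka-0-10 integration should pick up added partitions while running.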
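The three cases reduce to one merge rule: every partition that ZooKeeper reports gets the offset saved in HBase if there is one, and 0 otherwise. The pure-Java sketch below states that rule on plain maps; `OffsetMerge.resolve` is a hypothetical helper, not code from the referenced repository. Like getLastCommittedOffsets, it assumes partition ids run from 0 to n-1 and that partitions are never removed (Kafka does not allow reducing a topic's partition count).

```java
import java.util.Map;
import java.util.TreeMap;

final class OffsetMerge {
    // zkPartitionCount: current partition count of the topic (from ZooKeeper)
    // savedOffsets: partition -> offset from the latest HBase row (empty on the first run)
    static Map<Integer, Long> resolve(int zkPartitionCount, Map<Integer, Long> savedOffsets) {
        Map<Integer, Long> fromOffsets = new TreeMap<>();
        for (int p = 0; p < zkPartitionCount; p++) {
            // case 1: nothing saved -> 0; case 2: newly added partition -> 0; case 3: saved value
            fromOffsets.put(p, savedOffsets.getOrDefault(p, 0L));
        }
        return fromOffsets;
    }
}
```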
/* Returns last committed offsets for all the partitions of a given topic from HBase in
   the following cases.
*/
def getLastCommittedOffsets(TOPIC_NAME: String, GROUP_ID: String, hbaseTableName: String,
    zkQuorum: String, zkRootDir: String, sessionTimeout: Int, connectionTimeOut: Int): Map[TopicPartition, Long] = {
  val hbaseConf = HBaseConfiguration.create()
  val zkUrl = zkQuorum + "/" + zkRootDir
  val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeOut)
  val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
  val zKNumberOfPartitionsForTopic =
    zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME)).get(TOPIC_NAME).toList.head.size
  zkClientAndConnection._1.close()
  zkClientAndConnection._2.close()

  // Connect to HBase to retrieve last committed offsets
  val conn = ConnectionFactory.createConnection(hbaseConf)
  val table = conn.getTable(TableName.valueOf(hbaseTableName))
  val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(System.currentTimeMillis())
  val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
  // Reversed scan: the first row returned belongs to the most recent batch
  val scan = new Scan()
  val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(
    stopRow.getBytes).setReversed(true))
  val result = scanner.next()
  var hbaseNumberOfPartitionsForTopic = 0 // Set the number of partitions discovered for a topic in HBase to 0
  if (result != null) {
    // If the result from the HBase scanner is not null, set the number of partitions from HBase
    // to the number of cells
    hbaseNumberOfPartitionsForTopic = result.listCells().size()
  }

  val fromOffsets = collection.mutable.Map[TopicPartition, Long]()
  if (hbaseNumberOfPartitionsForTopic == 0) {
    // initialize fromOffsets to beginning
    for (partition <- 0 until zKNumberOfPartitionsForTopic) {
      fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0L)
    }
  } else if (zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic) {
    // handle scenario where new partitions have been added to existing kafka topic
    for (partition <- 0 until hbaseNumberOfPartitionsForTopic) {
      val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
        Bytes.toBytes(partition.toString)))
      fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
    }
    for (partition <- hbaseNumberOfPartitionsForTopic until zKNumberOfPartitionsForTopic) {
      fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0L)
    }
  } else {
    // initialize fromOffsets from last run
    for (partition <- 0 until hbaseNumberOfPartitionsForTopic) {
      val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
        Bytes.toBytes(partition.toString)))
      fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
    }
  }
  scanner.close()
  table.close()
  conn.close()
  fromOffsets.toMap
}
Once we have the offsets, we can create a Kafka direct DStream:
val fromOffsets = getLastCommittedOffsets(topic, consumerGroupID, hbaseTableName, zkQuorum,
  zkKafkaRootDir, zkSessionTimeOut, zkConnectionTimeOut)
val inputDStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
  Assign[String, String](fromOffsets.keys, kafkaParams, fromOffsets))
After the batch has been processed, call saveOffsets() to store its offsets:
/*
 For each RDD in a DStream apply a map transformation that processes the message.
*/
inputDStream.foreachRDD((rdd, batchTime) => {
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach(offset => println((offset.topic, offset.partition, offset.fromOffset,
    offset.untilOffset)))
  val newRDD = rdd.map(message => processMessage(message))
  newRDD.count()
  // Save the offsets only after the batch's action (count) has completed
  saveOffsets(topic, consumerGroupID, offsetRanges, hbaseTableName, batchTime)
})
Reference code: https://github.com/gdtm86/spark-streaming-kafka-cdh511-testing
In summary, I recommend using ZooKeeper to maintain the offsets.
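Tips: this post records my own learning and practice. Many figures and passages are copied from articles online; please excuse any missing attributions. If you have any questions, leave a comment or send me an email and I will reply promptly.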
Problems with using checkpoints in Spark Streaming and their solutions
Original article: https://www.cnblogs.com/small-k/p/8909942.html