只有那些被提交到kafka的数据,也就是那些已经被写入所有同步副本的数据,对消费者是可用的,这意味着消费者得到的消息已经具备了一致性。消费者唯一要做的是跟踪哪些消息是已经读取过的,哪些是还没有读取过的。这是在读取消息时不丢失消息的关键。
在从分区读取数据时,消费者会获取一批事件,检查这批事件里最大的偏移量,然后从这个偏移量开始读取另外一批事件。这样可以保证消费者总能以正确的顺序获取新数据,不会错过任何事件。
如果一个消费者退出,另一个消费者需要知道从什么地方开始继续处理,它需要知道前一个消费者在退出前处理的最后一个偏移量是多少。它们把当前读取的偏移量保存起来,在退出之后,同一个群组里的其他消费者就可以接手它们的工作。如果消费者提交了偏移量却未能处理完消息,那么就有可能造成消息丢失,这也是消费者丢失消息的主要原因。在这种情况下,如果其他消费者接手了工作,那些没有被处理完的消息就会被忽略,永远得不到处理。
1.group.id:如果两个消费者具有相同的group.id,并且订阅了同一个主题,那么每个消费者会分到主题分区的一个子集,也就是说他们只能读到所有消息的一个子集(不过群组会读取主题所有的消息)。如果希望消费者可以看到主题的所有消息,那么需要为它们设置唯一的group.id。
2.auto.offset.reset:这个参数指定了在没有偏移量可提交时或者请求的偏移量在broker上不存在时,消费者会做什么。这个参数有两种配置:
3.enable.auto.commit:这是一个非常重要的配置参数,可以让消费者基于任务调度自动提交偏移量,也可以在代码里手动提交偏移量。自动提交的一个最大好处是,在实现消费者逻辑时可以少考虑一些问题。如果在消费者轮询操作里处理所有的数据,那么自动提交可以保证只提交已经处理过的偏移量。自动提交的缺点:无法控制重复处理消息(比如消费者在自动提交偏移量之前停止处理消息),而且如果把消息交给另外一个后台线程去处理,自动提交机制可能会在消息还没有处理完毕就提交偏移量。
4.auto.commit.interval.ms:此参数与enable.auto.commit有直接的联系,如果选择了自动提交偏移量,可以通过此参数配置提交的频度,默认值是每5秒钟提交一次。一般来说,频繁提交会增加额外的开销,但也会降低重复处理消息的概率。
1.总是在处理完事件后再提交偏移量
如果所有的处理都是在轮询里完成,并且不需要在轮询之间维护状态(比如为了实现聚合操作),那么可以使用自动提交,或者在轮询结束时进行手动提交。
2.提交频度是性能和重复消息数量之间的权衡
即使是在最简单的场景里,比如所有的处理都在轮询里完成,并且不需要在轮询之间维护状态,仍然可以再一个循环里多次提交偏移量(甚至可以在每处理完一个事件之后),或者多个循环里只提交一次(与生产者的acks=all配置有点类似),这完全取决于在性能和重复处理消息之间做出的权衡。
3.确保对提交的偏移量心里有数
在轮询过程中提交偏移量有一个不好的地方,就是提交的偏移量有可能是读取到的最新偏移量,而不是处理过的最新偏移量。要记住,在处理完消息后再提交偏移量时非常关键的--否则会导致消费者错过消息。
4.再均衡
在设计应用程序时要注意处理消费者的再均衡问题。一般要在分区被撤销之前提交偏移量,并在分配到新分区时清理之前的状态。
5.消费者可能需要重试
有时候,在进行轮询之后,有些消息不会被完全处理,想稍后再来处理。例如,假设要把kafka的数据写到数据库里,不过那个时候数据库不可用,于是想稍后重试。要注意,提交的是偏移量,而不是对消息的确认,这个与传统的发布可订阅消息系统不太一样。如果记录#30处理失败,但记录#31处理成功,那么不应该提交#31,否则会导致#31以内的偏移量都被提交,包括#30在内,而这可能不是想看到的结果。可以采用以下两种模式来解决这个问题。
原文:https://www.cnblogs.com/EnzoDin/p/13348629.html