生产者生产Record,其中包含key和value,key用于确定存放在哪个partition,value是真正的消息。
1. 指定了partition
2. 未指定partition,但指定了key,通过key的哈希值和partition的数量取模,计算索引
3. 都未指定,轮询
1. 生产者向broker集群提交连接请求,返回broker controller的地址(broker是zk的客户端,从zk中找到controller)
2. 生产者根据topic,向broker controller请求partition leader的地址
3. broker controller收到请求,从zk中查找指定topic下的所有partition leader地址,返回给生产者
4. 生产者根据路由策略,找到对应partition leader,发送消息
5. leader将消息写入本地log,并通知ISR中的followers
6. followers同步完消息返回ASK
7. leader收到ISR中所有followers返回的ASK后,增加HW,表示消费者可以消费到该位置了
8. 如果有follwer返回超时,则会将超时follower移动到OSR,再增加HW
HighWatermark,高水位。表示消费者可以消费到的最高偏移量。作用是保持broker正常运转下,partition leader和follower间数据的一致性。
LEO,Log End Offset,每写入一条消息,则加一。
这两者的区别是,HW位置对应的这条消息,必须保证ISR中follower都以同步完毕,这个由ISR中所有follower,同步最慢的那个决定的。而LEO,每个partition各自维护,代表它当前最大的消息偏移量。
所以呢,HW和ISR中同步最慢的follower的LEO一致。
当leader宕机下线,之后重新上线,其LEO需要回退到宕机时的HW,从这里开始重新从新leader同步,这样才能保证跟新leader之间数据的一致性。当然,此举会出现旧leader数据丢失的问题
通过acks参数进行设置
(a) 设置为0:类似于RocketMQ的单向发送,不关注broker的返回
(b) 设置为1:(默认)生产者等待broker的返回,leader收到消息,立刻返回ack,如生产者未收到,会进行重试。结合HW截断机制可知,可能会出现生产者收到ack但消息丢失的情况
(c) 设置为-1:生产者等待broker的返回,leader收到消息,等待ISR列表所有follower都同步完成后,才返回ack。生产者未收到ack同样会进行重试。会出现重复发送的问题,即follower未全部同步完,leader宕机未返回ack,生产者重新发送
1. 请求broker集群,找到broker controller
2. 根据topic向controller发送poll请求
3. controller向consumer分配一个或多个partition leader,并将它们的offset也发送给consumer
4. consumer从partition中按照offset开始消费
5. 消费完成后,向broker发送offset
6. broker更新到__consumer_offset和Coordinator缓存中
7. 消费者可以重置offset,灵活控制消费进度
通过unclean.leader.election.enable配置,用来控制,在ISR为空的情况下,是否从OSR中选举Leader。从OSR中选举的Leader,可能跟其他follower都无法通信,这就造成了单点问题
原因:
1. 同一个consumer重复消费
consumer消费能力低,消费超时,没能自动提交offset,则下次拉取时,会从原offset重新拉取。解决方案,减少一次拉取数量或增大消费超时时间,或者在代码中手动提交offset
2. 不同的consumer重复消费
consumer A消费完但尚未提交offset就宕机,rebalance,consumer B会重复消费A消费过的内容。可以通过每消费一条,手动提交offset,减少重复消费,但无法完全避免
原文:https://www.cnblogs.com/walker993/p/14801862.html