Kafka 消息以 Partition 作为存储单元,那么在 Partition 内消息是以什么样的格式存储的呢,如何处理 Partition 中的消息,又有哪些安全策略来保证消息不会丢失呢,这一篇我们一起看看这些问题。
每个 Topic 的消息被一个或者多个 Partition 进行管理,Partition 是一个有序的,不变的消息队列,消息总是被追加到尾部。一个 Partition 不能被切分成多个散落在多个 broker 上或者多个磁盘上。
它作为消息管理名义上最大的管家内里其实是由很多的 Segment 文件组成。如果一个 Partition 是一个单个非常长的文件的话,那么这个查找操作会非常慢并且容易出错。为解决这个问题,Partition 又被划分成多个 Segment 来组织数据。Segment 并不是终极存储,在它的下面还有两个组成部分:
Segment 文件的命名规则是: 某个 Partition 全局的第一个 Segment 从 0 开始,后续每个 Segment 文件名以当前 Partition 的最大 offset(消息偏移量)为基准,文件名长度为 64 位 long 类型,19 位数字字符长度,不足部分用 0 填充。
每一条消息的组成内容有如下字段:
offset: 4964(逻辑偏移量)
position: 75088(物理偏移量)
CreateTime: 1545203239308(创建时间)
isvalid: true(是否有效)
keysize: -1(键大小)
valuesize: 9(值大小)
magic: 2
compresscodec: NONE(压缩编码)
producerId: -1
producerEpoch: -1(epoch号)
sequence: -1(序号)
isTransactional: false(是否事务)
headerKeys: []
payload: message_0(消息的具体内容)
为什么要设计 Partition 和 Segment 的存储机制
Partition 是对外名义上的数据存储,用户理解数据都是顺序存储到 Partition 中。那么实际在 Partition 内又多了一套不对用户可见的 Segment 机制是为什么呢?原因有两个:
提起高可用我们大概猜到要做副本机制,多弄几个备份肯定好。Kafka 也不例外提供了副本的概念(Replica),通过副本机制来实现冗余备份。每个 Partition 可以设置多个副本,在副本集合中会存在一个 leader 的概念,所有的读写请求都是由 leader 来进行处理。剩余副本都做为 follower,follower 会从 leader 同步消息日志 。
常用的节点选举算法有 Raft 、Paxos、 Bully 等,根据业务的特点 Kafka 并没有完全套用这些算法,首先有如下概念:
每个 Partition 都有唯一一个预写日志(write-ahead log),Producer 写入的消息会先存入这里。每一条消息都有唯一一个偏移量 offset,如果这条消息带有 key, 就会根据 key hash 值进行路由到对应的 Partition,如果没有指定 key 则根据随机算法路由到一个 Partition。
一个 Topic 的某个 Partition 如果有多副本机制存在,正常情况下只能有一个 副本是对外提供读写服务的,其余副本从它这里同步数据。那么这个对外提供服务的 leader 是如何选举出来的呢?这个问题要分为两种情况,一种是 Kafka 首次启动的选举,另一种是启动后遇到故障或者增删副本之后的选举。
当 broker 启动后所有的 broker 都会去 zk 注册,这时候第一个在 zk 注册成功的 broker 会成为 leader,其余的都是 follower,这个 broker leader 后续去执行 Partition leader 的选举。
首先会从 zk 中读取 Topic 每个分区的 ISR;
然后调用配置的分区选择算法来选择分区 leader,这些算法有不同的使用场景,broker 启动,znode 发生变化,新产生节点,发生 rebalance 的时候等等。通过算法选定一个分区作为 leader就确定了首次启动选举。
比如分区发生重分配的时候也会执行 leader 的选举操作。这种情况会从重分配的 AR 列表中找到第一个存活的副本,且这个副本在目前的 ISR 列表中。
如果某个节点被优雅地关闭(也就是执行 ControlledShutdown )时,位于这个节点上的 leader 副本都会下线,所以与此对应的分区需要执行 leader 的选举。这里的具体操作为:从 AR 列表中找到第一个存活的副本,且这个副本在目前的 ISR 列表中,与此同时还要确保这个副本不处于正在被关闭的节点上。
一旦 Partition 的 leader 确定后续的写消息都会向这个副本请求操作,其余副本都会同步它的数据。上面我们提到过几个概念:AR 、ISR、 OSR,在副本同步的过程中会应用到这几个队列。
首先 ISR 队列肯定包含当前的 leader 副本,也可能只有 leader 副本。什么情况下其余副本能够进入到 ISR 队列呢?
Kafka 提供了一个参数设置:rerplica.lag.time.max.ms=10000,这个参数表示 leader 副本能够落后 flower 副本的最长时间间隔,当前默认值是 10 秒。就是说如果 leader 发现 flower 超过 10 秒没有向它发起 fetch 请求,那么 leader 就认为这个 flower 出了问题。如果 fetch 正常 leader 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
例如上图中的两个 follower 明显慢于 leader,但是如果落后的时间在10 秒内,那么这三个副本都会在 ISR 中存在,否则,落后的副本会被剔除并加入到 OSR。
当然如果后面 follower 逐渐追上了 leader 的进度,那么该 follower 还是会被加入到 ISR,所以 ISR 并不是一个固定不变的集合,它是会动态调整的。
leader 和 follower 之间的数据同步过程大概如下:
初始状态下 leader 和 follower 的 HW 和 LEO 都是 0,follower 会不断地向 leader 发送请求 fetch 数据。但是因为没有数据,这个请求会被 leader 强制拖住,直到到达我们配置的 replica.fetch.wait.max.ms 时间之后才会被释放。同时如果在这段时间内有数据产生则直接返回数据。
Producer 向某个 Topic 推过来一条消息,当前 Topic 的 leader Partition 进行相应,那么如果其余 follower 没有同步成功消息会怎么样呢?这个问题 Kafka 交给用户来决定。
producer 提供了如下配置:
request.required.asks=0
可以看到以上确认机制配置逐级严格,生产环境综合考虑一般选择配置 = 1,如果你的业务对数据完整性要求比较高且可以接收数据处理速度稍慢那么选择 = 2。
某个消费组消费 partition 需要保存 offset 记录当前消费位置,0.10 之前的版本是把 offset 保存到 zk 中,但是 zk 的写性能不是很好,Kafka 采用的方案是 consumer 每分钟上报一次,这样就造成了重复消费的可能。
0.10 版本之后 Kafka 就 offset 的保存从 zk 剥离,保存到一个名为 consumer_offsets 的 Topic 中。消息的 key 由 [groupid、topic、partition] 组成,value 是偏移量 offset。Topic 配置的清理策略是compact。总是保留最新的 key,其余删掉。一般情况下,每个 key 的 offset 都是缓存在内存中,查询的时候不用遍历 Partition,如果没有缓存第一次就会遍历 Partition 建立缓存然后查询返回。
原文:https://www.cnblogs.com/rickiyang/p/14649750.html