整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程 (发送线程)。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器( RecordAccumulator,也称为消息收集器〉中。 Sender 线程负责从RecordAccumulator 中获取消息并将其发送到 Kafka 中 。
为保证生产者发送的数据,能可靠的发送到指定的topic,topic的每个partition收到生产者发送的数据后,都需要向生产者发送ack(确认收到),如果生产者收到ack,就会进行下一轮的发送,否则重新发送数据。ACK参数可设置的值为0、1、all。
当ACK=all时,Leader和follower(ISR)落盘才会返回ack,会有数据重复现象,如果在leader已经写完成,且follower同步完成,但是在返回ack的出现故障,则会出现数据重复现象;极限情况下,这个也会有数据丢失的情况,比如follower和leader通信都很慢,所以ISR中只有一个leader节点,这个时候,leader完成落盘,就会返回ack,如果此时leader故障后,就会导致丢失数据。
消费者( Consumer )负责订阅 Kafka 中的主题( Topic ),并且从订阅的主题上拉取消息。 在 Kafka 的消费理念中还有一层消费组( Consumer Group)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者 。 每个消费者只能消费所分配到的分区中的消息。换言之,每一个分区只能被一个消费组中的一个消费者所消费 。
Kafka提供了消费者客户端参数partition.assignment.strategy来设置消费者与订阅主题之间的分区分配策略。默认情况下,采用 Range Assignor 分配策略。 Kafka 还提供了另外两种分配策略: RoundRobinAssignor 和 StickyAssignor 。
Kafka中的消费是基于拉模式的,拉模式是消费者主动向服务端发起请求来拉取消息。 Kafka 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用 poll()方法 ,而 poll()方法返回的是所订阅的主题(分区)上的一组消息。
/** * @see KafkaConsumer#poll(long) */ public ConsumerRecords<K, V> poll(long timeout);
timeout 的设置取决于应用程序对响应速度的要求,比如需要在多长时间内将控制权移交给执行轮询的应用线程。可以直接将 timeout 设置为 0 , 这样 poll()方法会立刻返回,而不管是否己经拉取到了消息。如果应用线程唯一的工作就是从 Kafka 中拉取并消费消息,则可以将这个参数设置为最大值 Long.MAX_VALUE 。
对于 Kafka 中的分区而言,它的每条消息都有唯一的 offset,用来表示消息在分区中对应的位置 。 对于消费者而言 ,它也有一个 offset 的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位 置。
在每次调用 poll()方法时,它返回的是还没有被消费过的消息集, 要做到这一点,就需要记录上一次消费时的消费位移 。 消费位移存储在 Kafka 内 部的主题 consumer offsets 中 。 这里把将消费位移存储起来(持久化)的动作称为提交 ,消费者在消费完消息之后需要执行消费位移的提交。
在Kafka中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit配置,默认值为true。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由户端参数auto.commit.interval.ms配置,默认值为5秒,此参数生效的前提是enable.auto.commit参数为true。
每个consumer会定期将自己消费分区的offset提交给kafka内部topic: consumer_offsets,提交过去的时候,key是consumerGroupId+topic+分区号,value就是当前offset的值,kafka会定期清理topic里的消息,最后就保留最新的那条数据。因为consumer_offsets可能会接收高并发的请求,kafka默认给其分配50个分区(可以通过offsets.topic.num.partitions设置),这样可以通过加机器的方式扩大并发。
rebalance是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障。rebalance发生期间,消费组内的消费者是无法读取消息的。当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。
如下情况可能会触发消费者rebalance
public interface ConsumerRebalanceListener{ // 在rebalance开始之前和消费者停止读取消息之后被调用。
// 可以通过这个回调方法来处理消费位移的提交,以此来避免一些不必要的重复消费现象的发生。 // 参数 partitions表示rebalance前所分配到的分区。 void onPartitionsRevoked(Collection<TopicPartition> partitions); // 在重新分配分区之后和消费者开始读取消费之前被调用,参数partitions表示再均衡后所分配到的分区。 void onPartitionsAssigned(Collection<TopicPartition> partitions); }
每个消费组的子集在服务端对应一个GroupCoordinator对其进行管理,GroupCoordinator是Kafka服务端中用于管理消费组的组件。而消费者客户端中的ConsumerCoordinator组件负责与GroupCoordinator进行交互。ConsumerCoordinator与GroupCoordinator之间最重要的职责就是负责执行消费者再均衡的操作,包括前面提及的分区分配的工作也是在rebalance期间完成的。一共有如下几种情形会触发再均衡的操作:
当有消费者加入消费组时,消费者、消费组及组协调器之间会经历一下几个阶段。整个rebalance过程如下:
组协调器GroupCoordinator: 每个consumergroup都会选择一个broker作为自己的组协调器coordinator,负责监控这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者rebalance。consumergroup中的每个consumer启动时会向kafka集群中的某个节点(负载最小的节点:InFlightRequests有记录)发送FindCoordinatorRequest请求来查找对应的组协调器GroupCoordinator,并跟其建立网络连接。
组协调器选择方式: 通过如下公式可以选出consumer消费的offset要提交到consumer_offsets的哪个分区,这个分区leader对应的broker就是这个consumergroup的coordinator公式: hash(consumergroupid)%consumer_offsets 主题的分区数
在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。然后GroupCoordinator从一个consumergroup中选择第一个加入group的consumer作为leader(消费组协调器),把consumergroup情况发送给这个leader,接着这个leader会负责制定分区方案。
选举分区分配策略,根据各个消费者呈报的分配策略选举,过程如下:
每个消费者都向GroupCoordinator发送JoinGroupRequest请求,其中携带了各自提案的分配策略和订阅信息。JoinGroupResonse回执中包含GroupCoordinator中投票选举出的分配策略的信息,并且只有leader消费者的回执中包含每个消费者的订阅信息。
consumerleader通过给GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator就把分区方案下发给各个consumer,他们会根据指定分区的leaderbroker进行网络连接以及消息消费。
KafkaConsumer是非线程安全的,KafkaConsumer中定义了一个acquire()方法,用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出ConcurrentModifcationException异常。
优先副本的选举:Kafka集群的一个broker中最多只能有它的一个副本。分区使用多副本机制来提升可靠性,但只有leader副本对外提供读写服务,而follower副本只负责在内部进行消息的同步。如果一个分区的leader副本不可用,那么就意味着整个分区变得不可用,此时就需要Kafka从剩余的follower副本中挑选一个新的leader副本来继续对外提供服务。为了能够有效地治理负载失衡的情况,Kafka引入了优先副本(preferredreplica)的概念。所谓的优先副本是指在AR集合列表中的第一个副本。优先副本的选举是指通过一定的方式促使优先副本选举为leader副本,以此来促进集群的负载均衡,这一行为也可以称为“分区平衡”。
bin/kafka-topics.sh --describe --zookeeper localhost:2181 m --topic my-replicated-topic2 //查看分区信息
结果如下所示(包含了部分Broker下线以及恢复的过程):
HoudeMacBook-Pro:kafka_2.12-2.6.0 houjing$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic //查看分区信息 Topic: my-replicated-topic PartitionCount: 3 ReplicationFactor: 3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: my-replicated-topic Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: my-replicated-topic Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 HoudeMacBook-Pro:kafka_2.12-2.6.0 houjing$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic //kill Broker1后分区信息 Topic: my-replicated-topic PartitionCount: 3 ReplicationFactor: 3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0 Topic: my-replicated-topic Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,2 Topic: my-replicated-topic Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 2,0 HoudeMacBook-Pro:kafka_2.12-2.6.0 houjing$ bin/kafka-server-start.sh -daemon config/server2.properties //重启broker1 HoudeMacBook-Pro:kafka_2.12-2.6.0 houjing$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic //查看分区信息 Topic: my-replicated-topic PartitionCount: 3 ReplicationFactor: 3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: my-replicated-topic Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,2,1 Topic: my-replicated-topic Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1
在Kafka中可以提供分区自动平衡的功能,与此对应的broker端参数是auto.leader.rebalance.enable,此参数的默认值为true,即默认情况下此功能是开启的。如果开启分区自动平衡的功能,则Kafka的控制器会启动一个定时任务,这个定时任务会轮询所有的broker节点,计算每个broker节点的分区不平衡率 (broker中的不平衡率=非优先副本的leader个数/分区总数)是否超过leader.imbalance.per.broker.percentage参数配置的比值,默认值为10%,如果超过设定的比值则会自动执行优先副本的选举动作以求分区平衡。执行周期由参数leader.imbalance.check.interval.seconds控制,默认值为300秒。Kafka中kafka-perferred-replica-election.sh脚本提供了对分区leader副本进行重新平衡的功能。
在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(KafkaController),它负责管理整个集群中所有分区和副本的状态。
Kafka中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:
# brokerid表示称为控制器的broker的id编号,timestamp表示竞选为控制器时的时间戳
{"version":1,"brokerid":0,"timestamp":"1574831950372"}
在任意时刻,集群中有且仅有一个控制器。每个broker启动的时候会去尝试去读取/controller节点的brokerid值,如果读取到brokerid的值不为-1,则表示已经有其它broker节点成功竞选为控制器,所以当前broker就会放弃竞选:如果Zookeeper中不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去建/controller这个节点,当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的broker才会成为控制器,而创建失败的broker则表示竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。
分区leader副本的选举由控制器负责具体实施。当创建分区(创建主题或增加分区都有创建分区的动作)或分区上线(比如分区中原先的leader副本下线,此时分区需要选举一个新的leader上线来对外提供服务)的时候都需要执行leader的选举动作。这种策略的基本思路是按照AR集合中副本的顺序查找第一个存活的副本,并且这个副本在JSR集合中。一个分区的AR集合在分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的ISR集合中副本的顺序可能会改变。
##在zookeeper中查看各broker节点信息 [zk: localhost:2181(CONNECTED) 7] get /brokers/ids/0 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.1.140.146:9092"]
,"jmx_port":-1,"host":"10.1.140.146","timestamp":"1603270074840","port":9092,"version":4} [zk: localhost:2181(CONNECTED) 8] get /brokers/ids/1 ##broker1 被kill后 org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids/1 [zk: localhost:2181(CONNECTED) 9] get /brokers/ids/2 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.1.140.146:9094"]
,"jmx_port":-1,"host":"10.1.140.146","timestamp":"1603270074178","port":9094,"version":4}
## 在zookeeper中查看分区信息 [zk: localhost:2181(CONNECTED) 15] get /brokers/topics/my-replicated-topic/partitions/2/state {"controller_epoch":6,"leader":2,"version":1,"leader_epoch":1,"isr":[2,0,1]} [zk: localhost:2181(CONNECTED) 16] get /brokers/topics/my-replicated-topic/partitions/1/state {"controller_epoch":6,"leader":0,"version":1,"leader_epoch":1,"isr":[0,2,1]} [zk: localhost:2181(CONNECTED) 17] get /brokers/topics/my-replicated-topic/partitions/0/state {"controller_epoch":6,"leader":2,"version":1,"leader_epoch":1,"isr":[2,0,1]}
整个消息追加的过程可以概括如下:
在Kafka中,高水位(HighWatermark)的作用主要有2个:
在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息。消费者只能消费已提交消息,即图中位移小于15的所有消息(不考虑事务)。位移值等于高水位的消息也属于未提交消息。
日志末端位移(LEO:Log End Offset)表示副本写入下一条消息的位移值。同一个副本对象,其高水位值不会大于LEO值。高水位和LEO是副本对象的两个重要属性。Kafka所有副本都有对应的高水位和LEO值,而不仅仅是Leader本。只不过Leader副本比较特殊,Kafka使用Leader副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其Leader副本的高水位。
在Leader副本所在的Broker上,还保存了其他Follower副本的LEO值。
Broker0上保存了某分区的Leader副本和所有Follower副本的LEO值,而Broker1上仅仅保存了该分区的某个Follower副本。Kafka把Broker0上保存的这些Follower副本又称为远程副本(RemoteReplica)。Kafka副本机制在运行过程中,会更新Broker1上Follower副本的高水位和LEO值,同时也会更新Broker0上Leader副本的高水位和LEO以及所有远程副本的LEO,但它不会更新远程副本的高水位值,也就是图中标记为灰色的部分。
Broker0上保存这些远程副本主要作用是: 帮助Leader副本确定其高水位,也就是分区高水位。
与Leader副本保持同步的判断的条件有两个:
取一个partition对应的ISR中最小的LEO(log-end-offset)作为HW,consumer最多只能消费到HW所在的位置。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broker的读取请求,没有HW的限制。
当producer生产消息至broker后,ISR以及HW和LEO的流转过程:
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的follower都复制完,这条消息才会被commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。
结合HW和LEO看下acks=1的情况:
Kafka一个分区的消息数据对应存储在一个文件夹下,以topic名称+分区号命名,kafka规定了一个分区内的.log文件最大为1G。
# 部分消息的offset索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的offset到 index文件, # 如果要定位消息的offset会先在这个文件里快速定位,再去log文件里找具体消息 00000000000000000000.index # 消息存储文件,主要存offset和消息体 # LogSegment 的基准偏移量为 0,对应的日志文件为 00000000000000000000.log
00000000000000000000.log # 消息的发送时间索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的发送时间戳与对 应的offset到timeindex文件, # 如果需要按照时间来定位消息的offset,会先在这个文件里查找 00000000000000000000.timeindex 000000000000009900000.index 000000000000009900000.log 000000000000009900000.timeindex
KafkaBroker有一个参数,log.segment.bytes,限定了每个日志段文件的大小,最大就是1GB。一个日志段文件满了,就自动开一个新的日志段文件来写入,避免单个文件过大,影响文件的读写性能,这个过程叫做logrolling,正在被写入的那个日志段文件,叫做activelogsegment。
每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。偏移量索引文件用来建立消息偏移量offset到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;时间戳索引文件则根据指定的时间戳timestamp来查找对应的偏移量信息。Kafka中的索引文件以稀疏索引(sparseindex)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引页。每当写入一定量(由broker端参数log.index.interval.bytes指定,默认值为4096,即4KB的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小log.index.interval.bytes的值,对应地可以增加或缩小索引项的密度。
稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引的查询速度。偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量.
如果broker端参数log.message.timestamp.type设置为LogAppendTime,那么消息的时间戳必定能够保持单调递增;相反,如果是CreateTime类型则无法保证。
Kafka使用零拷贝技术来提升性能。所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。对Linux操作系统而言,零拷贝技术依赖于底层的sendfile()方法实现。对应于Java语言,FileChannal.transferTo()方法的底层实现就是sendfile()方法。
原文:https://www.cnblogs.com/jing99/p/13861149.html