/**
 * Range-style partition assignment.
 *
 * For every topic in `ctx.myTopicThreadIds`, the topic's partition list is cut into contiguous
 * ranges, one range per consumer thread in `ctx.consumersForTopic(topic)`. Each thread receives
 * `partitions / threads` partitions, and the first `partitions % threads` threads receive one
 * extra. Every ownership decision is recorded in `partitionAssignment`, keyed by
 * `consumerThreadId.consumer`.
 *
 * NOTE(review): the method name was missing in the text this was recovered from (`def(ctx: ...)`);
 * `assign` is assumed — confirm against the trait this method implements.
 *
 * @param ctx assignment context supplying `consumerId`, `myTopicThreadIds`, `consumersForTopic`
 *            and `partitionsForTopic`
 */
def assign(ctx: AssignmentContext) = {
  // Factory handed to the pool; presumably invoked by getAndMaybePut to create the (initially
  // empty) ownership map the first time a key is seen — verify against Pool.
  val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
  val partitionAssignment =
    new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))

  for (topic <- ctx.myTopicThreadIds.keySet) {
    val curConsumers = ctx.consumersForTopic(topic)
    val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)

    // Base share per consumer thread, and how many threads must take one extra partition
    // to cover the remainder.
    val nPartsPerConsumer = curPartitions.size / curConsumers.size
    val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

    info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
      " for topic " + topic + " with consumers: " + curConsumers)

    for (consumerThreadId <- curConsumers) {
      val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
      assert(myConsumerPosition >= 0)
      // Every thread ahead of this one took nPartsPerConsumer partitions, and those among the
      // first nConsumersWithExtraPart took one more; that sum is where this thread's range starts.
      val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
      // Threads at positions below nConsumersWithExtraPart get the extra partition.
      val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

      /*
       * Range-partition the sorted partitions to consumers for better locality.
       * The first few consumers pick up an extra partition, if any.
       */
      if (nParts <= 0)
        warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
      else {
        for (i <- startPart until startPart + nParts) {
          val partition = curPartitions(i)
          info(consumerThreadId + " attempting to claim partition " + partition)
          // record the partition ownership decision
          val assignmentForConsumer = partitionAssignment.getAndMaybePut(consumerThreadId.consumer)
          assignmentForConsumer += (TopicAndPartition(topic, partition) -> consumerThreadId)
        }
      }
    }
  }
  // NOTE(review): the remainder of this method (its result expression and closing brace) is not
  // present in the recovered text; the brace opened on the `def` line is intentionally left as found.
// Flatten every (topic, partitions) entry of the context into TopicAndPartition values, logging
// each topic's partition list on the way, then put the result into a pseudo-random order.
val allTopicPartitions = {
  val unordered = ctx.partitionsForTopic.flatMap { case (topic, partitions) =>
    info("Consumer %s rebalancing the following partitions for topic %s: %s"
      .format(ctx.consumerId, topic, partitions))
    partitions.map(TopicAndPartition(topic, _))
  }
  /*
   * Order by the hash code of each entry's string form. This scatters the partitions of any one
   * topic through the sequence, which lowers the chance that a single consumer (one with a high
   * enough stream count) ends up holding all of a topic's partitions.
   */
  unordered.toSeq.sortBy(_.toString.hashCode)
}