基于 Kafka Version 2.4
//Partitioner 接口
public interface Partitioner extends Configurable, Closeable {
//根据给定的数据,找到 partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
// 关闭 partition
public void close();
//在批量操作前,可以修改 partition , 默认没有实现
default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
}
}
只有一个默认的实现类,实现如下:
/**
默认的 partition 分配策略
1. record 有指定的,使用指定的
2. key 有值,Hash(key) & numPartitions , 得到 partition
3. 没有可用的,从所有中随机取一个
4. 有可用的,从可用中随机取一个
*/
public class DefaultPartitioner implements Partitioner {
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
public void configure(Map<String, ?> configs) {}
/**
* Compute the partition for the given record
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
public void close() {}
/**
* If a batch completed for the current sticky partition, change the sticky partition.
* Alternately, if no sticky partition has been determined, set one.
*/
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
}
}
org.apache.kafka.clients.producer.KafkaProducer#partition
这个方法是在执行时分配 Partition 的入口
/**
* computes partition for given record.
* if the record has partition returns the value otherwise
* calls configured partitioner class to compute the partition.
*/
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
如果文章有帮助到您,请点个赞,您的反馈会让我感到文章是有价值的
原文:https://www.cnblogs.com/ElEGenT/p/13562204.html