首页 > 其他 > 详细

图解Kafka之 Partitioner

时间:2020-08-25 22:10:12      阅读:89      评论:0      收藏:0      [点我收藏+]

基于 Kafka Version 2.4

//Partitioner 接口
public interface Partitioner extends Configurable, Closeable {
    //根据给定的数据,找到 partition
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
    // 关闭 partition
    public void close();
    //在批量操作前,可以修改 partition , 默认没有实现
    default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}

只有一个默认的实现类,实现如下:

/**
默认的 partition 分配策略
1. record 有指定的,使用指定的
2. key 有值,Hash(key) & numPartitions , 得到 partition
3. 没有可用的,从所有中随机取一个
4. 有可用的,从可用中随机取一个
 */
public class DefaultPartitioner implements Partitioner {

    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        } 
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public void close() {}
    
    /**
     * If a batch completed for the current sticky partition, change the sticky partition. 
     * Alternately, if no sticky partition has been determined, set one.
     */
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}

org.apache.kafka.clients.producer.KafkaProducer#partition 这个方法是在执行时分配 Partition 的入口

   /**
     * computes partition for given record.
     * if the record has partition returns the value otherwise
     * calls configured partitioner class to compute the partition.
     */
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

如果文章有帮助到您,请点个赞,您的反馈会让我感到文章是有价值的

图解Kafka之 Partitioner

原文:https://www.cnblogs.com/ElEGenT/p/13562204.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!