首页 > Web开发 > 详细

flume+kafka (分区实现 默认单分区) (二)

时间:2015-12-04 20:29:56      阅读:299      评论:0      收藏:0      [点我收藏+]

这篇文章主要在上一篇文章的基础上讲一下 如何自定义flume到kafka的分区

上一节中从下面的地址下载了一个源码

https://github.com/beyondj2ee/flumeng-kafka-plugin/tree/master/

 

我们只是从中获取了jar包。这次我们就利用下载的源码去自定义分区

 技术分享 

把源码通过mvn eclipse:eclipse 转变为普通java项目  导到eclipse中   结构如上图

上节讲的SimglePartition已经标出,源码如下

public class SinglePartition implements Partitioner<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(SinglePartition.class);


    public SinglePartition(VerifiableProperties props) {
    }

  
    @Override
    public int partition(String key, int numberOfPartions) {
        
        return 0;
    }

}

 

我们把这个文件复制 改名为ManyPartition  修改源码中绿线标示的位置   即可

我的修改源码如下

public class ManyPartition implements Partitioner<String> {
    // - [ constant fields ] ----------------------------------------

    /**
     * The constant LOGGER.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(ManyPartition.class);

  
    private int count = 0;
    public ManyPartition(VerifiableProperties props) {
    }

    @Override
    public int partition(String key, int numberOfPartions) {
        return new Random().nextInt(numberOfPartions);
    }

   
}

 

然后通过  mvn clean package  重新编译  会重新生成flumeng-kafka-plugin.jar   然后把此jar包替换flume/lib/下的同名jar包就可使用自定义分区类

 

而上一节中提到的配置文件中的producer.sinks.r.partition.key=4  实际上没有多大的作用

他对应源码中KafkaSink类的process()方法中的代码  入下图标出所示

技术分享

实际传入不传入这个partititonId没有任何区别   因为即使传入ParitionId  也是一个固定值  因此没有办法依照这个Id进行分区

 

flume+kafka (分区实现 默认单分区) (二)

原文:http://www.cnblogs.com/wjsshide/p/5020182.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!