
Log Collection: From Flume to Kafka

Posted: 2016-02-29 12:46:29

  Real-time log analysis:

(architecture diagram)

 

This article mainly tests log collection from Flume into Kafka; log analysis with Storm will follow. Still learning!

 

Flume configuration file

# agent "collector": one exec source, one memory channel, one Kafka sink
collector.sources = cs
collector.sinks = ck
collector.channels = cc

# source: tail the Resin access log
collector.sources.cs.type = exec
collector.sources.cs.command = tail -F /data/hudonglogs/resinlogs/hdapps1/access.log
collector.sources.cs.channels = cc

# channel: buffer events in memory
collector.channels.cc.type = memory
collector.channels.cc.capacity = 1000
collector.channels.cc.transactionCapacity = 100

# sink: publish events to the Kafka topic "logs"
collector.sinks.ck.type = org.apache.flume.sink.kafka.KafkaSink
collector.sinks.ck.topic = logs
collector.sinks.ck.brokerList = localhost:9092
collector.sinks.ck.requiredAcks = 1
collector.sinks.ck.batchSize = 20
collector.sinks.ck.channel = cc
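
With the configuration above saved to a file, the agent can be launched with the standard flume-ng command. A minimal invocation, assuming the file is saved as conf/collector.conf (the file name is an assumption, not from the original):

bin/flume-ng agent --conf conf --conf-file conf/collector.conf --name collector -Dflume.root.logger=INFO,console

Note that the --name value must match the agent name used as the property prefix in the file (here, collector).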

 

Kafka consumer (written against the old ZooKeeper-based high-level consumer API)

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer extends Thread {

    private final ConsumerConnector connector;
    private final String topic;

    public KafkaConsumer(String topic) {
        this.topic = topic;
        this.connector = Consumer.createJavaConsumerConnector(createConsumerConfig());
    }

    private ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaConfig.zkConnect);
        props.put("group.id", KafkaConfig.groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        // ask for a single stream on this topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                connector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // block on the iterator and print each message as it arrives
        while (it.hasNext()) {
            System.out.println("receive:" + new String(it.next().message()));
            try {
                sleep(3000); // slow down output for easier inspection
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
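
The KafkaConfig class referenced above is not shown in the original post. A minimal sketch, assuming a local single-node ZooKeeper and an arbitrary consumer group name (both values are assumptions):

public class KafkaConfig {
    // assumed local ZooKeeper address; adjust for your environment
    public static final String zkConnect = "localhost:2181";
    // arbitrary consumer group name for this test
    public static final String groupId = "flume-log-consumers";
}

The consumer thread can then be started against the topic that the Flume sink writes to:

public class ConsumerMain {
    public static void main(String[] args) {
        // "logs" matches collector.sinks.ck.topic in the Flume config above
        new KafkaConsumer("logs").start();
    }
}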

 

Start the Kafka cluster, then start the consumer above, and finally start the Flume agent (Flume's Kafka sink acts as the producer here).
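
For reference, a single-node startup might look like the following, assuming the commands are run from the Kafka installation directory and using the standard scripts shipped with Kafka 0.8/0.9 (run each in its own terminal, or background them):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

# create the "logs" topic that the Flume sink writes to
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logs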

(screenshot of the running result)

 

All of the environments above are single-node deployments!

 


Original article: http://www.cnblogs.com/re-myself/p/5226935.html
