Configure Flume:
http://blog.csdn.net/desilting/article/details/22811593
The conf/flume-conf.properties file (the sink type org.apache.flume.plugins.KafkaSink refers to the custom class shown further below):
producer.sources = s
producer.channels = c
producer.sinks = r
producer.sources.s.channels = c
producer.sources.s.type = netcat
producer.sources.s.bind = 192.168.40.133
producer.sources.s.port = 44444
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list = 192.168.40.133:9092
producer.sinks.r.serializer.class = kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks = 1
producer.sinks.r.max.message.size = 1000000
producer.sinks.r.custom.topic.name = mykafka
producer.sinks.r.channel = c
producer.channels.c.type = memory
producer.channels.c.capacity = 1000
Configure Kafka:
http://blog.csdn.net/desilting/article/details/22872839
Start ZooKeeper, Kafka, and Storm.
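Assuming the standard Kafka and Storm distribution layouts (paths and config file names may differ in your installation), the services can be started roughly like this:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/storm nimbus
bin/storm supervisor
bin/storm ui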
Create the topic:
bin/kafka-topics.sh --create --zookeeper 192.168.40.132:2181 --replication-factor 3 --partitions 1 --topic mykafka
Describe the topic:
bin/kafka-topics.sh --describe --zookeeper 192.168.40.132:2181
Topic:mykafka    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: mykafka    Partition: 0    Leader: 134    Replicas: 133,134,132    Isr: 134,133,132
partition | A topic can be split into multiple partitions, and the topic's messages are stored across them; the point is to increase parallelism.
leader    | Handles all reads and writes for this partition; any broker can become the leader of a given partition.
replicas  | The brokers on which this partition is replicated, regardless of whether those brokers are alive.
isr       | The replicas that are currently alive (in-sync replicas).
Start the Flume agent:
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console
The KafkaSink class:
import java.util.Map;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;

public class KafkaSink extends AbstractSink implements Configurable {

    private static final String PARTITION_KEY_NAME = "custom.partition.key";
    private static final String CUSTOM_TOPIC_KEY_NAME = "custom.topic.name";
    private static final String DEFAULT_ENCODING = "UTF-8";
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);

    private Properties parameters;
    private Producer<String, String> producer;

    public void configure(Context context) {
        // Copy all sink properties from the Flume context into a Properties
        // object that the Kafka producer consumes directly.
        ImmutableMap<String, String> props = context.getParameters();
        this.parameters = new Properties();
        for (Map.Entry<String, String> entry : props.entrySet()) {
            this.parameters.put(entry.getKey(), entry.getValue());
        }
    }

    @Override
    public synchronized void start() {
        super.start();
        ProducerConfig config = new ProducerConfig(this.parameters);
        this.producer = new Producer<String, String>(config);
    }

    public Status process() {
        Status status;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            transaction.begin();
            Event event = channel.take();
            if (event != null) {
                String partitionKey = (String) parameters.get(PARTITION_KEY_NAME);
                String topic = Preconditions.checkNotNull(
                        (String) this.parameters.get(CUSTOM_TOPIC_KEY_NAME),
                        "topic name is required");
                String eventData = new String(event.getBody(), DEFAULT_ENCODING);
                // Without a configured partition key, let Kafka pick the partition.
                KeyedMessage<String, String> data =
                        (partitionKey == null || partitionKey.isEmpty())
                                ? new KeyedMessage<String, String>(topic, eventData)
                                : new KeyedMessage<String, String>(topic, partitionKey, eventData);
                LOGGER.info("Sending Message to Kafka : [" + topic + ":" + eventData + "]");
                producer.send(data);
                transaction.commit();
                LOGGER.info("Send message success");
                status = Status.READY;
            } else {
                // Nothing in the channel; back off so the sink runner does not spin.
                transaction.rollback();
                status = Status.BACKOFF;
            }
        } catch (Exception e) {
            LOGGER.error("Send message failed!", e);
            transaction.rollback();
            status = Status.BACKOFF;
        } finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public void stop() {
        producer.close();
        super.stop();
    }
}
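For Flume to instantiate this sink, the compiled class and the Kafka client jars it depends on (including the Scala runtime) have to be on the agent's classpath, for example by dropping them into Flume's lib/ directory.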
The KafkaSpout is adapted from someone else's code, https://github.com/HolmesNL/kafka-spout, with some modifications.
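The linked repository is the real implementation; purely as a rough sketch of what such a spout does, here is a minimal version written directly against Kafka 0.8's high-level consumer API (this is not the HolmesNL code; the consumer.timeout.ms setting and the "word" output field are assumptions chosen to fit the topology below):

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class KafkaSpout extends BaseRichSpout {

    private final String topic;
    private final String groupId;
    private final String zkConnect;
    private transient ConsumerConnector consumer;
    private transient ConsumerIterator<byte[], byte[]> iterator;
    private transient SpoutOutputCollector collector;

    public KafkaSpout(String topic, String groupId, String zkConnect) {
        this.topic = topic;
        this.groupId = groupId;
        this.zkConnect = zkConnect;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        // Make hasNext() time out instead of blocking nextTuple() forever.
        props.put("consumer.timeout.ms", "100");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        List<KafkaStream<byte[], byte[]>> streams =
                consumer.createMessageStreams(topicCountMap).get(topic);
        iterator = streams.get(0).iterator();
    }

    public void nextTuple() {
        try {
            if (iterator.hasNext()) {
                // Emit each Kafka message as a one-field tuple.
                collector.emit(new Values(
                        new String(iterator.next().message(), StandardCharsets.UTF_8)));
            }
        } catch (ConsumerTimeoutException e) {
            // No message available right now; Storm will call nextTuple() again.
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public void close() {
        if (consumer != null) {
            consumer.shutdown();
        }
    }
}

This sketch ignores reliability entirely (no ack/fail handling, offsets are auto-committed by the high-level consumer), which is exactly the kind of thing the HolmesNL project handles properly.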
The Storm test program:
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.metric.LoggingMetricsConsumer;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.MultiCountMetric;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class ExclamationTopology {

    public static class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;
        transient CountMetric _countMetric;
        transient MultiCountMetric _wordCountMetric;
        transient ReducedMetric _wordLengthMeanMetric;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
            initMetrics(context);
        }

        public void execute(Tuple tuple) {
            // Append "!!!" to the incoming word and anchor the new tuple to the input.
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
            updateMetrics(tuple.getString(0));
        }

        void updateMetrics(String word) {
            _countMetric.incr();
            _wordCountMetric.scope(word).incr();
            _wordLengthMeanMetric.update(word.length());
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        void initMetrics(TopologyContext context) {
            _countMetric = new CountMetric();
            _wordCountMetric = new MultiCountMetric();
            _wordLengthMeanMetric = new ReducedMetric(new MeanReducer());
            // Report execute_count every 5 s, the word metrics every 60 s.
            context.registerMetric("execute_count", _countMetric, 5);
            context.registerMetric("word_count", _wordCountMetric, 60);
            context.registerMetric("word_length", _wordLengthMeanMetric, 60);
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // With two args the first is the topology name and the second the topic;
        // with one arg it is just the topic and the topology runs locally.
        String topic = args.length == 2 ? args[1] : args[0];
        KafkaSpout kafkaSpout = new KafkaSpout(topic, "testKafkaGroup", "192.168.40.132:2181");

        builder.setSpout("word", kafkaSpout, 10);
        builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

        Config conf = new Config();
        conf.setDebug(true);
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);

        if (args != null && args.length == 2) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}
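To submit the topology to the cluster, package it and run it through the storm command; the jar name here is a placeholder for whatever your build produces:
bin/storm jar storm-kafka-test.jar ExclamationTopology mytopology mykafka
Invoked with a single argument (just the topic), the same main method instead runs the topology in a LocalCluster for about ten seconds and then shuts it down.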
Connect to the netcat source with telnet:
telnet 192.168.40.134 44444
Type some arbitrary characters (ASDF and the like) at the telnet prompt:
ASDF
OK
ASD
OK
F
OK
ASDF
OK
The Flume side shows:
2014-04-08 01:30:44,379 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:77)] Sending Message to Kafka : [mykafka:ASDF]
2014-04-08 01:30:44,387 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:80)] Send message success
2014-04-08 01:30:44,604 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:77)] Sending Message to Kafka : [mykafka:ASD]
2014-04-08 01:30:44,611 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:80)] Send message success
2014-04-08 01:30:44,794 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:77)] Sending Message to Kafka : [mykafka:F]
2014-04-08 01:30:44,799 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:80)] Send message success
2014-04-08 01:30:45,038 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:77)] Sending Message to Kafka : [mykafka:ASDF]
The Storm side (output of the LoggingMetricsConsumer) shows the word_count metric; the ^M suffixes are the carriage returns that telnet appends to each line:
2014-04-08 01:30:28,446 495106 1396945828 ubuntu:6703 10:exclaim2 word_count {ADF^M!!!=1, SDF^M!!!=0, DF^M!!!=2, FDAS^M!!!=0, ^M!!!=0, AF^M!!!=1, D^M!!!=1, DS^M!!!=1, AS^M!!!=5, SAFSDF^M!!!=0, FD^M!!!=1, 224^M!!!=0, ASDF^M!!!=2, FG^M!!!=0, AD^M!!!=1, FS^M!!!=1, ASD^M!!!=5, F^M!!!=3, ASGAS^M!!!=0, SD^M!!!=1, ASFDAS^M!!!=0, FAS^M!!!=2, SDG^M!!!=0}
Original article: http://blog.csdn.net/desilting/article/details/23194039