Flume-Kafka-Storm Integration Example

Date: 2014-01-25 10:15:27

Start the topology as the Kafka consumer

bin/storm jar storm-kafka-App-0.1.0-SNAPSHOT-jar-with-dependencies.jar  com.exec.storm.kafka.KafKaSpoutTopologyTest 192.168.137.10:2181

Start the Flume-Kafka producer

bin/flume-ng agent --conf conf  --conf-file conf/producer1.properties --name producer -Dflume.root.logger=INFO,console

Producer configuration

hadoop@stormspark:~/bigdata/flume-1.4.0-bin$ cat conf/producer1.properties 
#agent 
producer.sources = s
producer.channels = c
producer.sinks = r

#source 
producer.sources.s.type = netcat
producer.sources.s.bind = localhost
producer.sources.s.port = 44444
producer.sources.s.channels = c

#sink (KafkaSink comes from a third-party plugin, not from Flume 1.4.0 itself)
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=127.0.0.1:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=test

#Specify the channel for the sink 
producer.sinks.r.channel = c

#channel
producer.channels.c.type = memory
producer.channels.c.capacity = 1000
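
A mistake that is easy to make in a Flume configuration is the channel wiring: a source lists its channels under the plural key `channels`, while a sink names its single channel under the singular key `channel`. The following is a minimal sketch of a check for that wiring, not part of the original setup. The helper names `parse_properties` and `wiring_problems` are hypothetical, and `SAMPLE` is a trimmed copy of the wiring keys from the file above.

```python
# Trimmed copy of the wiring-related keys from producer1.properties.
SAMPLE = """\
#agent
producer.sources = s
producer.channels = c
producer.sinks = r
producer.sources.s.channels = c
producer.sinks.r.channel = c
"""


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def wiring_problems(props, agent):
    """Return a list of problems in the source/sink-to-channel wiring."""
    declared = set(props.get(agent + ".channels", "").split())
    problems = []
    # Sources use the plural key "channels" and may list several channels.
    for source in props.get(agent + ".sources", "").split():
        key = "{}.sources.{}.channels".format(agent, source)
        channels = props.get(key, "").split()
        if not channels:
            problems.append("missing " + key)
        for channel in channels:
            if channel not in declared:
                problems.append("{} names undeclared channel {}".format(key, channel))
    # Sinks use the singular key "channel" and read from exactly one channel.
    for sink in props.get(agent + ".sinks", "").split():
        key = "{}.sinks.{}.channel".format(agent, sink)
        channel = props.get(key)
        if channel is None:
            problems.append("missing " + key)
        elif channel not in declared:
            problems.append("{} names undeclared channel {}".format(key, channel))
    return problems


print(wiring_problems(parse_properties(SAMPLE), "producer"))  # prints []
```

The agent name passed to the check (`producer`) must match the value given to `--name` when starting `flume-ng`, because Flume only reads keys with that prefix.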

Finally, telnet to port 44444 and type some arbitrary input

hadoop@stormspark:~/bigdata/storm-0.9.0.1$ telnet localhost 44444  
Trying ::1...
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh
OK
kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkk
OK
k
OK
ggggggggggggggggggggggggggggggggggggggggggggg
OK
dddddddddddddddddddddddddddddddddddddddddddd
OK
fffffffffffffffffffffffffffffffffffffffffff
OK
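
The same interaction can be scripted instead of typed into telnet. The sketch below is only an illustration, assuming the netcat source from the configuration above is listening on localhost:44444 and acknowledges every line with `OK`, as in the session shown. The function name `send_lines` is hypothetical.

```python
import socket


def send_lines(lines, host="localhost", port=44444, timeout=5.0):
    """Send each line to a Flume netcat source and collect the replies."""
    replies = []
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with sock.makefile("r", encoding="utf-8", newline="\n") as reader:
            for line in lines:
                # The netcat source treats each newline-terminated line as one event.
                sock.sendall((line + "\n").encode("utf-8"))
                # Read the one-line acknowledgment before sending the next event.
                replies.append(reader.readline().strip())
    return replies


# Example call (requires the Flume agent above to be running):
#   send_lines(["hello", "world"])
```

Unlike telnet, this client terminates lines with a bare `\n` rather than `\r\n`, so no carriage return ends up inside the message that Kafka and Storm receive.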

Storm topology output

165029 [Thread-31-words] INFO storm.kafka.PartitionManager - Added 1 messages from Kafka: stormspark:0 to internal buffers
165030 [Thread-26-print] INFO storm.kafka.KafkaSpoutTestTopology - source: words:4, stream: default, id: {1619432409083821794=-8581701278061886467}, [kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkk]
165031 [Thread-31-words] INFO storm.kafka.PartitionManager - Fetched 2 messages from Kafka: stormspark:0
165031 [Thread-31-words] INFO storm.kafka.PartitionManager - Added 2 messages from Kafka: stormspark:0 to internal buffers
165031 [Thread-26-print] INFO storm.kafka.KafkaSpoutTestTopology - source: words:4, stream: default, id: {6174046879227923319=-4824802638802727797}, [k]
165035 [Thread-26-print] INFO storm.kafka.KafkaSpoutTestTopology - source: words:4, stream: default, id: {3103434937071201291=-3498813609550670760}, [ggggggggggggggggggggggggggggggggggggggggggggg]
166848 [Thread-31-words] INFO storm.kafka.PartitionManager - Committing offset for Partition{host=stormspark:9092, partition=0}
166848 [Thread-31-words] INFO storm.kafka.PartitionManager - Writing committed offset to ZK: 239
166848 [Thread-31-words] INFO storm.kafka.ZkState - Writing /stormId/partition_0 the data {topology={id=4c0d1934-e7f3-4c01-8e61-00b7f75a0809, name=kafka}, offset=239, partition=0, broker={host=stormspark, port=9092}, topic=test}
166855 [Thread-31-words] INFO storm.kafka.PartitionManager - Wrote committed offset to ZK: 239
166855 [Thread-31-words] INFO storm.kafka.PartitionManager - Committed offset 239 for Partition{host=stormspark:9092, partition=0} for topology: 4c0d1934-e7f3-4c01-8e61-00b7f75a0809
168849 [Thread-31-words] INFO storm.kafka.PartitionManager - Committing offset for Partition{host=stormspark:9092, partition=0}
168849 [Thread-31-words] INFO storm.kafka.PartitionManager - Committed offset 239 for Partition{host=stormspark:9092, partition=0} for topology: 4c0d1934-e7f3-4c01-8e61-00b7f75a0809
169033 [Thread-31-words] INFO storm.kafka.PartitionManager - Fetched 1 messages from Kafka: stormspark:0
169034 [Thread-31-words] INFO storm.kafka.PartitionManager - Added 1 messages from Kafka: stormspark:0 to internal buffers
169035 [Thread-26-print] INFO storm.kafka.KafkaSpoutTestTopology - source: words:4, stream: default, id: {4606282757930762011=-6436699175118747001}, [dddddddddddddddddddddddddddddddddddddddddddd]
170851 [Thread-31-words] INFO storm.kafka.PartitionManager - Committing offset for Partition{host=stormspark:9092, partition=0}
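
The log shows the spout thread (`Thread-31-words`) fetching messages from Kafka and periodically writing its offset to ZooKeeper, while the bolt thread (`Thread-26-print`) prints each tuple. A small script can summarize such output. This is a sketch only: the function name `summarize` is hypothetical, the regular expressions assume the exact message wording seen in this log, and `SAMPLE_LOG` holds three lines copied from it.

```python
import re

# Patterns assume the PartitionManager message wording shown in the log above.
FETCHED = re.compile(r"Fetched (\d+) messages from Kafka")
WROTE_OFFSET = re.compile(r"Wrote committed offset to ZK: (\d+)")

# Three lines copied from the topology output.
SAMPLE_LOG = """\
165031 [Thread-31-words] INFO storm.kafka.PartitionManager - Fetched 2 messages from Kafka: stormspark:0
166855 [Thread-31-words] INFO storm.kafka.PartitionManager - Wrote committed offset to ZK: 239
169033 [Thread-31-words] INFO storm.kafka.PartitionManager - Fetched 1 messages from Kafka: stormspark:0
"""


def summarize(log_text):
    """Return (total messages fetched, last offset written to ZooKeeper or None)."""
    fetched = sum(int(m.group(1)) for m in FETCHED.finditer(log_text))
    offsets = [int(m.group(1)) for m in WROTE_OFFSET.finditer(log_text)]
    return fetched, (offsets[-1] if offsets else None)


print(summarize(SAMPLE_LOG))  # prints (3, 239)
```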

Test successful. Done!

Original: http://blog.csdn.net/luyee2010/article/details/18736051
