package bonree.producer; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /******************************************************************************* * BidPlanStructForm.java Created on 2014-7-8 * Author: <a href=mailto:wanghouda@126.com>houda</a> * @Title: SimpleProducer.java * @Package bonree.producer * Description: * Version: 1.0 ******************************************************************************/ public class SimpleProducer { private static Producer<Integer,String> producer; private final Properties props=new Properties(); public SimpleProducer(){ //定义连接的broker list props.put("metadata.broker.list", "192.168.4.31:9092"); //定义序列化类(Java对象传输前要序列化) props.put("serializer.class", "kafka.serializer.StringEncoder"); producer = new Producer<Integer, String>(new ProducerConfig(props)); } public static void main(String[] args) { SimpleProducer sp=new SimpleProducer(); //定义topic String topic="mytopic"; //定义要发送给topic的消息 String messageStr = "send a message to broker "; //构建消息对象 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr); //推送消息到broker producer.send(data); producer.close(); } }
package bonree.consumer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; /******************************************************************************* * Created on 2014-7-8 Author: <a * href=mailto:wanghouda@126.com>houda</a> * @Title: SimpleHLConsumer.java * @Package bonree.consumer Description: Version: 1.0 ******************************************************************************/ public class SimpleHLConsumer { private final ConsumerConnector consumer; private final String topic; public SimpleHLConsumer(String zookeeper, String groupId, String topic) { Properties props = new Properties(); //定义连接zookeeper信息 props.put("zookeeper.connect", zookeeper); //定义Consumer所有的groupID,关于groupID,后面会继续介绍 props.put("group.id", groupId); props.put("zookeeper.session.timeout.ms", "500"); props.put("zookeeper.sync.time.ms", "250"); props.put("auto.commit.interval.ms", "1000"); consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); this.topic = topic; } public void testConsumer() { Map<String, Integer> topicCount = new HashMap<String, Integer>(); //定义订阅topic数量 topicCount.put(topic, new Integer(1)); //返回的是所有topic的Map Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount); //取出我们要需要的topic中的消息流 List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic); for (final KafkaStream stream : streams) { ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator(); while (consumerIte.hasNext()) System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message())); } if (consumer != null) consumer.shutdown(); } public static void main(String[] args) { String topic = "mytopic"; SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.4.32:2181", "testgroup", topic); simpleHLConsumer.testConsumer(); } }
kafka入门三:写第一个Kafka应用,布布扣,bubuko.com
原文:http://blog.csdn.net/suifeng3051/article/details/37602025