在学习笔记(一)中,讲解了kafka的安装、部署、以及bash下进行的一些简单操作,而这次将学习kafka的java客户端代码。
?
1、jar包。
在maven上,我们有两种apache kafka提供的jar包:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.9.0.1</version> </dependency>
这里我们选择的是kafka-clients,因为kafka-clients比kafka_2.11依赖的jar少,而且对于Consumer,没有了低级别api与高级别api的区分,方便了代码的编写。
?
2、Producer。
接下来我们编写Produer的java代码:
public class Producer extends Thread { private final KafkaProducer producer; public Producer(){ Properties props = new Properties(); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("bootstrap.servers", "localhost:9092"); this.producer = new KafkaProducer(props); } @Override public void run() { int messageNo = 1; while (true) { String messageStr = "Message_" + messageNo; System.out.println("Send:" + messageStr); producer.send(new ProducerRecord("my_test", messageStr)); messageNo++; try { sleep(20); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
在构造函数中,我们创建一个KafkaProducer的实例,props中为必要的参数(已经最少了,不能更少)。
为了一会方便运行,我们继承了Thread类,并且重写了run。在send中还能添加一个callback回调方法,可以在你的IDE中看到这个参数,如果你的业务有需要的话,可以进行定制。
?
对于producer参数配置,将在后续的笔记中介绍。
?
3、Consumer。
然后是Consumer:
public class Consumer extends Thread { private final KafkaConsumer<String, String> consumer; public Consumer (){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-consumer-group"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer(props); } @Override public void run() { this.consumer.subscribe(Arrays.asList("my_test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ System.out.println("receive:" + record.value()); } } } }
与Producer类似,我们继承了Thread,并且在构造函数中创建了KafkaConsumer的实例。
consumer可以通过subscribe订阅想要的topic,而poll方法能够拉取消息,参数为超时时间,单位为millisenonds,如果指定时间未拉取到消息,返回ConsumerRecords.empty()。
?
4、进行测试:
public class KafkaConsumerProducerDemo { public static void main(String[] args) { new Producer().start(); new Consumer().start(); } }
?
在控制台输出如下:
Send:Message_506
receive:Message_506
Send:Message_507
receive:Message_507
Send:Message_508
receive:Message_508
Send:Message_509
receive:Message_509
Send:Message_510
receive:Message_510
?
至此,java客户端kafak代码就编写完成了。需要注意的是,对于consumer,要保证消息的处理速度能够跟上producer的生产速度,可以根据业务复杂程度与可控制程度,选择合适的线程方式处理消息(例如线程池,或者是actor)。
?
原文:http://zk-chs.iteye.com/blog/2288251