一、修改kafka server.porperties的ip是你本机的ip
listeners=PLAINTEXT://192.168.111.130:9092
二、生产者的例子
import org.apache.kafka.clients.producer.*; import java.util.Properties; public class KafkaProducerDemo { private final Producer<String, String> kafkaProdcer; public final static String TOPIC = "JAVA_TOPIC"; private KafkaProducerDemo() { kafkaProdcer = createKafkaProducer(); } private Producer<String, String> createKafkaProducer() { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.111.130:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> kafkaProducer = new KafkaProducer<String, String>(props); return kafkaProducer; } void produce() { for (int i = 0; i < 10; i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } final String key = "key" + i; String data = "hello kafka message:" + key; kafkaProdcer.send(new ProducerRecord<String, String>(TOPIC, key, data), new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println("发送key" + key + "成功"); } }); } } public static void main(String[] args) { KafkaProducerDemo kafkaProducerDemo = new KafkaProducerDemo(); kafkaProducerDemo.produce(); } }
用properties构造一个Producer的实例,然后调用send方法,传入数据,还有一个回调函数。
可以看到数据已经进来了。
三、消费者例子
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class KafkaConsumerDemo { private final KafkaConsumer<String, String> consumer; private KafkaConsumerDemo(){ Properties props = new Properties(); props.put("bootstrap.servers", "192.168.111.130:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<String, String>(props); } void consume(){ consumer.subscribe(Arrays.asList(KafkaProducerDemo.TOPIC)); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ System.out.println("I‘m coming"); System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } public static void main(String[] args) { KafkaConsumerDemo kafkaConsumerDemo = new KafkaConsumerDemo(); kafkaConsumerDemo.consume(); } }
正常启动是看不到东西的, 两个同时启动才有。消费者只看接下来有哪些生产者发来新的消息。
props.put("enable.auto.commit", "true");
这个的意思是,消费后自动改变偏移量,就是会到最后。
原文:https://www.cnblogs.com/chenmz1995/p/10425802.html