Kafka下载地址:http://kafka.apache.org/downloads.html
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
package com.example.kafaka.web;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Future;

/**
 * Minimal Kafka producer demo: publishes a {@code "Hello,<n>"} string message
 * (n in 0..99) to {@link #TOPIC} every two seconds until the thread is
 * interrupted or sending fails.
 *
 * @author: CSH
 * @create: 2020-05-14 11:00
 **/
public class Producer {

    /** Topic the demo messages are published to. */
    public static final String TOPIC = "kafka_test";

    /**
     * Configures a string-keyed, string-valued producer and sends messages in an endless loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties p = new Properties();
        // Broker address list. Several brokers of a cluster can be given, separated by
        // commas, e.g. "10.20.23.40:9092,192.168.23.77:9092".
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.20.23.40:9092");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // One generator for the whole run instead of a new instance per message.
        Random random = new Random();

        // try-with-resources closes the producer on every exit path.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p)) {
            while (true) {
                String msg = "Hello," + random.nextInt(100);
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC, msg);

                // send() is asynchronous: checking Future.isDone() right after the call is
                // almost always false, so the outcome is reported from the completion
                // callback instead. A null exception means the record was acknowledged.
                kafkaProducer.send(producerRecord, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.println("消息发送成功:" + msg);
                    } else {
                        System.err.println("消息发送失败:" + msg);
                        exception.printStackTrace();
                    }
                });

                Thread.sleep(2000);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller/JVM can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
package com.example.kafaka.web;

import com.example.kafaka.constant.CommonConstant;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

/**
 * Minimal Kafka consumer demo: subscribes to {@code CommonConstant.TOPIC} and prints the
 * topic, offset and value of every record it receives.
 *
 * @author: CSH
 * @create: 2020-05-14 11:09
 **/
public class Consumer {

    /**
     * Configures a string-keyed, string-valued consumer and polls for records in an endless loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.20.23.40:9092");
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // The topic name doubles as the consumer group id.
        // NOTE(review): CommonConstant.TOPIC is defined outside this file - confirm it
        // names the same topic the producer publishes to.
        p.put(ConsumerConfig.GROUP_ID_CONFIG, CommonConstant.TOPIC);

        // try-with-resources closes the consumer if polling or record handling throws;
        // previously the consumer was never closed.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p)) {
            // Subscribe to the topic.
            kafkaConsumer.subscribe(Collections.singletonList(CommonConstant.TOPIC));

            while (true) {
                // Wait up to 100 ms for new records.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                records.forEach(record -> System.out.println(String.format(
                        "topic:%s,offset:%d,消息:%s",
                        record.topic(), record.offset(), record.value())));
            }
        }
    }
}
原文:https://www.cnblogs.com/csh520mjy/p/12888065.html