<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
</dependency>
package com.itheima.producer; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.Future; /** * kafka客户端之:生产者 */ public class MyKafkaProducer { public static void main(String[] args) throws Exception{ // 1.配置信息 Properties props = new Properties(); // 定义kafka服务器地址列表,不需要指定所有的broker props.put("bootstrap.servers", "server1:9092,server2:9092,server3:9092"); // 生产者需要leader确认请求完成之前接收的应答数 props.put("acks", "-1"); // 客户端失败重试次数 props.put("retries", 1); // 生产者打包消息的批量大小,以字节为单位.此处是16k props.put("batch.size", 16384); // 生产者延迟1ms发送消息 props.put("linger.ms", 1); // 生产者缓存内存的大小,以字节为单位.此处是32m props.put("buffer.memory", 33554432); // key 序列化类 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value序列化类 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2.创建生产者 KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props); // 3.生产数据 /** * 发送消息的三种方式: * 1.同步阻塞发送 * 适用场景:业务不需要高吞吐量、更关心消息发送的顺序、不允许消息发送失败 * 2.异步发送(发送并忘记) * 适用场景:业务只关心吞吐量、不关心消息发送的顺序、可以允许消息发送失败 * 3.异步发送(回调函数) * 适用场景:业务需要知道消息发送成功、不关心消息发送的顺序 */ // 1.同步阻塞发送 // 创建消息 /* System.out.println("-------------------同步发送消息......start-----------------------"); ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima_topic",0,"key-sync","同步发送消息"); Future<RecordMetadata> send = producer.send(record); RecordMetadata recordMetadata = send.get(); System.out.println(recordMetadata);//itheima_topic-0@2 System.out.println("-------------------同步发送消息......end-----------------------");*/ // 2.异步发送(发送并忘记) // 创建消息 /*System.out.println("-------------------异步发送(发送并忘记)......start-----------------------"); ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima_topic",0,"key-async1","异步发送消息,发送并忘记"); // 发送并忘记 producer.send(record); System.out.println("-------------------异步发送(发送并忘记)......end-----------------------"); // 刷新 producer.flush();*/ // 3.异步发送(回调函数) // 创建消息 System.out.println("-------------------异步发送(回调函数)......start-----------------------"); ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima_topic",0,"key-async2","异步发送消息,(回调函数)"); // 发送,回调函数处理 producer.send(record, new Callback() { // 处理回调业务逻辑 public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println("异步发送消息成功:"+recordMetadata);//itheima_topic-0@4 System.out.println("异常对象:"+e);//null } }); System.out.println("-------------------异步发送(回调函数)......end-----------------------"); // 刷新 producer.flush(); } }
package com.itheima.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import java.util.Arrays; import java.util.Properties; /** * kafka客户端之:消费者 */ public class MyKafkaConsumer { public static void main(String[] args) throws Exception{ // 1.配置信息 Properties props = new Properties(); // 定义kafka服务器地址列表,不需要指定所有的broker props.put("bootstrap.servers", "server1:9092,server2:9092,server3:9092"); // 消费者组id props.put("group.id", "itheima"); // 是否自动确认offset props.put("enable.auto.commit", "true"); //自动确认offset时间间隔 props.put("auto.commit.interval.ms", "1000"); // key 序列化类 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value序列化类 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 2.创建消费者 KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props); // 3.消费消息 // 指定分区消费 TopicPartition partition = new TopicPartition("itheima_topic",0); // 获取已经提交的偏移量 long offset = 0L; OffsetAndMetadata offsetAndMetadata = consumer.committed(partition); if(offsetAndMetadata !=null){ offset = offsetAndMetadata.offset(); } System.out.println("当前消费的偏移量:"+offset); // 指定偏移量消费 consumer.assign(Arrays.asList(partition)); consumer.seek(partition,offset); // 循环拉取数据 while (true){ // 拉取数据 ConsumerRecords<String, String> records = consumer.poll(1000); // 打印数据 for (ConsumerRecord<String, String> record : records) { System.out.println("消费的数据为:" + record.value()); } } } }
原文:https://www.cnblogs.com/luzhanshi/p/13376974.html