1 在kafkaProducer的源码中看到
在执行dosend方法之前会先执行拦截器栈中的onSend方法。
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
代码中会把所有实现了ProducerInterceptor接口的所有类拿来,循环执行onSend方法
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) { ProducerRecord<K, V> interceptRecord = record; for (ProducerInterceptor<K, V> interceptor : this.interceptors) { try { interceptRecord = interceptor.onSend(interceptRecord); } catch (Exception e) { // do not propagate interceptor exception, log and continue calling other interceptors // be careful not to throw exception from here if (record != null) log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e); else log.warn("Error executing interceptor onSend callback", e); } } return interceptRecord; }
思路:我们自定义一个类,实现ProducerInterceptor接口然后把此类告知kafka (猜测:如果是spring容器,会被纳入到ioc中)
2 自定义一个类
package wyp.kafkainterceptor; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; /** * @author : miles wang * @date : 2019/7/19 3:33 PM */ public class MyProducerInterceptor implements ProducerInterceptor<String,String> { @Override public ProducerRecord onSend(ProducerRecord record) { return new ProducerRecord(record.topic(),record.partition(),record.timestamp(),record.key(),record.value()+": wyp",record.headers()); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
3 创建producer
package wyp; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.LoggerFactory; import java.util.Properties; import java.util.stream.IntStream; /** * 一劳永逸模式 * 发送以后就不关心server的应答 * 实现了自定义的produer拦截器,在kafkaProducer.send()方法执行执行,会先执行拦截器栈中的send方法 */ public class FireAndForgetSend { public static void main(String[] args) { Properties properties = initPro(); KafkaProducer<String,String> kafkaProducer = new KafkaProducer<>(properties); IntStream.range(0,100).forEach(i -> { ProducerRecord<String,String> producerRecord = new ProducerRecord<>("14",String.valueOf(i),"hello"+i); kafkaProducer.send(producerRecord); System.out.println("The message is send done and key is "+i); }); kafkaProducer.flush(); kafkaProducer.close(); } private static Properties initPro(){ final Properties properties = new Properties(); properties.put("bootstrap.servers","localhost:9092"); properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"wyp.kafkainterceptor.MyProducerInterceptor"); return properties; } }
4 测试
原文:https://www.cnblogs.com/wangpipi/p/11214216.html