for (int i = 0; i < list.size(); i++) {...}
int length = list.size();
for (int i = 0; i < length; i++) {...}
这样,在list.size()很大(循环次数很多)的时候,就避免了每次迭代都重复调用list.size(),减少了很多不必要的消耗。
String str = "aaa"; if (i == 1){ list.add(str); } //建议替换成 if (i == 1){ String str = "aaa"; list.add(str); }
需求:
实现一个简单的由两个interceptor组成的拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新发送成功的消息数或发送失败的消息数。
import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; public class TimeInterceptor implements ProducerInterceptor<String,String> { @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) { //取出数据value String value = producerRecord.value(); //创建新的producer对象,并返回 return new ProducerRecord<String,String>(producerRecord.topic(),producerRecord.partition(), producerRecord.key(),System.currentTimeMillis()+","+value); } @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } }
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * Producer interceptor that counts acknowledged and failed sends and prints
 * both totals when it is closed.
 */
public class CountInterceptor implements ProducerInterceptor<String, String> {

    /** Number of acknowledgements received without an exception. */
    int success;

    /** Number of acknowledgements received with an exception. */
    int error;

    /**
     * Passes the record through unchanged.
     *
     * @param producerRecord the record about to be sent
     * @return the same record instance
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        return producerRecord;
    }

    /**
     * Updates the success or error counter for one acknowledgement.
     *
     * @param recordMetadata metadata of the record; not used for the decision
     * @param e the send failure, or {@code null} if the send succeeded
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        // Decide on the exception, not on the metadata: per the
        // ProducerInterceptor Javadoc the metadata may be non-null on failure
        // (client-version dependent), which would count failures as successes.
        if (e == null) {
            success++;
        } else {
            error++;
        }
    }

    /** Prints the accumulated success and error counts to standard output. */
    @Override
    public void close() {
        System.out.println("success:" + success);
        System.out.println("error:" + error);
    }

    /** No-op: this interceptor reads no configuration. */
    @Override
    public void configure(Map<String, ?> map) {
    }
}
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.ArrayList; import java.util.Properties; public class InterceptorProducer { public static void main(String[] args){ //创建kafka生产者的配置信息 Properties properties = new Properties(); //kafka集群 ProducerConfig properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092"); //ack应答级别 properties.put("acks","all"); //重试次数 properties.put("retries",3); //批次大小 16K properties.put("batch.size",16384); //等待时间 properties.put("linger.ms",1); //RecordAccumulator缓冲区大小 32M properties.put("buffer.memory",33554432); //key,value序列化类 properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); //添加拦截器 ArrayList<Object> interceptors = new ArrayList<>(); interceptors.add("com.wn.interceptor.TimeInterceptor"); interceptors.add("com.wn.interceptor.CountInterceptor"); properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors); //创建生产者对象 KafkaProducer<String,String> producer = new KafkaProducer<>(properties); //发送数据 for (int i=0;i<5;i++){ producer.send(new ProducerRecord<String, String>("aaa","aaabbb--"+i)); } //关闭资源 producer.close(); } }
bin/kafka-console-consumer.sh --zookeeper 192.168.138.55:2181,192.168.138.66:2181,192.168.138.77:2181 --topic aaa
原文:https://www.cnblogs.com/tinghao/p/12464276.html