ProducerRecord 含义: 发送给Kafka Broker的key/value 值对
//ProducerRecord的成员变量
public class ProducerRecord<K, V> {
private final String topic;//主题
private final Integer partition;//分区号
private final Headers headers;//消息头
private final K key;//键
private final V value;//值
private final Long timestamp;//消息时间戳
headers:可以设定一些与应用相关的信息
KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将它实例进行池化来统一管理。
KafkaProducer的参数众多,bootstrap.servers,retries,key.serializer等,开发人员很难记住所有的配置,也很容易写错,所以可以用org.apache.kafka.clients.producer.ProducerConfig类预防:
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.RETRIES_CONFIG, "10");
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.51:9092");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
producer.send(record);
上述这种方式就是发后即忘,它只往kafka中发送消息并不关心消息是否正确到达,可能会造成消息丢失,发送性能最高,但是可靠性最差。
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
因为直接send返回值是Future,我们知道Future.get()会阻塞线程直至线程运行结果返回,通过此方法即可达到同步目的。
try {
producer.send(record).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
Future中可以获取一个RecordMetadata对象,包含了消息大的一些元数据信息,比如主题、分区号、分区中的偏移量(offset)、时间戳等。
此外Future.get(long timeout, TimeUnit unit)可以实现可超时的阻塞,
producer.send(record, (metadata, exception) -> {
System.out.println(metadata.topic());
});
当kafka有响应时,会触发回调方法
生产者需要用序列化器把对象转换成字节数组才能通过网络发送给kafka。而在消费者需要用反序列化器把字节数组转成相应对象。
序列化器需要实现Serializer接口。客户端实现了多种序列化器供我们开发使用。
public interface Serializer<T> extends Closeable {
/**
* Configure this class.
* @param configs configs in key/value pairs
* @param isKey whether is for key or value
*/
//用来配置当前类
void configure(Map<String, ?> configs, boolean isKey);
/**
* Convert {@code data} into a byte array.
*
* @param topic topic associated with data
* @param data typed data
* @return serialized bytes
*/
//序列化操作
byte[] serialize(String topic, T data);
/**
* Close this serializer.
*
* This method must be idempotent as it may be called multiple times.
*/
@Override
//关闭当前序列化器
void close();
}
生产者和消费者使用的序列化器是需要一一对应的。例如生产者shencghan端使用了StringSerializer,那么消费者端需要使用StringDeserializer。
消息在通过send()方法发往broker过程中,有可能经过拦截器、序列化器、分区器的一系列作用后才能真正发送到broker。拦截器一般不是必需的,而序列化器时必需的。消息经过序列化后就需要确定它发往的分区。
如果ProducerRecord 中指定了partition字段,那么就不需要分区器的作用。如果没有指定,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。
kafka默认分区器是DefaultPartitioner,实现了Partitioner接口
public interface Partitioner extends Configurable, Closeable {
/**
* Compute the partition for the given record.
*
* @param topic The topic name 主题名
* @param key The key to partition on (or null if no key) 键
* @param keyBytes The serialized key to partition on( or null if no key) 序列化后的键
* @param value The value to partition on or null 值
* @param valueBytes The serialized value to partition on or null 序列化后的值
* @param cluster The current cluster metadata 集群元数据信息
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
/**
* This is called when partitioner is closed.
*/
public void close();
}
Partitioner的父接口Configurable
public interface Configurable {
/**
* Configure this class with the given key-value pairs
*/
//该方法主要用来获取配置信息以及配置初始化数据
void configure(Map<String, ?> configs);
}
在默认DefaultPartitioner中的分区计算方法:
我们也可以自定义分区器,然后配置在properties:
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"....");
实现生产者拦截器只需要实现ProducerInterceptor接口
public interface ProducerInterceptor<K, V> extends Configurable {
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
}
onSend方法调用时机:KafkaProducer在将消息序列化和计算分区之前会调用拦截器的onSend()方法来对消息进行相应的定制化操作,一般来说不要修改ProducerRecord 的topic、key、partition信息。因为可能会影响分区的计算,同样会影响broker端日志压缩。
onAcknowledgement方法调用时机:该方法会在消息被应答之前或消息发送失败时调用,优先于用户设定的Callback之前执行。
close:主要用于在关闭拦截器时执行一些资源的清理工作
自定义生产者拦截器:
public class MyProducerInterceptor implements ProducerInterceptor {
@Override
public ProducerRecord onSend(ProducerRecord record) {
System.out.println("onSend");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("onAcknowledgement");
}
@Override
public void close() {
System.out.println("close");
}
@Override
public void configure(Map<String, ?> configs) {
}
}
//设置properties参数
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
此外可以指定多个拦截器,形成拦截器链,拦截器链会按照配置的拦截器顺序来一一执行(各个拦截器之间用逗号隔开)
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName()+","+MyProducerInterceptor2.class.getName());
整个生产者客户端由两个线程协调运行,分别为主线程和Sender线程(发送线程)
RecordAccumulator:消息累加器,主要用来缓存消息以便Sender线程可以批量发送。RecordAccumulator缓存大小可以通过生产者客户端参数buffer.memory
配置,默认32MB.如果生产者发送消息速度超过发送到服务器速度,会导致生产者空间不足,这时候,send方法会要么被阻塞要么抛出异常,取决于max.block.ms
配置,默认60000,即60s。
主线程发送过来的消息会被追加到RecordAccumulator的某个双端队列中Deque,即Deque<ProducerBatch>。ProducerBatch中包含一个至多个ProducerRecord。将较小的ProducerRecord拼凑成较大的ProducerBatch,可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch的大小跟batch.size
有关。
Sender从RecordAccumulator中获取到缓存的消息之后,会将原本<分区,Deque<ProducerBatch>>的保存形式转换为<Node,List<ProducerBatch>>,Node表示Kafka的broker节点。
转换为<Node,List<ProducerBatch>>之后,会进一步<Node,Request>的形式,这样就可以将Request发往各个Node了。Request表示Kafka的各种协议请求。
请求在发往kafka之前还会保存到InFlightRequests中,它的具体形式为Map<NodeId,Deque<Request>>,它的作用是缓存了已经发出去但还没有收到相应的请求。配置参数:max.in.flight.requests.per.connection,默认值是5,即每个连接最多只能缓存5个未响应的请求,超过该数目就不能向这个连接发送更多请求了。
以下仅介绍部分参数,还有一些高级功能(事务、幂等)这里不做介绍。
这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者者才会认为这条消息是成功写入的。
acks的值都是字符串类型的。
acks=1,默认值,生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。acks设置为1,是消息可靠性和吞吐量之间的这种方案。
acks=0,生产者发送消息之后不需要等待任何服务端的响应。
acks=-1或acks=all,生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够来自服务端的成功响应。acks=-1可以达到最强的可靠性。
这个参数用来限制生产者客户端能发送的消息的最大值,默认1MB。这个参数在对kafka整体脉络没有把控的时候,不建议修改,因为这个参数还涉及到其他参数的修改。比如broker端的message.max.bytes参数。
retries参数用来配置生产者重试的次数,默认值是0
retry.backoff.ms用来设置两次重试之间的时间间隔,避免无效的频繁重试,默认值是100ms。
这个参数用来指定在多久之后关闭闲置的连接,默认是9分钟
指定生产者发送ProducerBatch之前等待更多消息加入ProducerBatch的时间,默认为0.
用来配置Producer等待请求响应的最长时间,默认时间3000ms。请求超时后可以选择重试。
设置socket接收消息缓冲区(SO_RECBUF)的大小,默认值32768B,即32KB,如果设置为-1,则为操作系统默认值。
设置socket发送消息缓冲区(SO_RECBUF)的大小,默认值131072B,即128KB,如果设置为-1,则为操作系统默认值。
原文:https://www.cnblogs.com/wwjj4811/p/14379069.html