版本信息
spring-boot:2.0.6.RELEASE
spring-kafka:2.1.2.RELEASE
kafka-clients:1.0.2
为什么阻塞了 60s?
首先我们知道 kafkaTemplat.send 底层是调用 KafkaProducer 的 send 方法
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {}
根据文档的说明[1]它是一个异步的发送方法,按道理不管如何它都不应该阻塞主线程,但实际中某些情况下会出现阻塞线程,比如 broker 未正确运行,topic 未创建等情况。具体得源码分析参考 https://www.cnblogs.com/felixzh/p/11849296.html[2]
查询官方文档http://kafka.apache.org/10/documentation.html[3]得知,具体阻塞多久是由 max.block.ms 参数决定的,由于我们的业务场景是高容忍消息丢失,低容忍阻塞请求,所以需要进行优化,下面简单介绍一下 2 种优化方案。
!!!注意,以下方案只适用于高容忍消息丢失,低容忍阻塞请求业务场景
优化方案
方案 1:参数调优
max.block.ms 调整到 100ms,这个参数有以下 2 个作用
用于配置 send 数据或 partitionFor 函数得到对应的 leader 时,最大的等待时间,默认值为 60 秒
控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽 buffer.memory 这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过 max.block.ms 设定, 之后它将抛出一个 TimeoutException。
关闭自动重试 retries=0 默认就是 0
其他
acks=0,acks 有 4 个选项[all, -1, 0, 1] 。这里不确定会不会阻塞 send 方法,但是高容忍消息丢失,低容忍阻塞请求的业务场景配置成 0 就好了
0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个 server 失败的情况下,有点像 TCP
1:发送消息,并会等待 leader 收到确认后,一定的可靠性
-1 或 all:发送消息,等待 leader 收到确认,并进行复制操作后,才返回,最高的可靠性
其他参数参考 http://kafka.apache.org/10/documentation.html[4]
虽然调整一些参数,但是 kafka 集群不可用或请求量过大时,还是对主流程有短暂的阻塞
方案 2:真异步
kafkaTemplat.send 方法其实是个假异步方法,所以需要自己实现真异步,这里构造一个公用的线程池来处理就可以了,下面为参考代码
package com.qiaofang.tortoise.gateway.component;
import com.qiaofang.tortoise.gateway.config.KafkaAsyncProperties;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* kafka异步操作工具类
*
* @author chenhao
* @version 1.0
* @date 2020/7/2 3:47 下午
*/
public class KafkaAsyncUtil {
private final KafkaTemplate kafkaTemplate;
private final KafkaAsyncProperties kafkaAsyncProperties;
public KafkaAsyncUtil(KafkaTemplate kafkaTemplate, KafkaAsyncProperties kafkaAsyncProperties) {
this.kafkaTemplate = kafkaTemplate;
this.kafkaAsyncProperties = kafkaAsyncProperties;
init();
}
private ThreadPoolTaskExecutor executor;
private void init() {
executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(kafkaAsyncProperties.getThreadPoolCoreThreads());
executor.setMaxPoolSize(kafkaAsyncProperties.getThreadPoolMaxThreads());
executor.setQueueCapacity(kafkaAsyncProperties.getThreadPoolQueueSize());
executor.setThreadNamePrefix("kafka-async-util-pool-");
//高容忍消息丢失场景,工作队列满了之后直接丢弃
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.initialize();
}
/**
* 发送消息
*
* @param topic
* @param data
*/
public void send(String topic, Object data) {
executor.execute(() -> kafkaTemplate.send(topic, data));
}
}
/**
* kafka异步操作相关配置
* @author chenhao
* @version 1.0
* @date 2020/7/2 3:47 下午
*/
@Data
@ConfigurationProperties(prefix = "tortoise.kafka.async")
public class KafkaAsyncProperties {
/**
* core
*/
private Integer threadPoolCoreThreads = 3;
/**
* max
*/
private Integer threadPoolMaxThreads = 3;
/**
* queue大小
*/
private Integer threadPoolQueueSize = 10000;
}
有文章《关于高并发下 kafka producer send 异步发送耗时问题的分析[5]》说多线程高并发下 producer.send 的损耗比较严重,这个还要等到后续压测之后再更新文章吧
参考文章
站在巨人的肩膀上
Kafka producer 异步发送在某些情况会阻塞主线程,使用时候慎重[6]
HAVENT 原创 Spring Boot + Spring-Kafka 异步配置[7]
关于高并发下 kafka producer send 异步发送耗时问题的分析[8]
http://kafka.apache.org/10/documentation.html[9]
参考资料
[1]
文档的说明: https://github.com/apache/kafka/blob/1f2d230bfdaafb34c9be12a370ab2eb4d3016039/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L853
[2]
https://www.cnblogs.com/felixzh/p/11849296.html
[3]
http://kafka.apache.org/10/documentation.html
[4]
http://kafka.apache.org/10/documentation.html
[5]
关于高并发下 kafka producer send 异步发送耗时问题的分析: https://www.cnblogs.com/dafanjoy/p/10292875.html
[6]
Kafka producer 异步发送在某些情况会阻塞主线程,使用时候慎重: https://www.cnblogs.com/felixzh/p/11849296.html
[7]
HAVENT 原创 Spring Boot + Spring-Kafka 异步配置: https://my.oschina.net/u/943746/blog/1928471
[8]
关于高并发下 kafka producer send 异步发送耗时问题的分析: https://www.cnblogs.com/dafanjoy/p/10292875.html
[9]
http://kafka.apache.org/10/documentation.html
相关推荐
后台回复 学习资料 领取学习视频
如有收获,点个在看,诚挚感谢
原文:https://blog.51cto.com/14888386/2515108