Kafka Safe Producer
在应用Kafka的场景中,需要考虑到在异常发生时(如网络异常),被发送的消息有可能会出现丢失、乱序、以及重复消息。
对于这些情况,我们可以创建一个“safe producer”,用于规避这些问题。下面我们会先介绍对于这几种情况的说明以及配置,最后给出一个配置示例。
1. acks 详述
之前我们介绍过 Kafka Producer 的 acks 有三种模式,下面我们进一步介绍一下这三种模式:
1.1. acks = 0(no acks)
使用acks=0 时,也就意味着:
acks=0 的工作方式如下图,不需要收到任何 ack:
一般使用 acks=0 的场景为:可以接受可能丢失数据。例如:
1.2. acks=1(leader acks)
使用acks=1 时:
acks=1 的工作方式如下图,Producer需要收到每条消息的ack:
1.3. acks=all(replicas acks)
使用acks=all 时:
acks=all的一个工作方式如下,每个replica都需要回复ack,才能保证一个write的写入:
如果是需要完全不丢失数据,则这个设置是有必要考虑的。
在设置 acks=all(也就是replica acks)时,必须与另一个参数一起用,也就是:min.insync.replicas:
假设设置参数replication.factor=3, min.insync=2, acks=all,则最多只能容许1一个broker异常,否则producer在发送数据时会收到报错。
假设有3个brokers,min.insync.replicas=2,若其中有两个broker异常,则Producer会收到“NOT_ENOUGH_REPLICAS”的异常。
2. Producer Retry
为了防止一些瞬时的错误(例如NotEnoughReplicasException)影响整个应用,一般我们需要处理一些异常,以避免数据丢失。在Producer中也有重试的配置,默认为0,可以手动调整它的值,最高可以到Integer.MAX_VALUE。
在重试时,默认情况下,会有可能造成消息发送时乱序。因为一般发送失败的消息会被re-queue,然后再次发送,所以会造成部分消息乱序。
此情况在key-based 序列消息中,尤为严重。因为所有具有相同key的messages会被送往同一个partition,而若是有消息被requeue,然后重传,则会打乱这个partition中的部分key顺序。
对于这种情况,我们可以设置参数max.in.flight.requests.per.connection控制:同一时刻,有多少个produce请求可以被并行发起:
不过在 Kafka >= 1.0.0中,对于此场景会有更好的解决方案,本文之后的部分会提及。
3. Idempotent Producer
在重传场景中,会遇到一个常见的问题是:由于网络的原因,Producer会在Kafka中引入重复的messages。
如下图所示:
一个正常的request请求为:
但是一个产生重复消息的请求过程为:
从Producer的角度看,它仅正常发送了一次消息,因为它只收到了一次 ack。从Kafka的角度看,它收到了两次消息,所以commit了两次。
在Kafka >= 0.11之后,可以定义一个“idempotent producer”,可以解决由网络问题造成的重复消息。如下图所示:
对于一个idempotent producer 来说,处理重复消息的请求过程为:
Idempotent producers可以很好的保证一个稳定,以及无重复数据的pipeline。
伴随Idempotent producers一起被设置的参数有:
对于Idempotent Producer的配置,仅需配置类似以下参数即可:
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
4. Safe Producer 配置总结
上面介绍了创建一个safe producer 所需的配置,下面我们总结一下在不同版本的Kafka中所需要做的配置:
Kafka < 0.11
Kafka >= 0.11
这里必须要提到的是:运行一个“safe producer”可能会影响系统的throughput与latency,所以在应用到生产系统前,必须先做测试以判断影响。
5. safe producer 示例
我们按照之前的步骤启动一个由Java编写的Kafka Producer,并查看输出的配置,可以看到默认的部分参数为:
acks = 1
enable.idempotence = false
max.in.flight.requests.per.connection = 5
retries = 2147483647
现在我们显式地加上以下参数:
// create a safe Producer properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
然后查看producer配置的部分输出为:
以上为创建一个safe producer所需的配置介绍以及示例,在实际生产环境中,务必要先测试safe producer 对应用吞吐以及延时的影响后,再斟酌是否有必要对参数做部分调整。
Apache Kafka(五)- Safe Kafka Producer
原文:https://www.cnblogs.com/zackstang/p/11409014.html