Kafka引入事务以实现:
事务无法实现跨系统的Exactly-Once:如果数据源和目的地不只是Kafka的Topic(如各种数据库),那么操作的原子性便无法保证。
transactional.id
参数设置为非空,并将enable.idempotence
参数设置为true
;isolation.level
参数设置为read_committed
,即Consumer只消费已提交事务的消息。除此之外,还需要设置enable.auto.commit
参数为false
来让Consumer停止自动提交Offset。Kafka将事务状态存储在内部的topic:__transaction_state
中,并由Transaction Coordinator组件维护。每个事务型Producer都拥有唯一标识Transaction ID,它与ProducerID相绑定。Producer重启后会向Transaction Coordinator发送请求,根据Transaction ID找到原来的ProducerID以及事务状态。
另外,为了拒绝僵尸实例(Zombie Instance),每个Producer还会被分配一个递增的版本号Epoch。Kafka会检查提交事务的Proudcer的Epoch,若不为最新版本则拒绝该实例的请求。
在分布式系统中,一个instance的宕机或失联,集群往往会自动启动一个新的实例来代替它的工作。此时若原实例恢复了,那么集群中就产生了两个具有相同职责的实例,此时前一个instance就被称为“僵尸实例(Zombie Instance)”。
两种方案:
读取消息 -> 更新(提交Offset -> 处理消息:若处理消息时发生故障,接管的Consumer从更新后的Offset读数据,则缺数据,类似于At Most Once
读取消息 -> 处理消息 -> 更新(提交)Offset:若更新Offset时发生故障,接管的Consumer重新读之前的数据,数据重复,类似于At Least Once
选取第二种方案,因为只要在消息中添加唯一主键,便可以让第二次写入的数据覆盖重复数据,从而做到Exactly-Once
原文:https://www.cnblogs.com/koktlzz/p/14629654.html