首页 > 其他 > 详细

kafka:enable.auto.commit

时间:2019-12-27 09:10:06      阅读:312      评论:0      收藏:0      [点我收藏+]

一、背景

项目中有一个需求,是通过消费kafka的消息来处理数据,但是想要实现延迟消费的效果,于是想到了是否可以自己管理kafka的commit来实现,就是通过设置`enable.auto.commit`为False,预期是如果消费到了消息,但是不commit,kafka就会重新把消息放回队列,后续还会再次消费到,直到超过设置的延迟时间再真正消费并commit。

于是写了个demo来验证,结果发现这个配置的效果并不是自己想要的。

二、生产者

生产者每秒钟向kafka的topic发送一条消息。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time

from confluent_kafka import Producer, KafkaError
from confluent_kafka import TopicPartition
from confluent_kafka import OFFSET_BEGINNING

p = Producer({bootstrap.servers:localhost:9092, localhost:9093, localhost:9094})

topic = nico-test
msg_tpl = hello kafka:{0}

while True:
    msg = msg_tpl.format(time.time())
    p.produce(topic, msg)
    print(Produce msg:{0}.format(msg))
    time.sleep(1)

p.flush()

三、消费者

消费者设置了配置项enable.auto.commit:False。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time

from confluent_kafka import Consumer, KafkaError
from confluent_kafka import TopicPartition
from confluent_kafka import OFFSET_BEGINNING

c = Consumer({
    bootstrap.servers:localhost:9092, localhost:9093, localhost:9094, 
    group.id:nico-test, 
    auto.offset.reset:earliest, 
    enable.auto.commit:False
})

topic = nico-test

c.subscribe([topic])

cd = c.list_topics()
print(cd.cluster_id)
print(cd.controller_id)
print(cd.brokers)
print(cd.topics)
print(cd.orig_broker_id)
print(cd.orig_broker_name)

while True:
    msg = c.poll(1.0)
    if msg is None:
        continue

    print(topic:{topic}, partition:{partition}, offset:{offset}, headers:{headers}, key:{key}, msg:{msg}, timestamp:{timestamp}.format(topic=msg.topic(), msg=msg.value(), headers=msg.headers(), key=msg.key(), offset=msg.offset(), partition=msg.partition(), timestamp=msg.timestamp()))

四、结果

结果是consumer启动后会一直顺序的消费消息,并且并不会把消息重放到队列中,但是当consumer被kill掉重启时,每次都是从最开始的时候消费的,所以总结一下,该配置项的作用是当配置为true时,每次获取到消息后就会自动更新存储在zookepper中的offset值。

最后自己也想了一下,这里不支持延迟消费的原因其实和kafka的实现原理有很大的关系,kafka是直接把消息存储在磁盘文件中的,如果想要实现重放(支持延迟消费)那么就需要把该消息从消息队列中删除,然后重新插入到消息队列,那这样就跟kafka的设计相违背了。

 

kafka:enable.auto.commit

原文:https://www.cnblogs.com/lit10050528/p/12105297.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!