首页 > 编程语言 > 详细

python-kafka手动提交消息测试

时间:2021-08-11 23:43:37      阅读:30      评论:0      收藏:0      [点我收藏+]

发送方代码

import json
import time
import traceback
from kafka import KafkaProducer
from kafka.errors import kafka_errors

if __name__ == __main__:
    producer = KafkaProducer(
        bootstrap_servers=[192.168.32.10:9092],
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())
    # 发送三条消息
    for i in range(0, 3):
        time.sleep(2)
        data = {
            "filePath": "filePath",
            "dataName": "2222",
            "index": i
        }
        future = producer.send(

            kafka_test,  # 同一个key值,会被送至同一个分区
            key=11,
            value=data)  # 向分区1发送消息
        print("send {}".format(data))
        try:
            future.get(timeout=100)  # 监控是否发送成功
        except kafka_errors:  # 发送失败抛出kafka_errors
            traceback.format_exc()

接收方代码

import json
import time
from threading import Thread
from kafka import KafkaConsumer


def parse(con, value):
    try:
        i = value["index"]
        print(i, time.ctime(), "start")
        time.sleep(10 - i * 3)
        # if i == 0:
        #     pass
        # else:
        #     con.commit()
        print(i, time.ctime(), "end")
    except ZeroDivisionError:
        print("除 0 ")
        con.commit()
    except Exception as e:
        print(e)
    finally:
        print("---")


if __name__ == __main__:
    consumer = KafkaConsumer(
        bootstrap_servers=[192.168.32.10:9092],
        group_id=test,
        auto_offset_reset=latest,
        value_deserializer=lambda m: json.loads(m.decode()),
        enable_auto_commit=False
    )
    consumer.subscribe(topics=["kafka_test"])

    while True:
        fetch_data_dict = consumer.poll(timeout_ms=100, max_records=1)
        # print("msg=====", fetch_data_dict)
        for keys, values in fetch_data_dict.items():
            for i in values:
                th = Thread(target=parse, args=(consumer, i.value))
                th.start()

测试场景一:

  设置enable_auto_commit=False后,发送方发送3条消息,接收方不手动commit(), 则下次重启消费方会重复消费上次未提交的数据

测试场景二:

       设想:三条消息,每条消息消费的时长不一致,假设第一条10s, 第二条7s,第三条4s, kafka的消费是按照顺序的,我们启动三个线程对应消费三条消息。可想而知第三条消费最快,提交后会阻塞吧?因为此时第二条,第一条还没提交。

  结论:不会阻塞,查了文档不知道为啥。。大家知道可以留言讨论

测试场景三:

  还是启动三个线程对应消费三条消息,parse方法如下,第一条消费失败,第二条第三条正常消费提交,此时消费方重启,还会消费几次?

def parse(con, value):
    i = value["index"]
    print(i, time.ctime(), "start")
    # time.sleep(10 - i * 3)
    print(10/i)
    # if i == 0:
    #     pass
    # else:
    #     con.commit()
    con.commit()
    print(i, time.ctime(), "end")

  结论: 不会消费了,第一条被提交了

测试场景四:

  启动三个线程对应消费三条消息,第一条不提交,第二条第三条正常消费提交,此时消费方重启,还会消费吗?

  结论: 不会消费了,第一条被提交了

测试场景五:

  不启动线程,同步消费,,第一条消费失败,第二条第三条正常消费提交,此时消费方重启,还会消费吗?

  结论:第一,二,三会再次消费

测试场景六:

  不启动线程,同步消费,第一条不提交,第二条第三条正常消费提交,此时消费方重启,还会消费吗?

  结论:第一,二,三会再次消费

 

可见启动线程异步消费的话,部分失败的消息会被成功的消息commit掉,同步消费则不会!

 

python-kafka手动提交消息测试

原文:https://www.cnblogs.com/saintlu/p/15130417.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!