发送方代码
import json import time import traceback from kafka import KafkaProducer from kafka.errors import kafka_errors if __name__ == ‘__main__‘: producer = KafkaProducer( bootstrap_servers=[‘192.168.32.10:9092‘], key_serializer=lambda k: json.dumps(k).encode(), value_serializer=lambda v: json.dumps(v).encode()) # 发送三条消息 for i in range(0, 3): time.sleep(2) data = { "filePath": "filePath", "dataName": "2222", "index": i } future = producer.send( ‘kafka_test‘, # 同一个key值,会被送至同一个分区 key=11, value=data) # 向分区1发送消息 print("send {}".format(data)) try: future.get(timeout=100) # 监控是否发送成功 except kafka_errors: # 发送失败抛出kafka_errors traceback.format_exc()
接收方代码
import json import time from threading import Thread from kafka import KafkaConsumer def parse(con, value): try: i = value["index"] print(i, time.ctime(), "start") time.sleep(10 - i * 3) # if i == 0: # pass # else: # con.commit() print(i, time.ctime(), "end") except ZeroDivisionError: print("除 0 ") con.commit() except Exception as e: print(e) finally: print("---") if __name__ == ‘__main__‘: consumer = KafkaConsumer( bootstrap_servers=[‘192.168.32.10:9092‘], group_id=‘test‘, auto_offset_reset=‘latest‘, value_deserializer=lambda m: json.loads(m.decode()), enable_auto_commit=False ) consumer.subscribe(topics=["kafka_test"]) while True: fetch_data_dict = consumer.poll(timeout_ms=100, max_records=1) # print("msg=====", fetch_data_dict) for keys, values in fetch_data_dict.items(): for i in values: th = Thread(target=parse, args=(consumer, i.value)) th.start()
测试场景一:
设置enable_auto_commit=False后,发送方发送3条消息,接收方不手动commit(), 则下次重启消费方会重复消费上次未提交的数据
测试场景二:
设想:三条消息,每条消息消费的时长不一致,假设第一条10s, 第二条7s,第三条4s, kafka的消费是按照顺序的,我们启动三个线程对应消费三条消息。可想而知第三条消费最快,提交后会阻塞吧?因为此时第二条,第一条还没提交。
结论:不会阻塞,查了文档不知道为啥。。大家知道可以留言讨论
测试场景三:
还是启动三个线程对应消费三条消息,parse方法如下,第一条消费失败,第二条第三条正常消费提交,此时消费方重启,还会消费几次?
def parse(con, value): i = value["index"] print(i, time.ctime(), "start") # time.sleep(10 - i * 3) print(10/i) # if i == 0: # pass # else: # con.commit() con.commit() print(i, time.ctime(), "end")
结论: 不会消费了,第一条被提交了
测试场景四:
启动三个线程对应消费三条消息,第一条不提交,第二条第三条正常消费提交,此时消费方重启,还会消费吗?
结论: 不会消费了,第一条被提交了
测试场景五:
不启动线程,同步消费,,第一条消费失败,第二条第三条正常消费提交,此时消费方重启,还会消费吗?
结论:第一,二,三会再次消费
测试场景六:
不启动线程,同步消费,第一条不提交,第二条第三条正常消费提交,此时消费方重启,还会消费吗?
结论:第一,二,三会再次消费
可见启动线程异步消费的话,部分失败的消息会被成功的消息commit掉,同步消费则不会!
原文:https://www.cnblogs.com/saintlu/p/15130417.html