Working with Kafka in Python 3

Date: 2019-11-11 21:04:20
# -*- coding: UTF-8 -*-
# Producer: send one JSON message every 3 seconds to the "test" topic.
import datetime
import json
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="192.168.10.10:9092")
for i in range(111):
    # Message values must be bytes, so serialize the dict to JSON and encode it.
    future = producer.send("test", json.dumps(
        {"method": "get", "step": i, "type": "test", "testName": "kafka",
         "cid": "{0}".format(datetime.datetime.now().strftime("%Y%m%d%H%M%S")),
         "info": "demo{}".format(1)}).encode())
    # Block until the broker acknowledges the write (raises on failure or timeout).
    record_metadata = future.get(timeout=10)
    print(record_metadata, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    time.sleep(3)
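
The payload construction in the producer loop can be pulled out into small helpers so it can be checked without a running broker. The following is a minimal sketch, not part of the original post: `build_payload` and `serialize` are hypothetical names, and the `now` parameter is an assumption added only to make the timestamp reproducible.

```python
import datetime
import json


def build_payload(step, now=None):
    """Build the same dictionary the producer loop sends, for a given step."""
    now = now or datetime.datetime.now()
    return {
        "method": "get",
        "step": step,
        "type": "test",
        "testName": "kafka",
        "cid": now.strftime("%Y%m%d%H%M%S"),
        "info": "demo{}".format(1),
    }


def serialize(payload):
    """Turn a dict into UTF-8 JSON bytes, since Kafka message values are bytes."""
    return json.dumps(payload).encode("utf-8")


# Hypothetical usage with kafka-python (requires a reachable broker):
#   producer = KafkaProducer(bootstrap_servers="192.168.10.10:9092",
#                            value_serializer=serialize)
#   producer.send("test", build_payload(0))

if __name__ == "__main__":
    fixed = datetime.datetime(2019, 11, 11, 21, 4, 20)
    print(serialize(build_payload(0, now=fixed)))
```

Passing a function as `value_serializer` lets `send` accept the dictionary directly, which keeps the encoding step in one place.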
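
# Consumer: read the "test" topic, starting from the earliest offset
# when no committed offset exists.
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers=['192.168.10.10:9092'], auto_offset_reset='earliest')

for message in consumer:
    # message.key and message.value are raw bytes unless a deserializer is configured.
    print(message)
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))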
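
Because the consumer receives the value as bytes, the JSON written by the producer has to be decoded before its fields can be used. The sketch below shows one way to do that. `Record`, `decode_value` and `describe` are hypothetical names; `Record` is only a stand-in carrying the fields printed by the consumer loop, so the example runs without a broker.

```python
import json
from collections import namedtuple

# Stand-in for the records yielded by the consumer (assumption: only the
# fields used by the consumer loop are modeled here).
Record = namedtuple("Record", ["topic", "partition", "offset", "key", "value"])


def decode_value(raw):
    """Decode UTF-8 JSON bytes back into a Python dict."""
    return json.loads(raw.decode("utf-8"))


def describe(record):
    """Format a record like the consumer loop does, but show the decoded step."""
    payload = decode_value(record.value)
    return "%s:%d:%d: key=%s step=%s" % (record.topic, record.partition,
                                         record.offset, record.key,
                                         payload["step"])


# Hypothetical usage with kafka-python (requires a reachable broker):
#   consumer = KafkaConsumer("test",
#                            bootstrap_servers=["192.168.10.10:9092"],
#                            auto_offset_reset="earliest",
#                            value_deserializer=decode_value)

if __name__ == "__main__":
    print(describe(Record("test", 0, 7, None, b'{"step": 5}')))
```

With `value_deserializer` set, `message.value` would already be a dict inside the loop, so no per-message decoding is needed.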

  

Original: https://www.cnblogs.com/xiao-xue-di/p/11838254.html
