首页 > 编程语言 > 详细

python编写producer、consumer

时间:2019-03-03 16:33:03      阅读:298      评论:0      收藏:0      [点我收藏+]

自主producer、consumer

  • 首先在不同的终端,分别开启两个consumer,保证groupid一致

    ]# python consumer_kafka.py
  • 执行一次producer

    ]# python producer_kafka.py
  • 指定key的partition进行发送信息:

    from kafka import KafkaProducer
    ?
    producer = KafkaProducer(bootstrap_servers=localhost:9092)
    ?
    # # block until all pending messages are sent
    # for _ in range(10):
    #     producer.send(‘test_m_brokers‘, b‘are you ok!!!‘)
    # 
    # producer.flush()
    ?
    ?
    # key for hashed partitioning
    producer.send(zhongqiu_many_brokers, key=b8, value=baaa)
    producer.flush()
  • 指定partition和offset读数据

#encoding=utf8
from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata
from kafka.structs import TopicPartition

def main():
    consumer = KafkaConsumer(zhongqiu_many_brokers, bootstrap_servers=[master:9092])
    print consumer.partitions_for_topic("zhongqiu_many_brokers")
    print consumer.topics()  #获取主题列表
    print consumer.subscription()  #获取当前消费者订阅的主题
    print consumer.assignment()  #获取当前消费者topic、分区信息
    print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量

    consumer.seek(TopicPartition(topic=uzhongqiu_many_brokers, partition=0), 10)  #重置偏移量
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
            message.offset, message.key,
            message.value))

if __name__ == "__main__":
    main()

 

python编写producer、consumer

原文:https://www.cnblogs.com/zxbdboke/p/10466121.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!