首页 > 其他 > 详细

confluent-kafka demo

时间:2021-08-04 11:15:43      阅读:23      评论:0      收藏:0      [点我收藏+]

安装:confluent-kafka 

pip install confluent-kafka 我直接在PyCharm里面安装

启动zk, 启动kafka server

查看已有topic

./kafka-topics.sh --zookeeper localhost:2181 --list

创建topic test

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test

控制台发送topic

sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

消费者, 自动commit消息

from time import sleep

from confluent_kafka import Consumer, KafkaError

mybroker = "127.0.0.1:9092"

c = Consumer({
    bootstrap.servers: mybroker,
    group.id: mygroup,
    client.id: gxf,
    enable.auto.commit: True,
    default.topic.config: {
    auto.offset.reset: earliest
    }
})

c.subscribe([test])

while True:
    msg = c.poll(1.0)
    # print("msg:", msg)
    if msg is None:
        continue
    if msg.error():
        print("msg error")
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    print(Received message: {}.format(msg.value().decode(utf-8)))
    sleep(1)

c.close()

 

confluent-kafka demo

原文:https://www.cnblogs.com/luckygxf/p/15097091.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!