首页 > 编程语言 > 详细

用kafka作为消息中间件 -实现数据生产和消费的python脚本

时间:2019-08-20 15:31:54      阅读:259      评论:0      收藏:0      [点我收藏+]
kafka实现推送生产数据的tmp_sc.py脚本文件
import socket

import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
#kafka的配置文件:(bootstrap_servers:kafka的集群地址,topic_name:主题,consumer_id:消费分组)
KAFKA_SETTING = {
‘bootstrap_servers‘: ["172.24.181.182:9092", "172.24.181.181:9092", "172.24.181.183:9092"],
‘topic_name‘: ‘user_data‘,
‘consumer_id‘: ‘consumer_ai‘
}
conf= KAFKA_SETTING

print("[setting] =", conf)


producer = KafkaProducer(bootstrap_servers=conf[‘bootstrap_servers‘],
api_version = (0,10),
retries=5)

partitions = producer.partitions_for(conf[‘topic_name‘])
print(‘Topic下分区: %s‘ % partitions)
#需要推送的数据:(推送到kafka的数据类型必须的json类型)
user_data = {
"appToken": "d23ea83dbf7c411aa36e5ab519f41818",
"appId": "JF_WK_001",
"mobile": "15950857927",
"isRealTimeReturn": True,
"applyTime": 15100226057,
"uuid": "a91140f54b898w85d7a50d4b95994",
"customerNo": 1153265851
}

send_data = bytes(json.dumps(user_data), encoding="utf-8")

try:
future = producer.send(conf[‘topic_name‘], send_data)
future.get()
print(‘send message succeed.‘)
except KafkaError as e:
print(‘send message failed. [e] =‘),

kafka实现推送生产数据的tmp_xf.py脚本文件
import socket
from kafka import KafkaConsumer
from kafka.errors import KafkaError


KAFKA_SETTING = {
‘bootstrap_servers‘: ["172.24.181.182:9092", "172.24.181.181:9092", "172.24.181.183:9092"],
‘topic_name‘: ‘result_data‘,
‘topic_name_user‘: ‘user_data‘,
‘consumer_id‘: ‘consumer_ai‘
}

conf = KAFKA_SETTING

consumer = KafkaConsumer(bootstrap_servers=conf[‘bootstrap_servers‘],
group_id=conf[‘consumer_id‘],
api_version = (0,10))

print(‘consumer start to consuming...‘)
consumer.subscribe((conf[‘topic_name‘], ))
from IPython import embed
# embed()

print("consumer = ", consumer)
for message in consumer:
print(message.topic, message.offset, message.key, message.value, message.partition)
 

用kafka作为消息中间件 -实现数据生产和消费的python脚本

原文:https://www.cnblogs.com/DJRemix/p/11383103.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!