1 import json 2 3 from kafka import KafkaProducer 4 5 """ 6 安装模块:pip install kafka-python 7 8 """ 9 class handle_kafka: 10 11 def __init__(self, server_ip=‘ip:port‘): 12 self.producer = KafkaProducer(bootstrap_servers=server_ip) 13 14 def send_message(self, topic, msg_dict): 15 """ 16 :param topic: 埋点打入的topic名称 17 :param msg_dict: 埋点数据 18 """ 19 # 最新版本的库必须使用 bytes 类型进行数据的发送,所以使用 encode()方法进行编码 20 msg_dict = json.dumps(msg_dict).encode() 21 self.producer.send(topic=topic, value=msg_dict) 22 23 def close(self): 24 self.producer.close() 25 26 27 if __name__ == ‘__main__‘: 28 hk = handle_kafka() 29 msg_dict = { 30 "sleep_time": 10, 31 "db_config": { 32 "database": "test_1", 33 "host": "xxxx", 34 "user": "root", 35 "password": "root" 36 }, 37 "table": "msg", 38 "msg": "Hello World" 39 } 40 hk.send_message(‘qa_wisdom_kp_diag‘, msg_dict) 41 hk.close()
原文:https://www.cnblogs.com/test-man-ldy/p/14829518.html