python版本的zabbix_sender:
使用方法:
1、导入 : from zbx_send import info
2、实例化: test=info()
3、支持方法:
添加信息: add_data("主机名", 'Key_', "报警内容"),可以添加多次
例: test.add_data("cluster", 'cluster.core.waring', "alert content")
test.add_data("cluster", 'cluster.core.waring', "alert content")
查看已有信息:
例: test.echo_data() 逐条打印每个字典(方法本身没有返回值)
{'host': 'cluster', 'value': 'alert content', 'key': 'cluster.core.waring', 'clock': 1533880388}
{'host': 'cluster', 'value': 'alert content', 'key': 'cluster.core.waring', 'clock': 1533880388}
print test 返回列表
[{'host': 'cluster', 'value': 'alert content', 'key': 'cluster.core.waring', 'clock': 1533880444}, {'host': 'cluster', 'value': 'alert content', 'key': 'cluster.core.waring', 'clock': 1533880444}]
删除内容: print test.delete_element(test[-1]) 返回Boolean
清空内容: print test.clear_data() 返回Boolean
修改内容: test[0] == test[1]
4、发送数据:data=test.send_format()
print test.send_data(data)
成功返回值: {"response":"success","info":"processed: 2; failed: 0; total: 2; seconds spent: 0.000036"}
失败返回值: {"response":"success","info":"processed: 0; failed: 2; total: 2; seconds spent: 0.000036"}
class info(object):
    """Collect item values and push them to a Zabbix server.

    Values are queued with :meth:`add_data`, serialised with
    :meth:`send_format` and transmitted with :meth:`send_data`, which speaks
    the ``ZBXD`` framed protocol over a plain TCP socket.

    The instance behaves like a read-only sequence of the queued entries:
    ``len(obj)``, ``obj[i]`` and ``repr(obj)`` all operate on ``self.data``.

    NOTE(review): ``__init__`` relies on ``get_server_ip`` and ``PROXY_IP``,
    which are expected to be provided by the surrounding module.
    """

    # Protocol marker "ZBXD" followed by the flags byte 0x01.
    _SIGNATURE = b'ZBXD\x01'

    def __init__(self):
        """Resolve the target server and start with an empty queue."""
        self.server = get_server_ip()
        # Hosts listed in PROXY_IP are redirected to a fixed address.
        if self.server in PROXY_IP:
            self.server = '106.3.144.10'
        self.port = 31351
        # Kept only for backward compatibility with code that reads this
        # attribute; packets are now assembled as bytes in _build_packet().
        self.header = 'ZBXD\1{0}{1}'
        self.data = []

    def __len__(self):
        """Return the number of queued entries."""
        return len(self.data)

    def __repr__(self):
        """Return the textual form of the queued entries (a list of dicts)."""
        return "{0}".format(self.data)

    def __getitem__(self, index):
        """Return the queued entry (or slice of entries) at ``index``."""
        return self.data[index]

    def __clock(self):
        """Return the current Unix timestamp truncated to whole seconds."""
        return int(time.time())

    def create_obj(self, host, key, value, clock):
        """Build one item-value record.

        :param host: host name the value belongs to
        :param key: item key
        :param value: value to report
        :param clock: Unix timestamp; any falsy value (``None``, ``0``)
            means "use the current time"
        :returns: dict with the keys ``host``, ``key``, ``value``, ``clock``
        """
        return {
            "host": host,
            "key": key,
            "value": value,
            "clock": clock if clock else self.__clock(),
        }

    def add_data(self, host, key, value, clock=None):
        """Append a new record to the queue.

        Parameters have the same meaning as in :meth:`create_obj`.
        Returns ``None``.
        """
        self.data.append(self.create_obj(host, key, value, clock))

    def echo_data(self):
        """Print every queued record on its own line. Returns ``None``."""
        for elem in self.data:
            # Single-argument call form works on both Python 2 and Python 3.
            print(str(elem))

    @property
    def get_data(self):
        """Shallow copy of the queued records (the list is new, the dicts are shared)."""
        return list(self.data)

    def delete_element(self, element):
        """Remove the first record equal to ``element``.

        :returns: ``True`` when a record was removed, ``False`` when no
            matching record is queued
        """
        try:
            self.data.remove(element)
        except ValueError:
            return False
        return True

    def clear_data(self):
        """Drop all queued records by binding a fresh list. Always returns ``True``."""
        self.data = []
        return True

    def send_format(self):
        """Serialise the queue into the JSON request understood by the server.

        :returns: JSON text of ``{"request": "sender data", "data": [...]}``
        """
        sender_data = {
            "request": "sender data",
            "data": self.data,
        }
        return json.dumps(sender_data)

    def _build_packet(self, data):
        """Frame ``data`` as signature + length header + payload, all bytes."""
        if not isinstance(data, bytes):
            # The length field counts bytes on the wire, so encode first.
            data = data.encode('utf-8')
        # 4-byte little-endian payload length followed by 4 reserved zero
        # bytes; explicit '<' keeps the layout independent of the host CPU.
        return self._SIGNATURE + struct.pack('<II', len(data), 0) + data

    @staticmethod
    def _recv_exact(sock, size):
        """Read exactly ``size`` bytes from ``sock``.

        A single ``recv`` may return fewer bytes than requested, so keep
        reading until the requested amount has arrived.

        :raises ValueError: if the peer closes the connection early
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                raise ValueError(
                    'connection closed before the full response was received')
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def send_data(self, data, timeout=None):
        """Send a serialised request to the server and return its reply.

        :param data: request body, normally the result of :meth:`send_format`
            (text is encoded as UTF-8, bytes are sent unchanged)
        :param timeout: optional socket timeout in seconds; ``None`` keeps
            the socket's default blocking behaviour
        :returns: response body as ``str`` (JSON text, not parsed)
        :raises ValueError: if the reply does not start with the expected
            signature or is truncated
        """
        packet = self._build_packet(data)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            if timeout is not None:
                sock.settimeout(timeout)
            sock.connect((self.server, self.port))
            # sendall() retries until the whole packet has been written.
            sock.sendall(packet)
            if self._recv_exact(sock, 5) != self._SIGNATURE:
                raise ValueError('无效的响应数据')
            length_field = self._recv_exact(sock, 8)
            # Only the first four bytes carry the body length.
            response_len = struct.unpack('<I', length_field[:4])[0]
            response_raw = self._recv_exact(sock, response_len)
        finally:
            # Release the socket even when connecting or reading fails.
            sock.close()
        # Hand back the native ``str`` type on both Python 2 and Python 3.
        if isinstance(response_raw, str):
            return response_raw
        return response_raw.decode('utf-8')
原文:https://www.cnblogs.com/flashBoxer/p/9475949.html