代码:
from functools import wraps import mysql.connector from sshtunnel import SSHTunnelForwarder def singleton(cls): instances = {} @wraps(cls) def get_instance(*args, **kw): if cls not in instances: instances[cls] = cls(*args, **kw) return instances[cls] return get_instance # 数据库连接实例 @singleton class MySQLSingle(object): def __init__(self, conn=‘‘, server=‘‘): self.conn = conn self.server = server def get_conn(self, host_jump, port_jump, ssh_pk_jump, user_name_jump, host_mysql, port_mysql, user_name_mysql, password_mysql, database): try: self.server = SSHTunnelForwarder( (host_jump, int(port_jump)), # 跳板机的配置 ssh_pkey=ssh_pk_jump, ssh_username=user_name_jump, remote_bind_address=(host_mysql, int(port_mysql))) # 数据库服务器的配置 self.server.start() self.conn = mysql.connector.connect(host=‘127.0.0.1‘, port=self.server.local_bind_port, user=user_name_mysql, password=password_mysql, database=database) except Exception as e: print(‘File to connect database: %s‘ % e) return self.conn, self.server
使用方法如下:
mysql_single = MySQLSingle() conn, server = mysql_single.get_conn(host_jump, port_jump, ssh_pk_jump, user_name_jump, host_mysql, port_mysql, user_name_mysql, password_mysql, database) # do something... conn.close() server.close()
原文:https://www.cnblogs.com/gide/p/9654945.html