类 >>> 数据库的一张表 对象 >>> 表的一条记录 对象点属性 >>> 记录某一个字段对应的值
from orm_singleton.mysql_singleton import Mysql # 表的字段通常需要有的属性:字段名,字段类型,是否是主键,默认值 # 定义一个表类型 class Field(object): def __init__(self, name, column_type, primary_key, default): self.name = name self.column_type = column_type self.primary_key = primary_key self.default = default # 定义一个varchar字段类型 class StringField(Field): def __init__(self, name, column_type=‘varchar(32)‘, primary_key=False, default=None): super().__init__(name, column_type, primary_key, default) # 定义一个int字段类型 class IntegerField(Field): def __init__(self, name, column_type=‘int‘, primary_key=False, default=0): super().__init__(name, column_type, primary_key, default) # 拦截类(表)的创建过程 class MyMetaClass(type): def __new__(cls, class_name, class_bases, class_attrs): # 我们定义的元类是用来拦截模型表的创建过程,而models并不是一张模型表, # 所以不需要它的创建过程 if class_name == ‘Models‘: return type.__new__(cls, class_name, class_bases, class_attrs) # 获取表名,如果没有,把类名作为表名 table_name = class_attrs.get(‘table_name‘, class_name) # 指定这个表的主键是什么,先定义一个空的变量先放着,后面再存primary_key primary_key = None mappings = {} # 下面的for循环需要做两件事 # 拿出所有能够标识这个表字段的所有的k,v # ①想把单个单个的字段整合成一个, # ②通过获取一个一个的字段来判断到底哪一个字段是主键 for k, v in class_attrs.items(): # k:id,name v:IntegerField(),StringField() # 拿出所有自己定义的表的字段属性 if isinstance(v, Field): # 将所有的自己定义的表的字段存入字典中 mappings[k] = v if v.primary_key: # 获取表中的主键 # 健壮性校验一张表不能有多个主键,如果主键存在,报异常 if primary_key: raise TypeError(‘一张表只能有一个主键‘) primary_key = v.name # 循环mapping拿到所有的自定义字段名 for k in mappings.keys(): # 将单个单个的字段删除 class_attrs.pop(k) # 校验用户自定义的模型表是否指定了主键字段,如果没有主键,报异常 if not primary_key: raise TypeError(‘一张表必须要有主键‘) # 将标示表的特征信息 表名,表的主键字段,表的其他字段都塞到类的名称空间中 class_attrs[‘table_name‘] = table_name class_attrs[‘primary_key‘] = primary_key class_attrs[‘mappings‘] = mappings return type.__new__(cls, class_name, class_bases, class_attrs) # 正常创建就可以了,Models可以理解为辅助类,其实是为模型表添加功能的, # 用模型表继承Models, 做到无论怎么传值都可以实例化对象,又有点的方式获取或者修改 # 它不是一张表,就不用去改它的创建过程,正常创建就可以了 class Models(dict, metaclass=MyMetaClass): def __init__(self, **kwargs): super().__init__(**kwargs) def __getattr__(self, item): return self.get(item, ‘没有该键‘) def __setattr__(self, key, value): self[key] = value # 把select定义成一个类方法,通过类直接调用Teacher.select() @classmethod def select(cls, **kwargs): ms = Mysql.singleton() # select * from %s if not kwargs: sql = ‘select *from %s‘ % cls.table_name res = ms.select(sql) else: k = list(kwargs.keys())[0] v = kwargs.get(k) sql = ‘select *from %s where %s=?‘ % (cls.table_name, k) # select * from user where id=? sql = sql.replace(‘?‘, ‘%s‘) # select * from user where id=%s res = ms.select(sql, v) if res: # res = [{},{},{}] # cls(name=‘...‘,password=‘...‘) return [cls(**r) for r in res] # [obj1,obj2,obj3] def update(self): ms = Mysql.singleton() # update user set name=‘jason‘, password=‘123‘ where id=1 # 定义一个列表,储存表的所有字段名 fields = [] # 当前数据对象的主键值 pr = None # 定义一个列表,储存表的所有字段名的值 values = [] for k, v in self.mappings.items(): # 拿到主键值 if v.primary_key: pr = getattr(self, v.name, v.default) else: # 除了主键之外的所有的字段名和字段值 fields.append(v.name+‘=?‘) values.append(getattr(self, v.name, v.default)) # ‘update user set name=?, password=? where id=1‘ sql = ‘update %s set %s where %s=%s‘ % (self.table_name, ‘,‘.join(fields), self.primary_key, pr) # ‘update user set name=%s, password=%s where id=1‘ sql = sql.replace(‘?‘, ‘%s‘) ms.execute(sql, values) def save(self): ms = Mysql.singleton() # insert into user(name, password) values(‘jason‘, ‘123‘) # 定义一个列表,储存表的所有字段名 fields = [] # 储存与字段对应数量的?,用来计算有多少个? args = [] # 定义一个列表,储存表的所有字段名的值 values = [] for k, v in self.mappings.items(): # 将id字段去除,因为id字段是自增的,不需要人为的去操作 if not v.primary_key: # 获取字段名,?和字段值 fields.append(v.name) args.append(‘?‘) values.append(getattr(self, v.name, v.default)) # insert into user(name, password) values(?, ?) sql = ‘insert into %s(%s) values(%s)‘ % (self.table_name, ‘,‘.join(fields), ‘,‘.join(args)) # insert into user(name, password) values(%s, %s) sql = sql.replace(‘?‘, ‘%s‘) ms.execute(sql, values) if __name__ == ‘__main__‘: # 创建一个表(类),该表有表名,字段tid和tname class Teacher(Models): table_name = ‘teacher‘ tid = IntegerField(name=‘tid‘, primary_key=True) tname = StringField(name=‘tname‘) # 查 res1 = Teacher.select(tname=‘李杰老师‘) res2 = res1[0] print(res2) # 改 res2.tname = ‘八门凯‘ res2.update() # 增 res3 = Teacher(tname=‘zhang‘) res3.save()
import pymysql class Mysql(object): _instance = None # 单例 def __init__(self): self.conn = pymysql.connect( host=‘127.0.0.1‘, port=3306, user=‘root‘, password=‘3822515‘, database=‘day41‘, charset=‘utf8‘, autocommit=True ) self.cursor = self.conn.cursor(pymysql.cursors.DictCursor) def close_db(self): # 关闭链接 self.cursor.close() self.conn.close() def select(self, sql, args=None): # args设置默认值,只是一个占位符,可以传参也可以不传参 # select * from user # select * from user where id = 1 self.cursor.execute(sql, args) res = self.cursor.fetchall() # fetchall拿到的数据结构是一个列表套字典[{},{},{}] return res def execute(self, sql, args): # 保存和更新操作 # args必须有参数了,真正执行sql语句,需要传values # insert into user(name, password) values(‘jason‘, ‘123‘) # update user set name=‘jason‘, password=‘123‘ where id = 1 try: self.cursor.execute(sql, args) except BaseException as e: print(e) # 定义成一个类方法,由类来调用Mysql.singleton() @classmethod def singleton(cls): if not cls._instance: cls._instance = cls() return cls._instance
from orm_singleton.mysql_singleton import Mysql # 表的字段通常需要有的属性:字段名,字段类型,是否是主键,默认值 # 定义一个表类型 class Field(object): def __init__(self, name, column_type, primary_key, default): self.name = name self.column_type = column_type self.primary_key = primary_key self.default = default # 定义一个varchar字段类型 class StringField(Field): def __init__(self, name, column_type=‘varchar(32)‘, primary_key=False, default=None): super().__init__(name, column_type, primary_key, default) # 定义一个int字段类型 class IntegerField(Field): def __init__(self, name, column_type=‘int‘, primary_key=False, default=0): super().__init__(name, column_type, primary_key, default) # 拦截类(表)的创建过程 class MyMetaClass(type): def __new__(cls, class_name, class_bases, class_attrs): # 我们定义的元类是用来拦截模型表的创建过程,而models并不是一张模型表, # 所以不需要它的创建过程 if class_name == ‘Models‘: return type.__new__(cls, class_name, class_bases, class_attrs) # 获取表名,如果没有,把类名作为表名 table_name = class_attrs.get(‘table_name‘, class_name) # 指定这个表的主键是什么,先定义一个空的变量先放着,后面再存primary_key primary_key = None mappings = {} # 下面的for循环需要做两件事 # 拿出所有能够标识这个表字段的所有的k,v # ①想把单个单个的字段整合成一个, # ②通过获取一个一个的字段来判断到底哪一个字段是主键 for k, v in class_attrs.items(): # k:id,name v:IntegerField(),StringField() # 拿出所有自己定义的表的字段属性 if isinstance(v, Field): # 将所有的自己定义的表的字段存入字典中 mappings[k] = v if v.primary_key: # 获取表中的主键 # 健壮性校验一张表不能有多个主键,如果主键存在,报异常 if primary_key: raise TypeError(‘一张表只能有一个主键‘) primary_key = v.name # 循环mapping拿到所有的自定义字段名 for k in mappings.keys(): # 将单个单个的字段删除 class_attrs.pop(k) # 校验用户自定义的模型表是否指定了主键字段,如果没有主键,报异常 if not primary_key: raise TypeError(‘一张表必须要有主键‘) # 将标示表的特征信息 表名,表的主键字段,表的其他字段都塞到类的名称空间中 class_attrs[‘table_name‘] = table_name class_attrs[‘primary_key‘] = primary_key class_attrs[‘mappings‘] = mappings return type.__new__(cls, class_name, class_bases, class_attrs) # 正常创建就可以了,Models可以理解为辅助类,其实是为模型表添加功能的, # 用模型表继承Models, 做到无论怎么传值都可以实例化对象,又有点的方式获取或者修改 # 它不是一张表,就不用去改它的创建过程,正常创建就可以了 class Models(dict, metaclass=MyMetaClass): def __init__(self, **kwargs): super().__init__(**kwargs) def __getattr__(self, item): return self.get(item, ‘没有该键‘) def __setattr__(self, key, value): self[key] = value # 把select定义成一个类方法,通过类直接调用Teacher.select() @classmethod def select(cls, **kwargs): ms = Mysql() # select * from %s if not kwargs: sql = ‘select *from %s‘ % cls.table_name res = ms.select(sql) else: k = list(kwargs.keys())[0] v = kwargs.get(k) sql = ‘select *from %s where %s=?‘ % (cls.table_name, k) # select * from user where id=? sql = sql.replace(‘?‘, ‘%s‘) # select * from user where id=%s res = ms.select(sql, v) if res: # res = [{},{},{}] # cls(name=‘...‘,password=‘...‘) return [cls(**r) for r in res] # [obj1,obj2,obj3] def update(self): ms = Mysql() # update user set name=‘jason‘, password=‘123‘ where id=1 # 定义一个列表,储存表的所有字段名 fields = [] # 当前数据对象的主键值 pr = None # 定义一个列表,储存表的所有字段名的值 values = [] for k, v in self.mappings.items(): # 拿到主键值 if v.primary_key: pr = getattr(self, v.name, v.default) else: # 除了主键之外的所有的字段名和字段值 fields.append(v.name+‘=?‘) values.append(getattr(self, v.name, v.default)) # ‘update user set name=?, password=? where id=1‘ sql = ‘update %s set %s where %s=%s‘ % (self.table_name, ‘,‘.join(fields), self.primary_key, pr) # ‘update user set name=%s, password=%s where id=1‘ sql = sql.replace(‘?‘, ‘%s‘) ms.execute(sql, values) def save(self): ms = Mysql() # insert into user(name, password) values(‘jason‘, ‘123‘) # 定义一个列表,储存表的所有字段名 fields = [] # 储存与字段对应数量的?,用来计算有多少个? args = [] # 定义一个列表,储存表的所有字段名的值 values = [] for k, v in self.mappings.items(): # 将id字段去除,因为id字段是自增的,不需要人为的去操作 if not v.primary_key: # 获取字段名,?和字段值 fields.append(v.name) args.append(‘?‘) values.append(getattr(self, v.name, v.default)) # insert into user(name, password) values(?, ?) sql = ‘insert into %s(%s) values(%s)‘ % (self.table_name, ‘,‘.join(fields), ‘,‘.join(args)) # insert into user(name, password) values(%s, %s) sql = sql.replace(‘?‘, ‘%s‘) ms.execute(sql, values) if __name__ == ‘__main__‘: # 创建一个表(类),该表有表名,字段tid和tname class Teacher(Models): table_name = ‘teacher‘ tid = IntegerField(name=‘tid‘, primary_key=True) tname = StringField(name=‘tname‘) # 查 res1 = Teacher.select(tname=‘李杰老师‘) res2 = res1[0] print(res2) # 改 res2.tname = ‘八门凯‘ res2.update() # 增 res3 = Teacher(tname=‘zhang‘) res3.save()
import pymysql from orm_pool.db_pool import POOL class Mysql(object): def __init__(self): self.conn = POOL.connection() self.cursor = self.conn.cursor(pymysql.cursors.DictCursor) def close_db(self): # 关闭链接 self.cursor.close() self.conn.close() def select(self, sql, args=None): self.cursor.execute(sql, args) res = self.cursor.fetchall() # fetchall拿到的数据结构是一个列表套字典[{},{},{}] return res def execute(self, sql, args): try: self.cursor.execute(sql, args) except BaseException as e: print(e)
from DBUtils.PooledDB import PooledDB import pymysql POOL = PooledDB( # 使用链接数据库的模块 creator=pymysql, # 连接池允许的最大连接数,0和None表示不限制连接数 maxconnections=6, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 mincached=2, # 链接池中最多闲置的链接,0和None不限制 maxcached=5, # 链接池中最多共享的链接数量,0和None表示全部共享。 # PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少, # _maxcached永远为0,所以永远是所有链接都共享。 maxshared=3, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 blocking=True, # 一个链接最多被重复使用的次数,None表示无限制 maxusage=None, # 开始会话前执行的命令列表。 如:["set datestyle to ...", "set time zone ..."] setsession=[], # ping MySQL服务端,检查是否服务可用。如:0 = None = never, 1 = default = whenever it is requested, # 2 = when a cursor is created, 4 = when a query is executed, 7 = always ping=0, host=‘127.0.0.1‘, port=3306, user=‘root‘, password=‘3822515‘, database=‘day41‘, charset=‘utf8‘, autocommit=‘True‘ )
原文:https://www.cnblogs.com/zhangguosheng1121/p/10902618.html