from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

# Declarative base class: every ORM model inherits from it so that its table
# definition is collected on Base.metadata.
Base = declarative_base()


class User(Base):
    """ORM model mapped to the ``user`` table."""

    __tablename__ = "user"

    # Auto-incrementing integer primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # User name, String(32), with an index on the column.
    name = Column(String(32), index=True)
from sqlalchemy import create_engine

# URL layout: mysql+pymysql://<user>:<password>@<host>:<port>/<database>
# Note: "pwd" stands for the password (it can be left out when the database
# account has no password); the trailing "user" is the database name.
engine = create_engine("mysql+pymysql://root:pwd@127.0.0.1:3306/user?charset=utf8")

# The engine (database connection) is now set up.
# Create, in the database behind `engine`, the table of every model that
# inherits from Base -- here the table that corresponds to User.
Base.metadata.create_all(engine)
#############################################################################################################################
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("mysql+pymysql://root:pwd@127.0.0.1:3306/user?charset=utf8")

# Session factory bound to the engine, and one session created from it.
Session = sessionmaker(engine)
db_session = Session()

# Add a single row.
user_obj = User(name="ywb")
db_session.add(user_obj)

# Add several rows in one call.
db_session.add_all([
    User(name="Alex"),
    User(name="Wusir"),
    User(name="Yuan"),
])

# Nothing is persisted until commit() is called.
db_session.commit()
db_session.close()
# Open a session
from sqlalchemy.orm import sessionmaker
from creat_table import engine, User

Session = sessionmaker(engine)
db_session = Session()


# Simple query: fetch every row
user_list = db_session.query(User).all()
for row in user_list:
    print(row.id, row.name)

# Single-row query; first() returns None when there is no row
user = db_session.query(User).first()
if user is not None:
    print(user.id, user.name)


# Queries with conditions
user_list = db_session.query(User).filter(User.id == 4).all()
if user_list:  # all() returns an empty list when nothing matches
    print(user_list[0].id, user_list[0].name)

user = db_session.query(User).filter_by(id=3).first()
if user is not None:
    print(user.id, user.name)

user_list = db_session.query(User).filter(User.id >= 2).all()
for row in user_list:
    print(row.id, row.name)


# Show the generated SQL statement (extra): printing a Query that has not
# been executed renders its SQL text
sql = db_session.query(User).filter(User.id >= 2)
print(sql)
# Update data


# Session factory
from sqlalchemy.orm import sessionmaker
# Engine and model from the table-definition module
from creat_table import engine, User

# Build the session factory
Session = sessionmaker(engine)
# Open a session
db_session = Session()

# Update the matching rows; update() returns the number of rows matched.
# NOTE(review): elsewhere in this file a user is inserted with name "ywb";
# confirm that "ybw" here is not a typo (as written it may match no row).
res = db_session.query(User).filter(User.name == "ybw").update({"name": "哈哈哈"})
print(res)

db_session.commit()
db_session.close()
# Delete data


from sqlalchemy.orm import sessionmaker
from creat_table import engine, User

Session = sessionmaker(engine)
db_session = Session()


# Delete the rows matching the filter; delete() returns the number of rows
# matched
res = db_session.query(User).filter(User.id == 4).delete()
print(res)

db_session.commit()
db_session.close()
# One-to-many relationship


from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship

Base = declarative_base()


class Student(Base):
    """ORM model for the ``student`` table; each student references one school."""

    __tablename__ = "student"
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    # Foreign key pointing at school.id
    school_id = Column(Integer, ForeignKey("school.id"))

    # Student.stu2sch gives the related School; the backref adds the reverse
    # attribute School.sch2stu
    stu2sch = relationship("School", backref="sch2stu")


class School(Base):
    """ORM model for the ``school`` table."""

    __tablename__ = "school"
    id = Column(Integer, primary_key=True)
    name = Column(String(32))


engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/day127?charset=utf8")
# Create the tables for every model registered on Base
Base.metadata.create_all(engine)
# Add data


from sqlalchemy.orm import sessionmaker
from creat_table_ForeignKey import engine, Student, School

Session = sessionmaker(engine)
db_session = Session()

# Add data: the manual way (insert the school, look it up, then insert the
# student with the school's id)
sch_obj = School(name="OldBoyBeijing")
db_session.add(sch_obj)
db_session.commit()
sch = db_session.query(School).filter(School.name == "OldBoyBeijing").first()
stu_obj = Student(name="Yuan", school_id=sch.id)
db_session.add(stu_obj)
db_session.commit()
db_session.close()

# Add data: reverse direction via the relationship (School.sch2stu backref)

sch_obj = School(name="OldBoyShanghai")
sch_obj.sch2stu = [Student(name="江疏影"), Student(name="周冬雨")]
db_session.add(sch_obj)
db_session.commit()
db_session.close()


# Add data: forward direction via the relationship (Student.stu2sch)
stu_obj = Student(name="大黄鸭", stu2sch=School(name="OldBoyShenzhen"))
db_session.add(stu_obj)
db_session.commit()
db_session.close()
# Query data


from sqlalchemy.orm import sessionmaker
from creat_table_ForeignKey import engine, Student, School

Session = sessionmaker(engine)
db_session = Session()

# 1. Forward query via the relationship (Student -> School)
stu = db_session.query(Student).all()
for row in stu:
    # stu2sch is None for a student without a school_id, so guard the access
    school_name = row.stu2sch.name if row.stu2sch is not None else None
    print(row.id, row.name, row.school_id, school_name)


# 2. Reverse query via the relationship (School -> its students)

sch = db_session.query(School).all()
for school in sch:
    for student in school.sch2stu:
        print(school.id, school.name, student.name)
# Update data


from sqlalchemy.orm import sessionmaker
from creat_table_ForeignKey import engine, Student, School

Session = sessionmaker(engine)
db_session = Session()

# Move the student to another school by rewriting its foreign key

sch = db_session.query(School).filter(School.name == "OldBoyShanghai").first()
if sch is not None:  # first() returns None when the school does not exist
    db_session.query(Student).filter(Student.name == "大黄鸭").update({"school_id": sch.id})
    db_session.commit()
db_session.close()
# Delete data


from sqlalchemy.orm import sessionmaker
from creat_table_ForeignKey import engine, Student, School

Session = sessionmaker(engine)
db_session = Session()

# Delete every student that belongs to the given school

sch = db_session.query(School).filter(School.name == "OldBoyShanghai").first()
if sch is not None:  # first() returns None when the school does not exist
    db_session.query(Student).filter(Student.school_id == sch.id).delete()
    db_session.commit()
db_session.close()
# Advanced query operations
# The usual setup
from my_create_table import User, engine
from sqlalchemy.orm import sessionmaker
# text() is imported up front because it is used by several queries below
from sqlalchemy.sql import and_, or_, text

Session = sessionmaker(engine)
db_session = Session()

# Querying the table
# and / or
ret = db_session.query(User).filter(and_(User.id > 3, User.name == 'DragonFire')).all()
ret = db_session.query(User).filter(or_(User.id < 2, User.name == 'DragonFire')).all()

# Fetch all rows
r1 = db_session.query(User).all()

# Select specific columns and give one of them an alias
r2 = db_session.query(User.name.label('username'), User.id).first()
if r2 is not None:  # first() returns None when the table is empty
    print(r2.id, r2.username)  # 15 NBDragon

# Filter with a column expression
r3 = db_session.query(User).filter(User.name == "DragonFire").all()

# Filter with keyword arguments (filter_by)
r4 = db_session.query(User).filter_by(name='DragonFire').all()
r5 = db_session.query(User).filter_by(name='DragonFire').first()

# Filter with a textual SQL fragment and bound parameters, sorted with order_by
r6 = db_session.query(User).filter(text("id<:value and name=:name")).params(value=224, name='DragonFire').order_by(User.id).all()

# Raw SQL query. The table name is written as "user" to match the model's
# __tablename__ (table names can be case-sensitive in MySQL).
r7 = db_session.query(User).from_statement(text("SELECT * FROM user where name=:name")).params(name='DragonFire').all()

# Selecting columns
# Query User.name instead of the whole User ORM object to pick single columns
user_list = db_session.query(User.name).all()
print(user_list)
for row in user_list:
    print(row.name)

# Alias mapping: name AS nick
user_list = db_session.query(User.name.label("nick")).all()
print(user_list)
for row in user_list:
    print(row.nick)  # the alias must be used here

# Forms of filter conditions
user_list = db_session.query(User).filter(User.name == "DragonFire").all()
user = db_session.query(User).filter(User.name == "DragonFire").first()
user = db_session.query(User).filter_by(name="DragonFire").first()
# first() yields a single User (or None), not a list, and a full User object
# has no "nick" alias -- print its name instead of iterating over it
if user is not None:
    print(user.name)

# Complex query: textual condition with bound parameters
user_list = db_session.query(User).filter(text("id<:value and name=:name")).params(value=3, name="DragonFire")

# Full SQL statement: a complete SELECT goes through from_statement(), not
# filter(), and needs its WHERE keyword
user_list = db_session.query(User).from_statement(text("select * from user where id<:value and name=:name")).params(value=3, name="DragonFire")

# Sorting
user_list = db_session.query(User).order_by(User.id).all()
user_list = db_session.query(User).order_by(User.id.desc()).all()
for row in user_list:
    print(row.name, row.id)

# Other query conditions (reference snippets, kept as an inert string)
"""
ret = session.query(User).filter_by(name='DragonFire').all()
ret = session.query(User).filter(User.id > 1, User.name == 'DragonFire').all()
ret = session.query(User).filter(User.id.between(1, 3), User.name == 'DragonFire').all()  # between 1 and 3 (SQL BETWEEN is inclusive)
ret = session.query(User).filter(User.id.in_([1,3,4])).all()  # in_([1,3,4]): only ids equal to 1, 3 or 4
ret = session.query(User).filter(~User.id.in_([1,3,4])).all()  # ~xxxx.in_([1,3,4]): ids other than 1, 3, 4
ret = session.query(User).filter(User.id.in_(session.query(User.id).filter_by(name='DragonFire'))).all()  # subquery
from sqlalchemy import and_, or_
ret = session.query(User).filter(and_(User.id > 3, User.name == 'DragonFire')).all()
ret = session.query(User).filter(or_(User.id < 2, User.name == 'DragonFire')).all()
ret = session.query(User).filter(
    or_(
        User.id < 2,
        and_(User.name == 'eric', User.id > 3),
        User.extra != ""
    )).all()
# select * from User where id<2 or (name="eric" and id>3) or extra != ""

# Wildcards
ret = db_session.query(User).filter(User.name.like('e%')).all()
ret = db_session.query(User).filter(~User.name.like('e%')).all()

# Limit (slice)
ret = db_session.query(User)[1:2]

# Sorting
ret = db_session.query(User).order_by(User.name.desc()).all()
ret = db_session.query(User).order_by(User.name.desc(), User.id.asc()).all()

# Grouping
from sqlalchemy.sql import func

ret = db_session.query(User).group_by(User.extra).all()
ret = db_session.query(
    func.max(User.id),
    func.sum(User.id),
    func.min(User.id)).group_by(User.name).all()

ret = db_session.query(
    func.max(User.id),
    func.sum(User.id),
    func.min(User.id)).group_by(User.name).having(func.min(User.id) >2).all()
"""

# Close the connection
db_session.close()

# orm_select_more  (caption of this listing; presumably its file name)
# Advanced update operations
from my_create_table import User, engine
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(engine)
db_session = Session()

# Direct overwrite
db_session.query(User).filter(User.id > 0).update({"name": "099"})

# Build on the existing value - 1 (string concatenation)
db_session.query(User).filter(User.id > 0).update({User.name: User.name + "099"}, synchronize_session=False)

# Build on the existing value - 2 (numeric increment)
# NOTE(review): assumes my_create_table.User defines an `age` column; the
# User model visible in this file has only id and name -- confirm.
db_session.query(User).filter(User.id > 0).update({"age": User.age + 1}, synchronize_session="evaluate")
db_session.commit()
# Release the session, as the other listings in this file do
db_session.close()

# orm_update_more  (caption of this listing; presumably its file name)
原文:https://www.cnblogs.com/wqzn/p/10385673.html