SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用对象关系映射(ORM)进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据库API执行SQL并获取执行结果。
Dialect用于和数据库API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]

MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html
步骤一:
使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Step 1: send raw SQL strings straight through an Engine.
from sqlalchemy import create_engine

# The engine has to be created before anything can be executed on it.
# URL form: mysql+pymysql://<username>:<password>@<host>/<dbname>
engine = create_engine("mysql+pymysql://root:123456@localhost:3306/test")

# NOTE: Engine.execute() is the legacy API; it was removed in SQLAlchemy 2.0,
# where statements are executed on a Connection instead.

# Values written literally into the SQL text (plain ASCII quotes are required
# by both Python and SQL).
engine.execute(
    "INSERT INTO ts_test (a, b) VALUES ('2', 'v1')"
)

# Positional placeholders; a tuple of tuples supplies one parameter set per row.
engine.execute(
    "INSERT INTO ts_test (a, b) VALUES (%s, %s)",
    ((555, "v1"), (666, "v1"),)
)

# Named placeholders filled from keyword arguments.
engine.execute(
    "INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)",
    id=999, name="v1"
)

result = engine.execute('select * from ts_test')
result.fetchall()
步骤二:
使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 进行数据库操作。Engine使用Schema Type创建一个特定的结构对象,之后通过SQL Expression Language将该对象转换成SQL语句,然后通过 ConnectionPooling 连接数据库,再然后通过 Dialect 执行SQL,并获取结果。
#!/usr/bin/env python
# Step 2: describe tables with the schema types and let SQLAlchemy create them.
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey

# create_all() below needs a bound engine, so create it explicitly.
engine = create_engine("mysql+pymysql://root:123456@localhost:3306/test")

# MetaData is the registry that collects every Table defined against it.
metadata = MetaData()

user = Table(
    'user', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
)

color = Table(
    'color', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
)

# Create the tables registered on `metadata`.
metadata.create_all(engine)
添加(在上述代码后面添加):
# The context manager closes the connection even if execute() raises.
with engine.connect() as conn:
    # Builds the statement: INSERT INTO "user" (id, name) VALUES (:id, :name)
    conn.execute(user.insert(), {'id': 7, 'name': 'seven'})
删除(同上):
# Open a connection for this snippet; it is closed automatically on exit.
with engine.connect() as conn:
    # An insert can also be built with explicit values:
    #   sql = user.insert().values(id=123, name='hetan')
    #   conn.execute(sql)

    # Delete every row whose id is greater than 1.
    sql = user.delete().where(user.c.id > 1)
    conn.execute(sql)
修改(同上):
# Open a connection for this snippet; it is closed automatically on exit.
with engine.connect() as conn:
    # Rename every row whose name is 'hetan' to 'ed'.
    sql = user.update().where(user.c.name == 'hetan').values(name='ed')
    conn.execute(sql)
查询(同上):
# select() is not among the names imported by the table-definition snippet,
# so import it here.
from sqlalchemy import select

# NOTE: select([...]) with a list is the legacy calling style; SQLAlchemy 2.0
# expects select(user) instead.
with engine.connect() as conn:
    sql = select([user, ])
    result = conn.execute(sql)
    print(result.fetchall())
查询语句还有如下:
# sql = select([user.c.id, ])
# sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id)
# sql = select([user.c.name]).order_by(user.c.name)
# sql = select([user]).group_by(user.c.name)
一个完整的例子:
#!/usr/bin/env python
# Complete example: declarative model, table creation and a first commit.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

Base = declarative_base()

# Both create_all() and sessionmaker() below need an engine.
engine = create_engine("mysql+pymysql://root:123456@localhost:3306/test")


class Host(Base):
    """ORM model mapped to the ``hosts`` table."""

    __tablename__ = 'hosts'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)
    ip_addr = Column(String(128), unique=True, nullable=False)
    port = Column(Integer, default=22)


# Create the tables of every model declared on Base.
Base.metadata.create_all(engine)

if __name__ == '__main__':
    SessionCls = sessionmaker(bind=engine)
    session = SessionCls()

    h1 = Host(hostname='localhost', ip_addr='127.0.0.1')
    h2 = Host(hostname='unbuntu', ip_addr='192.168.1.1')

    # Nothing is written permanently until commit().
    session.add_all([h1, h2])
    session.commit()
h3 = Host(hostname='ubuntu2', ip_addr='192.168.2.244', port=20000)
# As long as nothing has been committed, changing the object here is fine.
h3.hostname = 'ubuntu_test'
# h3 has not been added to the session yet, so this rollback does not touch it.
session.rollback()
session.add(h3)
session.commit()
注:SQLAlchemy无法修改表结构,如果需要可以使用SQLAlchemy开发者开源的另外一个软件Alembic来完成。
步骤三:
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。
创建表:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Step 3: define a model class, create its table and open a session.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

Base = declarative_base()

# Both create_all() and sessionmaker() below need an engine.
engine = create_engine("mysql+pymysql://root:123456@localhost:3306/test")


class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))


# Find every subclass of Base and create the matching tables in the database.
Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()
添加:
# Add a single object ...
u = User(id=2, name='sb')
session.add(u)

# ... or several at once.
session.add_all([
    User(id=3, name='sb'),
    User(id=4, name='sb'),
])

# Pending objects are only persisted on commit().
session.commit()
删除:
# Bulk-delete every user whose id is greater than 2.
session.query(User).filter(User.id > 2).delete()
session.commit()
修改:
# filter() alone only builds a Query; assigning an attribute on the Query
# object never reaches the database. Fetch the mapped row first.
res = session.query(User).filter(User.id == 2).first()
if res is not None:  # first() returns None when no row matches
    res.id = 3
    print(res.id)
session.commit()
# First matching row; first() returns None when nothing matches.
ret = session.query(User).filter_by(name='sb').first()
if ret is not None:
    print(ret.id)

# All matching rows, as a list.
ret = session.query(User).filter_by(name='sb').all()
print(ret)
session.commit()

# Further query forms:
# ret = session.query(User).filter(User.name.in_(['sb', 'bb'])).all()
# print(ret)
# ret = session.query(User.name.label('name_label')).all()
# print(ret, type(ret))
# ret = session.query(User).order_by(User.id).all()
# print(ret)
# ret = session.query(User).order_by(User.id)[1:3]
# print(ret)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column ,Integer ,String,ForeignKey,Table
from sqlalchemy.orm import sessionmaker,relationship
from sqlalchemy import create_engine
# Connection URL for the local MySQL test database (pymysql driver).
DATABASE_URL = "mysql+pymysql://root:123456@localhost:3306/test"

# echo=True makes the engine log the SQL it emits.
engine = create_engine(DATABASE_URL, echo=True)
# Base class that all declarative models below inherit from.
Base = declarative_base()
class Host(Base):
    """ORM model for the ``hosts`` table; each host references one group."""

    __tablename__ = 'hosts'

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Foreign key to group.id; the column is nullable, so a host may be
    # created without a group.
    group_id = Column(Integer, ForeignKey('group.id'))
    hostname = Column(String(64), unique=True, nullable=False)
    ip_addr = Column(String(128), unique=True, nullable=False)
    port = Column(Integer, default=22)
    # NOTE(review): with the foreign key on this side, `groups` should resolve
    # to a single Group and the `host` backref on Group to a list of hosts;
    # the names are kept as-is for compatibility.
    groups = relationship('Group',
                          backref='host')

    def __repr__(self):
        return '<id=%s hostname=%s ip_addr=%s>' % (self.id, self.hostname, self.ip_addr)
class Group(Base):
    """ORM model for the ``group`` table."""

    __tablename__ = 'group'

    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=True)

    def __repr__(self):
        return '<id=%s name=%s>' % (self.id, self.name)
# Create the tables for every model declared on Base, then open a session.
Base.metadata.create_all(engine)
SessionCls = sessionmaker(bind=engine)
session = SessionCls()

# Seed four groups.
g1 = Group(name='g1')
g2 = Group(name='g2')
g3 = Group(name='g3')
g4 = Group(name='g4')
session.add_all([g1, g2, g3, g4])

# Seed two hosts, not yet attached to any group.
h1 = Host(hostname='hetan', ip_addr='127.0.0.1')
h2 = Host(hostname='liuyao', ip_addr='10.0.0.1')
session.add_all([h1, h2])
session.commit()

# Attach host 'hetan' to group 'g4' through a bulk UPDATE.
g4 = session.query(Group).filter(Group.name == 'g4').first()
# Query.update() returns the number of matched rows, not a Host, so the
# result is deliberately not assigned back to h1.
session.query(Host).filter(Host.hostname == 'hetan').update({'group_id': g4.id})
session.commit()
#!/usr/bin/env python
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column ,Integer ,String,ForeignKey,Table
from sqlalchemy.orm import sessionmaker,relationship
from sqlalchemy import create_engine
# Connection URL for the local MySQL test database (pymysql driver).
DATABASE_URL = "mysql+pymysql://root:123456@localhost:3306/test"

# echo=True makes the engine log the SQL it emits.
engine = create_engine(DATABASE_URL, echo=True)
# Base class that all declarative models below inherit from.
Base = declarative_base()
# Association table for the many-to-many link between hosts and groups.
# The two foreign keys together form the composite primary key.
host_to_group = Table(
    'host_2_group', Base.metadata,
    Column('host_id', ForeignKey('hosts.id'), primary_key=True),
    Column('group_id', ForeignKey('group.id'), primary_key=True),
)
class Host(Base):
    """ORM model for the ``hosts`` table, linked to groups many-to-many."""

    __tablename__ = 'hosts'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)
    ip_addr = Column(String(128), unique=True, nullable=False)
    port = Column(Integer, default=22)
    # Many-to-many through host_to_group; the backref adds a `host`
    # attribute on Group holding the hosts of that group.
    groups = relationship('Group',
                          secondary=host_to_group,
                          backref='host')

    def __repr__(self):
        return '<id=%s hostname=%s ip_addr=%s>' % (self.id, self.hostname, self.ip_addr)
class Group(Base):
    """ORM model for the ``group`` table."""

    __tablename__ = 'group'

    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=True)

    def __repr__(self):
        return '<id=%s name=%s>' % (self.id, self.name)
# Create the tables (including the association table), then open a session.
Base.metadata.create_all(engine)
SessionCls = sessionmaker(bind=engine)
session = SessionCls()

# Seed four groups and two hosts.
g1 = Group(name='g1')
g2 = Group(name='g2')
g3 = Group(name='g3')
g4 = Group(name='g4')
session.add_all([g1, g2, g3, g4])
h1 = Host(hostname='hetan', ip_addr='127.0.0.1')
h2 = Host(hostname='liuyao', ip_addr='10.0.0.1')
session.add_all([h1, h2])

groups = session.query(Group).all()
hosts = session.query(Host).all()
print(hosts, groups)

h1.groups = groups[1:-1]  # associate h1 with every group except the first and last
g1.host = hosts           # associate g1 with all hosts
# A stray leading "-" here used to negate commit()'s None result and raise
# TypeError; the call must stand on its own.
session.commit()

# Read the associations back through both sides of the relationship.
g1 = session.query(Group).first()
h1 = session.query(Host).first()
print('--->', g1.host)
print('--->', h1.groups)
session.commit()
成功
原文:http://www.cnblogs.com/hetan/p/5331127.html