SQLAlchemy(4)

结果查询

上节课使用query从数据库中查询到了结果,但是query返回的对象是直接可用的吗?

首先导入模块

from connect import session
from user_modules import User

query返回对象

rs = session.query(User)
print(rs)
for i in rs:
     print(i)
#----------------------------------

#根据返回结果来看, rs  是一个 Query 对象,打印出来可以看到转化的 SQL
rs = session.query(User).filter(User.username==‘塔卡‘)
#all 是返回所有符合条件的数据
rs = session.query(User).all() #查询所有数据,返回一个列表
print(rs)
print(rs[0].username) #属性访问
#first 是返回所有符合条件数据的第一条数据
rs = session.query(User).first()  #查询第一条数据
#[0] 和 first 类似,但是如果没有符合条件的数据则会报错
session.query(User).filter(User.username==‘budong‘)[0]
#这里,在 query 中查询对象的某个属性值 ( 对应为查询表中某个字段的值 ),返回的结果不再是一个 Query 对象,而是一个列表
rs = session.query(User.username).filter(User.username == ‘塔卡‘).all()
#同理,但是 first 返回结果是一个元组
rs = session.query(User.username).filter(User.username == ‘塔卡‘).first()
#[0] 和 first 类似,但是如果没有符合条件的数据则会报错
session.query(User.username).filter(User.username==‘budong‘)[0]


#getattr(rs[0], ‘username‘),  rs[0].username这两种方式可以取到具体的数据值
rs = session.query(User).filter(User.username==‘budong‘).all()
print(hasattr(rs,‘username‘))
print(getattr(rs,‘username‘,‘litao‘))


rs = session.query(User)[0:2]  #索引取值
print(rs)
# rs2 = session.query(User).filter(User.username==‘塔卡‘).all()
# print(rs,type(rs))

条件查询一

过滤函数

filter 是一个过滤函数,过滤条件都可以书写在此函数中,不同的条件之间用 逗号 分隔

filter_by 也是一个过滤函数,但是功能要弱一些

filter 和 filter_by 的区别

二者都是 过滤函数,但是使用有如下差别:

1. filter 中需要添加 类对象,filter_by不需要

2. filter_by 中只能添加等于的条件,不能添加 不等于、大于小于等条件,filter没有这个限制

rs = session.query(User.username).filter(User.username == ‘塔卡‘)
rs2 = session.query(User.username).filter_by(username = ‘塔卡‘)
print(rs)

like 和 notlike

like 是模糊查询,和数据库中的 like 用法一样

notlike 和 like 作用相反

rs = session.query(User).filter(User.username.like(‘%塔%‘)).all()
rs = session.query(User).filter(User.username.notlike(‘%塔%‘)).all()

in_  和 notin_

in_ 和 notin_ 是范围查找,参数为列表

rs = session.query(User.username).filter(User.username.in_([‘塔卡‘,‘小泼‘])).all()
rs = session.query(User.username).filter(User.username.notin_([‘塔卡‘,‘小泼‘])).all()

is_ 和 isnot

is_  和 isnot  精确查找

rs = session.query(User).filter(User.username.is_(None)).all()
rs = session.query(User).filter(User.username.isnot(None)).all()
#判断为空还可以使用:
session.query(User.id).filter(User.username==None).all()

查询结果数:限制查询

all、limit、offset、slice、one

rs  =session.query(User.username).all()
print(rs)
rs  =session.query(User.username).limit(2).all()  #限制数量查询
rs  =session.query(User.username).offset(2).all()  #偏移量
rs  =session.query(User.username).slice(1,4).all()  #切片

#不管怎样写one只能查一条数据,如果有多条重复数据,会报错
#sqlalchemy.orm.exc.MultipleResultsFound: Multiple rows were found for one()
rs = session.query(User.username).filter(User.username == ‘李涛‘).one()
print(rs)

排序:倒叙先进行导入desc

from sqlalchemy import desc
rs = session.query(User.username,User.id).order_by(User.id).all() #升序排列
rs = session.query(User.username,User.id).order_by(desc(User.id)).all()

#综合使用
rs = session.query(User.username,User.id).order_by(desc(User.id)).filter(User.username == ‘李涛‘).all()
print(rs)

聚合函数

func.count

使用函数时,需要导入 func, group_by 和 order_by 一样,是可以直接使用的,不需要导入

having 也可以直接使用,使用方法也和 SQL 中使用类似

func.sum、func.max、func.min

extract

extract 提取对象中的数据,这里提取分钟,并把提取出来的结果用 label 命名别名,之后就可以使用 group_by 来分组

count 里面同样可以使用 *

or_

or_ 是或者的意思,和数据库中的 or 一样

# 分组查询与聚合函数一起使用
from sqlalchemy import func,extract,or_
# rs = session.query(User.password,func.count(User.id)).group_by(User.password).all()
# rs = session.query(User.password,func.count(User.id)).group_by(User.password).having(func.count(User.id) >1).all()
# print(rs)

# rs = session.query(User.password,func.sum(User.id)).group_by(User.password).all()
# rs = session.query(User.password,func.max(User.id)).group_by(User.password).all()
# rs = session.query(User.password,func.min(User.id)).group_by(User.password).all()
# print(rs)

#extract
# rs = session.query(extract(‘minute‘,User.create_time).label(‘minute‘),func.count(User.id)).group_by(‘minute‘).all()


#or_
rs = session.query(User.username).filter(or_(User.username.isnot(None),User.password == ‘1234‘)).all()
print(rs)

 3 多表查询

在user_modules.py中增加一张表UserDetails表

class UserDetails(Base):
    __tablename__=‘user_details‘
    id = Column(Integer, primary_key=True, autoincrement=True)
    id_card = Column(Integer,nullable=True,unique=True)
    lost_login = Column(DateTime)
    login_num = Column(Integer,default=0)
    user_id = Column(Integer,ForeignKey(‘user.id‘))

    def __repr__(self):
        return ‘<UserDetails(id=%s,id_card=%s,last_login=%s,login_num=%s,user_id=%s)>‘%(
            self.id,
            self.id_card,
            self.lost_login,
            self.login_num,
            self.user_id
        )

在query_test.py文件中导入新增加的表

from user_modules import User,UserDetails

笛卡尔连接:

rs =session.query(User.username,UserDetails.lost_login).filter(UserDetails.user_id==User.id).all()

去掉.all(),查看原生SQL如下:

SELECT user.id AS user_id, user.username AS user_username, user.password AS user_password, user.create_time AS user_create_time, user._locked AS user__locked, user_details.id AS user_details_id, user_details.id_card AS user_details_id_card, user_details.lost_login AS user_details_lost_login, user_details.login_num AS user_details_login_num, user_details.user_id AS user_details_user_id 
FROM user, user_details 
WHERE user_details.user_id = user.id

inner join:内连接

rs = session.query(User.username,UserDetails.lost_login).    join(UserDetails,UserDetails.user_id == User.id)

查看原生SQL如下:

SELECT user.username AS user_username, user_details.lost_login AS user_details_lost_login 
FROM user INNER JOIN user_details ON user_details.user_id = user.id

###使用笛卡尔积的filter条件与使用inner join运行的结果一致。但运行的原生sql不一致。

left join:外连接,可以将两张表对调

rs = session.query(User.username,UserDetails.lost_login).    outerjoin(UserDetails,UserDetails.user_id==User.id).all()

# print(rs)

运行SQL如下:

SELECT user.username AS user_username, user_details.lost_login AS user_details_lost_login 
FROM user LEFT OUTER JOIN user_details ON user_details.user_id = user.id

运行结果如下:

SQLAlchemy(4)

#两张表对调
rs = session.query(UserDetails.lost_login,User.username).    outerjoin(User,UserDetails.user_id==User.id).all()

# print(rs)

运行SQL如下:

SELECT user_details.lost_login AS user_details_lost_login, user.username AS user_username 
FROM user_details LEFT OUTER JOIN user ON user_details.user_id = user.id

运行结果如下:

SQLAlchemy(4)

union:

 联合查询,有自动去重的功能,对应的还有 union_all

q1 = session.query(User.id)
q2 = session.query(UserDetails.user_id)
# print(q1.union(q2).all())

子表查询

声明子表

sql_0 = session.query(UserDetails.lost_login).subquery() #声明子表#使用
# print(session.query(User,sql_0.c.lost_login))

运行结果如下:

SELECT user.id AS user_id, user.username AS user_username, user.password AS user_password, user.create_time AS user_create_time, user._locked AS user__locked, anon_1.lost_login AS anon_1_lost_login 
FROM user, (SELECT user_details.lost_login AS lost_login 
FROM user_details) AS anon_1

原生SQL查询:

sql_1 = ‘‘‘
    select * from   `user`
‘‘‘
#查询
row = session.execute(sql_1)
#循环取值# for i in row:
#     print(i)
#取值
print(row)
#fetch会记录游标运行了上面的for以后下面就查不到数据
print(row.fetchone())  #查询一条
print(row.fetchmany())  #在多查询一条
print(row.fetchall()) # 差剩下的所有

一对一表关系

Module

需要先创建对应的 Module ,这里采用之前建立好的 User 和 UserDetails

from sqlalchemy.orm import relationship
....
class User(Base):
    .....
class UserDetails(Base):
    ......

    userdetail = relationship(‘User‘,backref=‘details‘,uselist=False,cascade=‘all‘)
    #SQLAlchemy里面用来表示表关系的,一对一
    #relationship只在模型层面上生效,只在模型层面上
    def __repr__(self):           
            ........

自动添加属性

在刚才这里, User 里面本来是没有 details 这个属性的,但是在 UserDetails 里面添加 relationship 之后, User 实例会自动加上 details 属性

relationship

表关系是逻辑上的关系,但是 mysql 中并没有直接说明表关系的东西,外键约束是一个表现形式,外键是一种表之间的约束,可以用来表示这种关系

在SQLAlchemy里面,这个relationship代表了一对多的关系,当然我们可以通过参数改变关系,它默认是一对多的关系,而这个关系是SQLAlchemy里面的,和数据库没有关系,但是relationship是和外键一起使用的。

SQLAlchemy(4)SQLAlchemy(4)

SQLAlchemy(4)

反向查询

row = session.query(User).get(4)
print(row)
print(row.details)  #会把对应的详情表里面的对应的信息打印出来

结果为:

<User(id=4,username=小泼,password=55432,create_time=2019-04-01 23:10:56,_locked=False)>
        
[<UserDetails(id=2,id_card=543,last_login=2019-04-02 23:35:38,login_num=4,user_id=4)>]

正向查询:

row= session.query(UserDetails).get(2)
print(row)
print(row.userdetail)

结果为:

<UserDetails(id=2,id_card=543,last_login=2019-04-02 23:35:38,login_num=4,user_id=4)>

        <User(id=4,username=小泼,password=55432,create_time=2019-04-01 23:10:56,_locked=False)>

使用一对多

relationship 默认是 一对多 关系

uselist

uselist=True

默认是 True ,因此可以省略不写

多对多表关系

用户 和 文章之间,可以是一对多的关系,但是如果用户转载的话,就可以看成是 多对多 关系,那 多对多 关系在 SQLAlchemy 中怎么表示呢?

在Module中创建中间表

from sqlalchemy import Table

user_article = Table(‘user_article‘, Base.metadata,
                     Column(‘user_id‘, Integer, ForeignKey(‘user.id‘), primary_key=True), #联合主键
                     Column(‘article_id‘, Integer, ForeignKey(‘article.id‘), primary_key=True)
                     )

在Module中创建  文章Module

class Article(Base):
    __tablename__ = ‘article‘
    id = Column(Integer,primary_key=True,autoincrement=True)
    content = Column(String(500),nullable=True)
    create_time  = Column(DateTime,default=datetime.now)

    article_user = relationship(‘User‘,backref = ‘article‘,secondary=user_article)

    def __repr__(self):
        return ‘Article(id=%s, content=%s, creat_time=%s)‘ % (
            self.id,
            self.content,
            self.create_time
          )

调用测试:

row = session.query(User).get(4)
print(dir(row))
print(row)
print(row.article)
print(row.details)  #会把对应的详情表里面的对应的信息打印出来

结果展示:

[‘__class__‘, ‘__delattr__‘, ‘__dict__‘, ‘__dir__‘, ‘__doc__‘, ‘__eq__‘, ‘__format__‘, ‘__ge__‘, ‘__getattribute__‘, ‘__gt__‘, ‘__hash__‘, ‘__init__‘, ‘__le__‘, ‘__lt__‘, ‘__mapper__‘, ‘__module__‘, ‘__ne__‘, ‘__new__‘, ‘__reduce__‘, ‘__reduce_ex__‘, ‘__repr__‘, ‘__setattr__‘, ‘__sizeof__‘, ‘__str__‘,‘__subclasshook__‘, ‘__table__‘, ‘__tablename__‘, ‘__weakref__‘, ‘_decl_class_registry‘, ‘_locked‘, ‘_sa_class_manager‘, ‘_sa_instance_state‘, ‘article‘, ‘create_time‘, ‘details‘, ‘id‘, ‘metadata‘, ‘password‘, ‘username‘]

        <User(id=4,username=小泼,password=55432,create_time=2019-04-01 23:10:56,_locked=False)>
        
[Article(id=1, content=litao进阶了, creat_time=2019-04-05 18:51:34), Article(id=2, content=式微走了, creat_time=2019-04-01 18:52:06)]
[<UserDetails(id=2,id_card=543,last_login=2019-04-02 23:35:38,login_num=4,user_id=4)>]

包跟包管理

当把 Module 写好之后,该如何调用呢?

在模块中直接导入:

from data.user_modules import UserDetails

SQLAlchemy(4)

执行06-login.py,报错结果如下:

Traceback (most recent call last):
  File "/home/pyvip/tornado_pro/06-login.py", line 11, in <module>
    from data.user_modules import UserDetails
  File "/home/pyvip/tornado_pro/data/user_modules.py", line 2, in <module>
    from connect import Base
ImportError: No module named ‘connect‘

解决方法:

SQLAlchemy(4)

SQLAlchemy(4)

SQLAlchemy(4)

简单登录

 定义06-login.py 文件

import sys
import tornado.web
import tornado.ioloop
import tornado.httpserver
import tornado.options
from tornado.web import RequestHandler
from tornado.options import define,options
import util.ui_modules
import util.ui_methods
# import time
from data.connect import session
from data.user_modules import UserDetails, User

define(‘port‘,default=8080,help=‘run server‘,type=int)

#
class MainHandler(RequestHandler):
    def get(self):
        self.write(‘请去login‘)


class LoginHandler(RequestHandler):
    def get(self):
        self.render(‘in_out.html‘)

    def post(self):
        ‘‘‘验证逻辑‘‘‘
        user = self.get_argument(‘name‘,None)
        password = self.get_argument(‘password‘,None)
        username = session.query(User).filter(User.username == user).first()print(username)
        if username and  password == username.password:
            self.render(‘02-templates.html‘,
                        username = username.username
                        )
        else:
            self.write(‘登录失败‘)



application = tornado.web.Application(
    handlers=[
        (r‘/‘,MainHandler),
        (r‘/login‘,LoginHandler),
    ],
    debug=True,
    template_path = ‘templates‘,
    static_path=‘static‘,
    # autoescape = None, #全局取消转义
    ui_methods=util.ui_methods,
    ui_modules=util.ui_modules
)
if __name__ == ‘__main__‘:
    tornado.options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.current().start()

渲染模板02-templates.html为

<!DOCTYPE html>
{#去掉整个页面的转义#}
{#{% autoescape None %}#}
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Templates</title>
</head>
<body>
{% if username !=‘‘ %}
    欢迎 {{ username }} 登录
<img src="static/images/01.jpg" width="200px" alt="">
<img src="{{ static_url(‘images/02.webp‘) }}" width="200px" alt="">
{% else %}
    亲,请登录
{% end %}
</body>
</html>

登录成功返回如下内容:

SQLAlchemy(4)

登录失败返回:

SQLAlchemy(4)

将LoginHandler的post方法的代码简化为get_name方法:

class LoginHandler(RequestHandler):
    def get(self):
        self.render(‘in_out.html‘)
    def post(self):
        ‘‘‘验证逻辑‘‘‘
        user = self.get_argument(‘name‘,None)
        password = self.get_argument(‘password‘,None)
        # username = session.query(User).filter(User.username == user).first()
        username = User.get_name(user)
        print(username)
        if username and  password == username.password:
            self.render(‘02-templates.html‘,
                        username = username.username
                        )
        else:
            self.write(‘登录失败‘)

在user_modules.py文件的User Module下定义get_name方法:

class User(Base):
  。。。。
    def __repr__(self):
        return ‘‘‘
        <User(id=%s,username=%s,password=%s,create_time=%s,_locked=%s)>
        ‘‘‘%(self.id,self.username,self.password,self.create_time,self._locked)
    @classmethod
    def get_name(cls,user):
        ‘‘‘用来查询用户名的方法‘‘‘
        return session.query(cls).filter(cls.username == user).first()

相关推荐