一次使用 SQLAlchemy 实现分类以及计数的业务过程
在编写业务逻辑代码的时候, 我不幸遇到下面的表结构(已经将主要逻辑抽离出来了):
class Category(Model): __tablename__ = 'category' # 分类ID id = Column(Integer, primary_key=True, autoincrement=True) # 分类名称 name = Column(String(length=255)) class Product(Model): __tablename__ = 'product' # 产品 ID id = Column(Integer, primary_key=True, autoincrement=True) # 产品名称 name = Column(String(length=255)) # 分类 ID category_id = Column(Integer)
现在需要实现的业务是返回分类的列表结果:
[ { "id": 1, "name": "分类1", "product_count": 1 }, ... ]
这是一个一对多的模型.
一般的笨拙思路就是:
data = [] categorys = Category.query.all() for category in categorys: product_count = len(Product.query.filter(Product.category_id == category.id).all()) data.append({ 'id': category.id, 'name': category.name, 'product_count': product_count })
明眼人一看就知道可以把len(Product.query.filter(Product.category_id == category.id).all())
换成:
product_count = Product.query.filter(Product.category_id == category.id).count()
但是, 根据这篇文章:[Why is SQLAlchemy count() much slower than the raw query?
](https://stackoverflow.com/que... 似乎这样写会有更好的性能:
from sqlalchemy import func session.query(func.count(Product.id)).filter(Product.category_id == category.id).scalar()
但是, 稍微有点经验的人就会对上面的写法嗤之以鼻, 因为product_count
是放在for category in categorys:
里面的, 这意味着如果categorys
有成千上万个, 就要发出成千上万个session.query()
, 而数据库请求是在网络上的消耗, 请求时间相对较长, 有的数据库没有处理好连接池, 建立连接和断开连接又是一笔巨大的开销, 所以 query 的请求应该越少越好. 像上面这样把 query 放到 for 循环中显然是不明智的选择.
于是有了下面一个请求的版本:
result = db.session.query(Product, Category) \ .filter(Product.category_id == Category.id)\ .order_by(Category.id).all() id_list = [] data = [] for product, category in result: if category and product: if category.id not in id_list: id_list.append(category.id) data.append({ 'id': category.id, 'name': category.name, 'product_count': 0 }) idx = id_list.index(category.id) data[idx]['product_count'] += 1
这样的写法十分难看, 而且同样没有合理利用 SQLAlchemy 的 count 函数. 于是改成:
product_count = func.count(Product.id).label('count') results = session.query(Category, product_count) \ .join(Product, Product.category_id == Category.id) \ .group_by(Category).all() data = [ { 'id': category.id, 'name': category.name, 'product_count': porduct_count } for category, product_count in results]
不过这里还有一个问题, 就是如果先添加一个Category
, 而属于这个Category
下没有Product
, 那么这个Category
就不会出现在data
里面, 所以join
必须改成outerjoin
. 即:
results = session.query(Category, product_count) \ .outerjoin(Product, Product.category_id == Category.id) \ .group_by(Category).all()
需求又来了!!!
现在考虑设计Product
为伪删除模式, 即添加一个is_deleted
属性判断Product
是否被删除.
那么count
函数就不能简单地count(Product.id)
, 而是要同时判断Product.is_deleted
是否为真和Product
是否为None
, 经过悉心研究, 发现使用func.nullif
可以实现这个需求,即用下面的写法:
product_count = func.count(func.nullif(Product.is_deleted.is_(False), False)).label('count') results = session.query(Category, product_count) \ .join(Product, Product.category_id == Category.id) \ .group_by(Category).all() data = [ { 'id': category.id, 'name': category.name, 'product_count': porduct_count } for category, product_count in results]
可见使用 ORM 有的时候还是需要考虑很多东西.