Для асинхронного Python существует мало полноценных ORM, и им далеко до таких монстров-комбайнов, как DjangoOrm и SQLAlchemy.ORM. Бедность ORM-инструментария для асинхронного программирования заставила многих программистов отказаться от зачастую непонятной им работы с ORM и перейти к более прозрачному взаимодействию с БД. Решение в лоб — написание raw SQL, но в этом случае запросы не будут защищены от инъекций, а запросы, составляемые по бизнес логике с опциональными параметрами, превратятся в конкатенацию строк. Важно найти баланс между прозрачностью выполнения кода, скоростью его написания и читаемостью.
Ниже я предлагаю реализацию такого баланса c использованием SQLAlchemy Core.
Прежде чем рассматривать работу SQLAlchemy Core, для расширения инженерного кругозора давайте познакомимся с альтернативными решениями и их недостатками.
Алхимия состоит из двух частей. Первая — это абстракция над SQL-базой данных, которая называется SQLAlchemy Сore. Вторая — это ORM, собственно mapping между реляционной БД и объектным представлением. Соответственно, SQLAlchemy Сore почти один к одному совпадает с SQL — если вы знаете последний, то проблем с Core, как правило, не возникает. Благодаря этому использование SQLAlchemy Сore имеет наименьший оверхэд при обращении к БД.
SQLAlchemy Core предоставляет всю основную функциональность для построения запросов. Но в крупном проекте при обращении к базовому API SQLAlchemy будет много дублирующихся участков, поэтому логично написать библиотеку-обёртку для реализации наиболее популярных сценариев использования.
Примеры основных запросов к Алхимии:
tbl = cls.__table__
select_sql = select([tbl]).where(tbl.c.user_type == user_type)
cursor = await conn.execute(select_sql)
rows = await cursor.fetchall()
tbl = cls.__table__
insert_sql = tbl.insert().values(val='abc').returning(*tbl.c)
cursor = await conn.execute(insert_sql)
row = await cursor.fetchone()
Как видите, участки кода с передачей параметров повторяются, а сам код содержит много параметров, связанных с внутренней реализацией SQLAlchemy Core, что ухудшает читаемость. Хотелось бы получать данные из БД с помощью таких запросов:
user = await User.select().where(User.user_type == user_type).get(conn)
_ = await User.insert().values(val='abc').execute(conn)
Чтобы этого добиться, необходимо реализовать обработчики для операций insert, select, update и delete.
Напишем для функции select полноценную обёртку, которая даст возможность с минимумом кода использовать все виды join, сортировки и остальные часто используемые операции:
class SelectQuery:
def __init__(self, model, fields):
self._model = model
self._from = None
_fields = []
self._relations = {field.class_ if isinstance(field, QueryableAttribute) else field for field in fields}
if model in self._relations:
self._relations.remove(model)
for field in fields:
if isinstance(field, QueryableAttribute):
_fields.append(getattr(field.class_.__table__.c, field.key))
elif isinstance(field, sa.Column):
_fields.append(field)
elif hasattr(field, '__table__'):
for f in field.__table__.c:
_fields.append(f)
else:
_fields.append(field)
self._stmt = sa.select([model, *self._relations]).with_only_columns(_fields)
def join(self, right, on):
if self._from is None:
self._from = sa.join(self._model, right, on)
else:
self._from = self._from.join(right, on)
return self
def outerjoin(self, right, on):
if self._from is None:
self._from = sa.outerjoin(self._model, right, on)
else:
self._from = self._from.outerjoin(right, on)
return self
def where(self, condition):
self._stmt = self._stmt.where(condition)
return self
def group_by(self, column):
self._stmt = self._stmt.group_by(column)
return self
def order_by(self, *fields):
self._stmt = self._stmt.order_by(*fields)
return self
def distinct(self, *fields):
self._stmt = self._stmt.distinct()
return self
def limit(self, limit):
self._stmt = self._stmt.limit(limit)
return self
def offset(self, offset):
self._stmt = self._stmt.offset(offset)
return self
async def get(self, conn):
if self._from is not None:
self._stmt = self._stmt.select_from(self._from)
result = await conn.execute(self._stmt)
result = await result.first()
return result
async def all(self, conn):
if self._from is not None:
self._stmt = self._stmt.select_from(self._from)
result = await conn.execute(self._stmt)
return await result.fetchall()
def get_query(self):
if self._from is not None:
return self._stmt.select_from(self._from)
return self._stmt
@property
def raw_sql(self):
stmt = self._stmt
if self._from is not None:
stmt = stmt.select_from(self._from)
return stmt.compile(compile_kwargs={'literal_binds': True})
def _create_model(self, model_class, data):
model_fields = {
getattr(field, 'name'): data[str(field)] for field in model_class.__table__.c if str(field) in data
}
model = model_class(**model_fields)
for relation in model_class.__mapper__.relationships:
if relation.mapper.class_ in self._relations:
relation_model = self._create_model(relation.mapper.class_, data)
setattr(model, relation.key, relation_model)
return model
Также специфическими операциями обладают запросы на удаление:
class DeleteQuery:
def __init__(self, model, stmt):
self._model = model
self._stmt = stmt
def where(self, condition):
self._stmt = self._stmt.where(condition)
return self
def returning(self, *cols):
self._stmt = self._stmt.returning(*cols)
return self
async def execute(self, conn):
return await conn.execute(self._stmt)
def get_query(self):
return self._stmt
Для insert и update подойдёт одинаковый набор операций:
class StatementQuery:
def __init__(self, model, stmt):
self._model = model
self._stmt = stmt
self._values = None
def values(self, **values):
self._values = self._filter_values(values)
return self
def where(self, condition):
self._stmt = self._stmt.where(condition)
return self
def returning(self, *cols):
self._stmt = self._stmt.returning(*cols)
return self
async def execute(self, conn):
self._values = self._filter_values(self._values)
if not self._values:
return
self._stmt = self._stmt.values(self._values)
result = await conn.execute(self._stmt)
return result
def get_query(self):
values = self._filter_values(self._values)
return self._stmt.values(values)
def _filter_values(self, values):
return {
f.key: values[f.key] for f in self._model.__table__.c if f.key in values
}
Обернём описанный выше код в класс, от которого можно унаследовать модели SQLAlchemy:
class DB:
@classmethod
def select(cls, fields=['*']):
return SelectQuery(cls, fields)
@classmethod
def update(cls):
return StatementQuery(cls, sa.update(cls))
@classmethod
def insert(cls):
return StatementQuery(cls, sa.insert(cls))
@classmethod
def delete(cls):
return DeleteQuery(cls, sa.delete(cls))
@staticmethod
def before_save(data):
pass
@staticmethod
def after_save(data):
pass
@classmethod
def model_fields(cls):
return list(filter(lambda x: not x.startswith('_'), dir(cls)))
Примеры использования:
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base, DB):
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
login = Column(String(255))
active = Column(Boolean, default=True)
@staticmethod
async def get_by_login(login, conn):
user = await User.select() \
.where(User.login == login) \
.get(conn)
return user
@staticmethod
async def get_all_active(conn):
users = await User.select() \
.where(User.active.is_(True)) \
.all(conn)
return users
@staticmethod
async def create_user(login, conn):
_ = await User.insert().values(
login=login
).execute(conn)
return True
@staticmethod
async def delete_user(login, conn):
_ = await User.update().values(
active=False
).execute(conn)
return True
Источник статьи: https://habr.com/ru/company/domclick/blog/557638/
Ниже я предлагаю реализацию такого баланса c использованием SQLAlchemy Core.
Сравнение с остальными ORM
Прежде чем рассматривать работу SQLAlchemy Core, для расширения инженерного кругозора давайте познакомимся с альтернативными решениями и их недостатками.
- Django ORM и ponyORM не подходят из-за невозможности работать в асинхронном режиме.
- SQLAlchemy.ORM: поддержку полноценной асинхронной ORM в альфа-режиме добавили в версии 1.4, пока не подходит для production-решений.
- Peewee: умеет работать в асинхронном режиме, но только с Core-функциями. Хорошая библиотека, но я предлагаю выбрать SQLAlchemy из-за её большей популярности и поддерживаемости. Также были жалобы на ленивую загрузку объектов в Peewee — иногда она намертво вешает цикл (возможно, эту ошибку уже поправили в новых версиях).
- Gino: простая ORM на основе SQLAlchemy Core. Работать с этой библиотекой можно без знаний об SQLAlchemy.
- tortoise-orm: молодой проект с похожим на Django ORM способом доступа к данным.
Работа с SQLAlchemy Core
Алхимия состоит из двух частей. Первая — это абстракция над SQL-базой данных, которая называется SQLAlchemy Сore. Вторая — это ORM, собственно mapping между реляционной БД и объектным представлением. Соответственно, SQLAlchemy Сore почти один к одному совпадает с SQL — если вы знаете последний, то проблем с Core, как правило, не возникает. Благодаря этому использование SQLAlchemy Сore имеет наименьший оверхэд при обращении к БД.
SQLAlchemy Core предоставляет всю основную функциональность для построения запросов. Но в крупном проекте при обращении к базовому API SQLAlchemy будет много дублирующихся участков, поэтому логично написать библиотеку-обёртку для реализации наиболее популярных сценариев использования.
Примеры основных запросов к Алхимии:
tbl = cls.__table__
select_sql = select([tbl]).where(tbl.c.user_type == user_type)
cursor = await conn.execute(select_sql)
rows = await cursor.fetchall()
tbl = cls.__table__
insert_sql = tbl.insert().values(val='abc').returning(*tbl.c)
cursor = await conn.execute(insert_sql)
row = await cursor.fetchone()
Как видите, участки кода с передачей параметров повторяются, а сам код содержит много параметров, связанных с внутренней реализацией SQLAlchemy Core, что ухудшает читаемость. Хотелось бы получать данные из БД с помощью таких запросов:
user = await User.select().where(User.user_type == user_type).get(conn)
_ = await User.insert().values(val='abc').execute(conn)
Чтобы этого добиться, необходимо реализовать обработчики для операций insert, select, update и delete.
Пишем «обёртку»
Напишем для функции select полноценную обёртку, которая даст возможность с минимумом кода использовать все виды join, сортировки и остальные часто используемые операции:
class SelectQuery:
def __init__(self, model, fields):
self._model = model
self._from = None
_fields = []
self._relations = {field.class_ if isinstance(field, QueryableAttribute) else field for field in fields}
if model in self._relations:
self._relations.remove(model)
for field in fields:
if isinstance(field, QueryableAttribute):
_fields.append(getattr(field.class_.__table__.c, field.key))
elif isinstance(field, sa.Column):
_fields.append(field)
elif hasattr(field, '__table__'):
for f in field.__table__.c:
_fields.append(f)
else:
_fields.append(field)
self._stmt = sa.select([model, *self._relations]).with_only_columns(_fields)
def join(self, right, on):
if self._from is None:
self._from = sa.join(self._model, right, on)
else:
self._from = self._from.join(right, on)
return self
def outerjoin(self, right, on):
if self._from is None:
self._from = sa.outerjoin(self._model, right, on)
else:
self._from = self._from.outerjoin(right, on)
return self
def where(self, condition):
self._stmt = self._stmt.where(condition)
return self
def group_by(self, column):
self._stmt = self._stmt.group_by(column)
return self
def order_by(self, *fields):
self._stmt = self._stmt.order_by(*fields)
return self
def distinct(self, *fields):
self._stmt = self._stmt.distinct()
return self
def limit(self, limit):
self._stmt = self._stmt.limit(limit)
return self
def offset(self, offset):
self._stmt = self._stmt.offset(offset)
return self
async def get(self, conn):
if self._from is not None:
self._stmt = self._stmt.select_from(self._from)
result = await conn.execute(self._stmt)
result = await result.first()
return result
async def all(self, conn):
if self._from is not None:
self._stmt = self._stmt.select_from(self._from)
result = await conn.execute(self._stmt)
return await result.fetchall()
def get_query(self):
if self._from is not None:
return self._stmt.select_from(self._from)
return self._stmt
@property
def raw_sql(self):
stmt = self._stmt
if self._from is not None:
stmt = stmt.select_from(self._from)
return stmt.compile(compile_kwargs={'literal_binds': True})
def _create_model(self, model_class, data):
model_fields = {
getattr(field, 'name'): data[str(field)] for field in model_class.__table__.c if str(field) in data
}
model = model_class(**model_fields)
for relation in model_class.__mapper__.relationships:
if relation.mapper.class_ in self._relations:
relation_model = self._create_model(relation.mapper.class_, data)
setattr(model, relation.key, relation_model)
return model
Также специфическими операциями обладают запросы на удаление:
class DeleteQuery:
def __init__(self, model, stmt):
self._model = model
self._stmt = stmt
def where(self, condition):
self._stmt = self._stmt.where(condition)
return self
def returning(self, *cols):
self._stmt = self._stmt.returning(*cols)
return self
async def execute(self, conn):
return await conn.execute(self._stmt)
def get_query(self):
return self._stmt
Для insert и update подойдёт одинаковый набор операций:
class StatementQuery:
def __init__(self, model, stmt):
self._model = model
self._stmt = stmt
self._values = None
def values(self, **values):
self._values = self._filter_values(values)
return self
def where(self, condition):
self._stmt = self._stmt.where(condition)
return self
def returning(self, *cols):
self._stmt = self._stmt.returning(*cols)
return self
async def execute(self, conn):
self._values = self._filter_values(self._values)
if not self._values:
return
self._stmt = self._stmt.values(self._values)
result = await conn.execute(self._stmt)
return result
def get_query(self):
values = self._filter_values(self._values)
return self._stmt.values(values)
def _filter_values(self, values):
return {
f.key: values[f.key] for f in self._model.__table__.c if f.key in values
}
Обернём описанный выше код в класс, от которого можно унаследовать модели SQLAlchemy:
class DB:
@classmethod
def select(cls, fields=['*']):
return SelectQuery(cls, fields)
@classmethod
def update(cls):
return StatementQuery(cls, sa.update(cls))
@classmethod
def insert(cls):
return StatementQuery(cls, sa.insert(cls))
@classmethod
def delete(cls):
return DeleteQuery(cls, sa.delete(cls))
@staticmethod
def before_save(data):
pass
@staticmethod
def after_save(data):
pass
@classmethod
def model_fields(cls):
return list(filter(lambda x: not x.startswith('_'), dir(cls)))
Примеры использования:
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base, DB):
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
login = Column(String(255))
active = Column(Boolean, default=True)
@staticmethod
async def get_by_login(login, conn):
user = await User.select() \
.where(User.login == login) \
.get(conn)
return user
@staticmethod
async def get_all_active(conn):
users = await User.select() \
.where(User.active.is_(True)) \
.all(conn)
return users
@staticmethod
async def create_user(login, conn):
_ = await User.insert().values(
login=login
).execute(conn)
return True
@staticmethod
async def delete_user(login, conn):
_ = await User.update().values(
active=False
).execute(conn)
return True
Выводы
- Большинство существующих ORM на Python не позволяют работать с БД в асинхронном режиме.
- При работе с SQLAlchemy Core генерируется понятный SQL с наименьшим оверхедом.
- Для удобства работы с популярными инструментами можно написать «обёртку», которая повысит читаемость кода.
Источник статьи: https://habr.com/ru/company/domclick/blog/557638/