Пишем обёртку над SQLAlchemy Сore

Kate

Administrator
Команда форума
Для асинхронного Python существует мало полноценных ORM, и им далеко до таких монстров-комбайнов, как DjangoOrm и SQLAlchemy.ORM. Бедность ORM-инструментария для асинхронного программирования заставила многих программистов отказаться от зачастую непонятной им работы с ORM и перейти к более прозрачному взаимодействию с БД. Решение в лоб — написание raw SQL, но в этом случае запросы не будут защищены от инъекций, а запросы, составляемые по бизнес логике с опциональными параметрами, превратятся в конкатенацию строк. Важно найти баланс между прозрачностью выполнения кода, скоростью его написания и читаемостью.

Ниже я предлагаю реализацию такого баланса c использованием SQLAlchemy Core.

Сравнение с остальными ORM​

6b185a52b48d3b8db562f5a33e07cc38.png

Прежде чем рассматривать работу SQLAlchemy Core, для расширения инженерного кругозора давайте познакомимся с альтернативными решениями и их недостатками.

  • Django ORM и ponyORM не подходят из-за невозможности работать в асинхронном режиме.
  • SQLAlchemy.ORM: поддержку полноценной асинхронной ORM в альфа-режиме добавили в версии 1.4, пока не подходит для production-решений.
  • Peewee: умеет работать в асинхронном режиме, но только с Core-функциями. Хорошая библиотека, но я предлагаю выбрать SQLAlchemy из-за её большей популярности и поддерживаемости. Также были жалобы на ленивую загрузку объектов в Peewee — иногда она намертво вешает цикл (возможно, эту ошибку уже поправили в новых версиях).
  • Gino: простая ORM на основе SQLAlchemy Core. Работать с этой библиотекой можно без знаний об SQLAlchemy.
  • tortoise-orm: молодой проект с похожим на Django ORM способом доступа к данным.

Работа с SQLAlchemy Core​

0896588bc2303e566444e9ba362b4278.png

Алхимия состоит из двух частей. Первая — это абстракция над SQL-базой данных, которая называется SQLAlchemy Сore. Вторая — это ORM, собственно mapping между реляционной БД и объектным представлением. Соответственно, SQLAlchemy Сore почти один к одному совпадает с SQL — если вы знаете последний, то проблем с Core, как правило, не возникает. Благодаря этому использование SQLAlchemy Сore имеет наименьший оверхэд при обращении к БД.

SQLAlchemy Core предоставляет всю основную функциональность для построения запросов. Но в крупном проекте при обращении к базовому API SQLAlchemy будет много дублирующихся участков, поэтому логично написать библиотеку-обёртку для реализации наиболее популярных сценариев использования.

Примеры основных запросов к Алхимии:

tbl = cls.__table__
select_sql = select([tbl]).where(tbl.c.user_type == user_type)
cursor = await conn.execute(select_sql)
rows = await cursor.fetchall()
tbl = cls.__table__
insert_sql = tbl.insert().values(val='abc').returning(*tbl.c)
cursor = await conn.execute(insert_sql)
row = await cursor.fetchone()
Как видите, участки кода с передачей параметров повторяются, а сам код содержит много параметров, связанных с внутренней реализацией SQLAlchemy Core, что ухудшает читаемость. Хотелось бы получать данные из БД с помощью таких запросов:

user = await User.select().where(User.user_type == user_type).get(conn)
_ = await User.insert().values(val='abc').execute(conn)
Чтобы этого добиться, необходимо реализовать обработчики для операций insert, select, update и delete.

Пишем «обёртку»​

2ab4152da9b147eb1ef72dd7eb8f553d.png

Напишем для функции select полноценную обёртку, которая даст возможность с минимумом кода использовать все виды join, сортировки и остальные часто используемые операции:

class SelectQuery:
def __init__(self, model, fields):
self._model = model
self._from = None
_fields = []
self._relations = {field.class_ if isinstance(field, QueryableAttribute) else field for field in fields}
if model in self._relations:
self._relations.remove(model)
for field in fields:
if isinstance(field, QueryableAttribute):
_fields.append(getattr(field.class_.__table__.c, field.key))
elif isinstance(field, sa.Column):
_fields.append(field)
elif hasattr(field, '__table__'):
for f in field.__table__.c:
_fields.append(f)
else:
_fields.append(field)

self._stmt = sa.select([model, *self._relations]).with_only_columns(_fields)

def join(self, right, on):
if self._from is None:
self._from = sa.join(self._model, right, on)
else:
self._from = self._from.join(right, on)
return self

def outerjoin(self, right, on):
if self._from is None:
self._from = sa.outerjoin(self._model, right, on)
else:
self._from = self._from.outerjoin(right, on)
return self

def where(self, condition):
self._stmt = self._stmt.where(condition)
return self

def group_by(self, column):
self._stmt = self._stmt.group_by(column)
return self

def order_by(self, *fields):
self._stmt = self._stmt.order_by(*fields)
return self

def distinct(self, *fields):
self._stmt = self._stmt.distinct()
return self

def limit(self, limit):
self._stmt = self._stmt.limit(limit)
return self

def offset(self, offset):
self._stmt = self._stmt.offset(offset)
return self

async def get(self, conn):
if self._from is not None:
self._stmt = self._stmt.select_from(self._from)

result = await conn.execute(self._stmt)
result = await result.first()
return result

async def all(self, conn):
if self._from is not None:
self._stmt = self._stmt.select_from(self._from)

result = await conn.execute(self._stmt)
return await result.fetchall()

def get_query(self):
if self._from is not None:
return self._stmt.select_from(self._from)
return self._stmt

@property
def raw_sql(self):
stmt = self._stmt
if self._from is not None:
stmt = stmt.select_from(self._from)
return stmt.compile(compile_kwargs={'literal_binds': True})

def _create_model(self, model_class, data):
model_fields = {
getattr(field, 'name'): data[str(field)] for field in model_class.__table__.c if str(field) in data
}
model = model_class(**model_fields)
for relation in model_class.__mapper__.relationships:
if relation.mapper.class_ in self._relations:
relation_model = self._create_model(relation.mapper.class_, data)
setattr(model, relation.key, relation_model)

return model
Также специфическими операциями обладают запросы на удаление:

class DeleteQuery:
def __init__(self, model, stmt):
self._model = model
self._stmt = stmt

def where(self, condition):
self._stmt = self._stmt.where(condition)
return self

def returning(self, *cols):
self._stmt = self._stmt.returning(*cols)
return self

async def execute(self, conn):
return await conn.execute(self._stmt)

def get_query(self):
return self._stmt
Для insert и update подойдёт одинаковый набор операций:

class StatementQuery:
def __init__(self, model, stmt):
self._model = model
self._stmt = stmt
self._values = None

def values(self, **values):
self._values = self._filter_values(values)
return self

def where(self, condition):
self._stmt = self._stmt.where(condition)
return self

def returning(self, *cols):
self._stmt = self._stmt.returning(*cols)
return self

async def execute(self, conn):
self._values = self._filter_values(self._values)
if not self._values:
return
self._stmt = self._stmt.values(self._values)
result = await conn.execute(self._stmt)
return result

def get_query(self):
values = self._filter_values(self._values)
return self._stmt.values(values)

def _filter_values(self, values):
return {
f.key: values[f.key] for f in self._model.__table__.c if f.key in values
}

Обернём описанный выше код в класс, от которого можно унаследовать модели SQLAlchemy:

class DB:
@classmethod
def select(cls, fields=['*']):
return SelectQuery(cls, fields)

@classmethod
def update(cls):
return StatementQuery(cls, sa.update(cls))

@classmethod
def insert(cls):
return StatementQuery(cls, sa.insert(cls))

@classmethod
def delete(cls):
return DeleteQuery(cls, sa.delete(cls))

@staticmethod
def before_save(data):
pass

@staticmethod
def after_save(data):
pass

@classmethod
def model_fields(cls):
return list(filter(lambda x: not x.startswith('_'), dir(cls)))

Примеры использования:

from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class User(Base, DB):
__tablename__ = 'user'

id = Column(Integer, primary_key=True)
login = Column(String(255))
active = Column(Boolean, default=True)

@staticmethod
async def get_by_login(login, conn):
user = await User.select() \
.where(User.login == login) \
.get(conn)
return user

@staticmethod
async def get_all_active(conn):
users = await User.select() \
.where(User.active.is_(True)) \
.all(conn)
return users

@staticmethod
async def create_user(login, conn):
_ = await User.insert().values(
login=login
).execute(conn)
return True

@staticmethod
async def delete_user(login, conn):
_ = await User.update().values(
active=False
).execute(conn)
return True

Выводы​

da289ea4617ee58e7d7387d0a9f40fee.png

  • Большинство существующих ORM на Python не позволяют работать с БД в асинхронном режиме.
  • При работе с SQLAlchemy Core генерируется понятный SQL с наименьшим оверхедом.
  • Для удобства работы с популярными инструментами можно написать «обёртку», которая повысит читаемость кода.


Источник статьи: https://habr.com/ru/company/domclick/blog/557638/
 
Сверху