Решаем задачу по взаимодействию микросервисов на Python тремя способами

Kate

Administrator
Команда форума
z7q7seuji3tleuljagf9r3hernc.png



Когда речь заходит про взаимодействие микросервисов, все сразу вспоминают о сложных архитектурных паттернах, вроде Event Bus и CQRS. В этой статье я расскажу, как выполнить простенькую задачку для двух микросервисов без навороченной архитектуры. В моем случае это создание сервиса, который агрегирует события компании в единую ленту событий.

Задача​


Дано два сервиса:

  • регистратор сотрудников,
  • лента событий компании.

Цель: сделать так, чтобы в ленте создавалось событие при регистрации нового сотрудника.

С подобной задачей я столкнулся при разработке корпоративного портала Selectel. Мне нужно было организовать отображение в ленте целого ряда новых событий — в их числе изменение должности или структуры команд, переименование отдела, реакция на событие (лайк или огонек).

В моем случае я сразу пришел к одному из озвученных в тексте решений.

Решение 1. Синхронное взаимодействие​


Чтобы решить задачу, будем действовать постепенно. Для начала рассмотрим самый простой для нас вариант, в котором регистратор посылает POST-запрос на создание события в ленте.

wxrdzjqmb6svnc6outwn2cv7l4k.png


Подойдет ли нам этот подход? Зависит от того, что происходит в регистраторе. Тут нужно иметь в виду, что каждое новое действие, добавленное в метод регистрации, замедляет его выполнение. Не хочется заставлять сотрудников ждать на старте их карьеры в компании. К тому же, метод будет замедляться при добавлении новых действий. Например, регистрации в разных социальных аккаунтах.

Такое взаимодействие называется синхронным. Его лучше избегать, и сделать это довольно просто. Нужно внедрить асинхронный подход!


Решение 2. Асинхронный подход​


Если попытаться нагуглить «асинхронные фоновые задачи в Python», скорее всего, наткнешься на такие решения, как Dramatiq или Celery. Они довольно популярны, и не просто так. С их помощью можно быстро накодить асинхронное взаимодействие для наших сервисов, нужен только брокер сообщений.

Брокер сообщений — это отдельный сервис, который передает сообщения в один или несколько пунктов назначения. В нашем случае он нужен для того, чтобы передать команду «Отправь задачу на создание события с параметрами a и b» в Worker. Обычно брокером выступает Redis, RabbitMQ или Kafka. Хотя бывают и случаи, когда после долгих и упорных исследований используют YMQ.

Worker — это отдельная программка, которая выполняет полученные из брокера задачи. В нашем случае именно Worker будет отправлять запрос на создание события.

После внедрения изменений наша архитектура станет похожей на диаграмму ниже.

miulq6vooqdfut-ze0ontswukum.png


Регистратор посылает задачу на публикацию события в брокер RabbitMQ. В то же время на этот брокер подписан Worker, который ожидает задач от брокера. После получения задачи на публикацию события Worker посылает HTTP-запрос в сервис ленты.

Такое взаимодействие выглядит и работает намного лучше, чем первое, потому что регистратор не зависит от количества действий при регистрации, а сотрудник не ожидает их выполнения.

Можно остановиться и на этой реализации. Но в реальности, где нежданно-негаданно меняются требования, наше решение по-прежнему не доведено до ума.

А что если сервисов станет больше? Отправление событий по HTTP​


Представим, что мы выкатили решение в прод. События при регистрации появляются в ленте, все довольны, всем все понравилось. Настолько понравилось, что сотрудники подумали и попросили: «Слушайте, а давайте добавим еще событий в ленту? Например, будем отправлять сотрудникам благодарности или информацию об изменениях в структуре компании».

Что случится с нашей архитектурой, если отправлять сообщения в ленту будет не только регистратор, но и еще несколько сервисов?

jivlibyfalfvdov0cprpdgyn_e4.png


В этом случае каждый сервис будет посылать в ленту события по HTTP, а это будет необоснованно нагружать сервис. К тому же, держим в голове, что, помимо получения событий, сервису ленты нужно их еще и показывать по запросу на главной странице портала.

В принципе можно оставить все как было. Но такой подход будет хорошо работать до тех пор, пока:

  • мало событий,
  • мало сервисов, которые шлют события,
  • мало сотрудников.

Решение 3. Очередь RabbitMQ​


Появляется вопрос: «Можно ли как-то избежать лишней загрузки сервиса?» Ответ: «Можно, если убрать HTTP из нашей схемы».

Действительно, зачем нам нужно отправлять запросы из Worker-ов по HTTP, когда мы можем сразу из сервисов отправлять задачи через брокер в Worker, который имеет доступ к БД ленты. А он будет эти события напрямую записывать.

После преобразований наша схема станет такой:

z7q7seuji3tleuljagf9r3hernc.png


Как это реализовать? Создаем очередь (RabbitMQ) специально для нашего сервиса событий. Из каждого сервиса мы публикуем события в эту очередь. Отдельный Worker, в свою очередь, выполняет задачи из брокера. Если говорить терминами, то наши сервисы — это publisher-ы, а Worker ленты — это consumer.

На этом месте уже можно остановиться. Подход хороший, все работает классно, и теперь можно не беспокоиться за новые требования наших сотрудников. Осталось только понять, как все это реализовать на Python.

Реализация на Python​


Раз сервисов много, будет логично сделать Worker ленты асинхронным, так как у него будет много простоев из-за ожидания ввода/вывода.

Для его реализации будем использовать библиотеку aio-pika.

async def main():
try:
connection = await aio_pika.connect_robust(settings.rabbitmq_dsn) # соединение с RabbitMQ
except exceptions.CONNECTION_EXCEPTIONS as e:
logger.error(e.args[0])
await asyncio.sleep(3)
return await main() # запускаем бесконечный цикл, пока не подключится
async with connection:
channel: aio_pika.abc.AbstractChannel = await connection.channel()
queue: aio_pika.abc.AbstractQueue = await channel.declare_queue(
settings.queue_name, durable=True
)
logger.info("Starting consuming")
while True:
try:
await consume(queue) # начинаем слушать очередь
except exceptions.CONNECTION_EXCEPTIONS as e:
logger.error(e.args[0])
return await main()
except Exception as e:
logger.error(e.args[0])




if __name__ == "__main__":
logger.info("Starting queue worker")
asyncio.run(main(), debug=settings.app_env == AppEnvEnum.local.value)


Функция consume тоже очень простая.

async def consume(queue):
message: aio_pika.IncomingMessage
async for message in queue:
async with message.process():
context = {
"service_name": message.app_id,
"task_id": message.message_id,
}
with logger.contextualize(**context):
logger.info("message is being processing")
data = json.loads(message.body.decode())
await create_new_events(data) # тут ваша бизнес логика
logger.info("message successfully processed!")


С consumer-ом разобрались, а что с publisher-ами? Сейчас в большинстве компаний существуют как синхронные, так и асинхронные сервисы, поэтому приведем пример и того, и другого.

Для синхронных сервисов будем использовать модуль pika.

def _get_message_properties(message_id: Optional[str] = None):
return pika.BasicProperties(
delivery_mode=DeliveryMode.Persistent.value,
content_type="application/json",
content_encoding="utf-8",
message_id=message_id or uuid4().hex,
app_id=project.config.application_name,
)




def _create_connection():
parsed = urlparse(_events_feed_config["queue"]["dsn"])
credentials = pika.PlainCredentials(
username=parsed.username, password=parsed.password
)
param = pika.ConnectionParameters(
host=parsed.hostname,
port=parsed.port,
virtual_host=parsed.path[1:],
credentials=credentials,
)
return pika.BlockingConnection(param)




def publish_to_events_feed(data, message_id):
with _create_connection() as connection:
channel = connection.channel()
properties = _get_message_properties(message_id)
logger.info("message is publishing")
channel.basic_publish(
exchange="",
routing_key=_events_feed_config["queue"]["name"],
body=json.dumps(data).encode(),
properties=properties,
)
logger.info("message successfully published")


А для асинхронных по-прежнему aio-pika.

def _create_message(data: bytes, message_id: Optional[str] = None):
return aio_pika.Message(
body=data,
content_type="application/json",
content_encoding="utf-8",
message_id=message_id or uuid4().hex,
delivery_mode=aio_pika.abc.DeliveryMode.PERSISTENT,
app_id=config.app_name,
)




async def _publish_events(data: EventListModel, message_id=None):
connection = await aio_pika.connect_robust(config.events_feed_queue_dsn)
async with connection:
routing_key = config.events_feed_queue_name
channel: aio_pika.abc.AbstractChannel = await connection.channel()
message = _create_message(data.json().encode(), message_id)
logger.info("message is publishing")
await channel.default_exchange.publish(
message,
routing_key=routing_key,
)
logger.info("message successfully published")


Заключение​


Когда вы решаете задачу, исходите из требований. Зачастую не нужно строить сложную архитектуру — достаточно просто сделать POST-запрос. Когда же ваши сервисы начинают глючить, подвисать либо вы заранее знаете, что нагрузка будет высокой, имеет смысл выстроить хорошее решение для пользователей.

 
Сверху