Отправка доменных событий в модульном монолите

Kate

Administrator
Команда форума
Статья написана по мотивам моего доклада на митапе.

Всем привет! В этой статье хочу поделиться опытом построения системы доменных событий (domain events) в нашем модульном монолите и микросервисах, рассказать о том, как мы гарантируем их доставку, следим за консистентностью в рамках транзакций, используя transactional outbox, чем доменные события отличаются от интеграционных и всё это в рамках multi tenant приложения.

История началась с того, что от бизнеса пришёл запрос на доработку одной из ключевых функций нашего приложения. Поскольку данный функционал в той или иной степени касался почти всех компонентов приложения, мы решили немного улучшить наш большой легаси проект и переписать часть с использованием нового фреймворка, новых подходов и новой архитектуры. Также важной частью запроса от бизнеса было увеличение количества команд разработки, одновременно работающих над разными частями продукта. Мы решили разделить наш монолит на модули используя принципы DDD, а также часть логики вынести в микросервисы на Go. В рамках модулей и микросервисов код было решено поделить их на слои в соответствии с гексагональной архитектурой (подробнее можно почитать тут).

Данная статья нацелена на мидлов и сеньоров. Для джунов рекомендую сначала посмотреть в сторону этих двух книжек:

5a1f5440c4b2a973d295a74e4126089c.png

Давайте разберёмся, что такое ограниченный контекст (далее просто контекст). Контекст — это часть большого контекста предметной области, самодостаточная, со своими определениями, языком и доменной моделью. В рамках модульного монолита приложение можно разделить на модули в соответствии с контекстами, в микросервисной архитектуре - на микросервисы. Именно изменения доменной модели внутри контекста генерят события.

О доменных событиях нужно знать следующее:

  • они обозначают события, произошедшие в прошлом;
  • обрабатываются в текущем контексте;
  • нужны для синхронизации нескольких агрегатов в контексте (то есть каждый агрегат предметной модели сам отвечает за свое консистентное состояние, но если у нас в контексте их несколько, то мы их должны синхронизировать и это делается с помощью доменных событий);
  • не идут в брокер сообщений;
  • обрабатываются синхронно.
В контекстe User могут быть следующие доменные события: пользователь добавлен в систему, пользователь добавлен в группу, у пользователя изменился пароль и так далее.

Теперь перейдём к тому, как мы реализовали схему диспатчинга. В контексте User есть агрегат User. Предположим, у него изменилось имя и сгенерировалось доменное событие, которое отправилось в Event Dispatcher.

29cd1bea7252e525232cb1b1536c6754.png

Чтобы не нарушать чистую архитектуру, интерфейс Event Dispatcher Interface находится на уровне домена, а реализация лежит на уровне приложения. В реализацию попадает объект события, который передаётся всем подписчикам. Event Handler’ы вызывают нужные методы других агрегатов. Это и есть синхронизация двух агрегатов.

Перейдём к интеграционным событиям. Интеграционные события — это события, порождённые другим контекстом и преобразованные в формат, удобный для транспортировки. Они обрабатываются в других контекстах асинхронно. Именно они идут через брокер сообщений.

44d6e9ac736f21d912e1ddebe4434fa3.png

Разберём по схеме. Доменное событие через доменный Event Dispatcher попадает в Domain Event Handler, находящийся на уровне инфраструктуры. Этот Handler преобразовывает объект доменного события в сообщение (в нашем случае в JSON строку) и отправляет в брокер. После этого сообщение попадает в Message Handler в другом контексте. Данное сообщение, пришедшее из брокера и преобразованное в объект Integration Event, и есть интеграционное событие. Для обработки такого события используется Integration Event Handler, поскольку событие не из текущего контекста.


Давайте рассмотрим типичные ошибки, с которыми мы столкнулись во время реализации системы доменных событий.

  1. Синхронная обработка событий в другом контексте.
    Идея многомодульного монолита такова, что каждый модуль можно легко вынести в отдельный микросервис, потому что это маленькая предметная область взаимодействующая с другими контекстами через своё API. Если другой модуль подписывается на события синхронно, то он делает это в обход API и его уже не так легко вынести в микросервис. События же являются частью API модуля/микросервиса (хороший пример — https://launchany.com/microservice-design-canvas/). Дополнительные сложности могут принести синхронные транзакции в нескольких модулях.
  2. Событие не доменное, а уровня приложения.
    Разберём на примере — при добавлении пользователя в систему сам способ добавления (через форму или импорт файла) для предметной области может ничего не значить. Событие «завершен импорт файла» или «изменился прогресс выполнения задачи» чаще всего уровня приложения, а не доменного. Подобные ошибки мы отлавливали на Code Review.
  3. События проектируются с учетом бизнес логики потребителей.
    Со стороны внешнего сервиса бывает нужно только одно конкретное событие, но с точки зрения нашего контекста, генерирующего событие, оно общее. В таком случае правильным решением будет сделать общее событие, чтобы часть логики внешнего сервиса не проникла в наш контекст. Например, имеем список групп, который может редактироваться и пополняться через интерфейс приложения. Пользователь может быть добавлен в любую группу — группу администраторов, группу комментаторов и тд. Казалось бы это все одно и то же - «пользователь добавлен в группу», но с точки зрения внешнего контекста может быть важно лишь одно событие «пользователь добавлен в группу администраторов». Если мы будем отправлять событие добавления в конкретную группу администраторов, то часть бизнес логики внешнего контекста окажется в нашем контексте.
Ещё есть интересные задачи связанные с событиями:

  1. Массовые и единичные события.
    Например, добавили одним действием одного пользователя или тысячу пользователей. Сколько событий нужно сгенерировать: одно с тысячей пользователей или тысячу по одному?
  2. Количество и набор данных в событии.
    Когда событие обрабатывается в другом контексте, бывает нужно узнать что-то дополнительно из исходного контекста, например, сколько всего пользователей в группе администраторов. Все данные класть в событие бывает накладно, также как и не класть никаких данных вообще. Потому каждый раз необходимо решать, что же положить в конкретное событие.
  3. Идемпотентность обработки события.
    Брокер сообщений может гарантировать at least once доставку, то есть одно сообщение будет доставлено как минимум один раз. Это значит, что оно может быть доставлено и больше одного раза. Когда мы только написали свою систему событий, у нас одно событие бывало приходило и десяток раз. И если во время обработки события мы, например, к счетчику прибавляем два, а потом вновь пришло это событие и мы еще раз добавили два, то в итоге получили +4, а должны были получить только +2. За этим нужно следить.
Каждая из этих тем интересная и большая, подробнее их в статье мы рассматривать не будем. В целом стоит помнить, цитата из книги «Fundamentals of Software Architecture: An Engineering Approach by Mark Richards and Neal Ford»:

Everything in software architecture is a trade-off (First Law of Software Architecture)

Теперь подробнее про диспатчинг.

Мы уже разобрали, что такое доменное событие и понимаем, какие данные в это событие нужно положить. Пришло время его диспатчить. Казалось бы — диспатчер вызвал и диспатчь, как в примере выше :) На самом деле это не так просто сделать.

Существует несколько подходов (вариантов диспатчинга):

1) Статический диспатчер, который используется напрямую из доменной модели

Плюсы: его легко реализовывать.

Минусы:

  • сложнее покрыть и проверять тестами;
  • события диспатчатся немедленно. Мы еще не завершили операцию, агрегат ещё что-то не доделал, а событие уже задиспатчилось. За этим нужно следить.
Например, переименование пользователя — в агрегате User в методе Rename сначала выполняется бизнес логика (проверяются инварианты, что имя корректно), а после этого статическим методом диспатчится доменное событие. Всё просто, но я бы посоветовал всё продумать, прежде чем так делать.

namespace User\Domain;

class User
{
public function rename(Name $name): void
{
$this->name = $name;
Dispatcher::dispatch(new UserRenamed($this->getId(), $name));
}
}
2) Агрегат коллекционирует все свои события (более популярный)

Плюсы:

  • легко реализовывать обработку событий после сохранения агрегата. Сначала что-то сделали с агрегатом, он сохранил все события и только после этого мы обработали все события.
Минусы:

  • лишние методы и данные у агрегата. В нашу предметную модель попали методы, которые не совсем относятся к предметной модели.
По этой теме существует компонент Messenger.

В агрегате user появляется массив с событиями. Когда выполняется операция «rename», в этот массив сохраняются все доменные события и появляется метод «getEvents».

namespace User\Domain;

class User
{
private $events = [];

public function rename(Name $name): void
{
$this->name = $name;
$this->events[] = new UserRenamed($this->getId(), $name);
}

public function getEvents(): array
{
return $this->events;
}
}
3) События диспатчит доменный сервис.

Плюсы:

  • нет лишних методов и зависимостей у агрегатов, модель становится чистой;
  • можно использовать анемичную или частично анемичную доменную модель. DDD рекомендует использовать богатую доменную модель (когда агрегат сам следит за своими инвариантами и не имеет сеттеров). Анемичная модель — это модель для хранения данных, за её инварианты отвечают внешние сервисы, а не она сама. Типичная анемичная модель имеет только поля и наборы сеттеров и геттеров. В данном случае, доменный сервис следит, чтобы инварианты доменной анемичной модели соблюдались внутри нашего уровня домена. Анемичная модель легка в реализации, но в рамках нашего решения не является обязательной.
Минусы:

  • лишняя обёртка, то есть каждое действие с агрегатом выполняется только через обертку и нельзя напрямую работать с агрегатом из приложения.
Обычно доменный сервис — это редкий кейс, используется в случаях, когда какая-то логика не относится к конкретному агрегату. Хорошим примером доменного сервиса по DDD является спецификация. Это доменная логика и она относится к предметной модели и области, но не имеет своего состояния и выполняется над агрегатом или над коллекцией агрегатов. Фактически, это сервис, который мы положили в домен.

namespace User\Domain;

class UserService
{
public function __construct(DomainEventDispatcherInterface $dispatcher)
{
$this->dispatcher = $dispatcher;
}

public function rename(User $user, Name $name): void
{
$user->rename($name);
$this->dispatcher->dispatch(new UserRenamed($user->getId(), $name));
}
}
Именно третий вариант мы выбрали у себя.

Стоит уточнить, что когда у нас разрабатывается CRUD сервис, то для него далеко не всегда нужен DDD. Есть сложные контексты, где мы используем только DDD. Но также есть простые контексты, где можно использовать анемичную модель, которая может лежать на уровне приложения и события там будут только уровня приложения.


И так, всё классно, мы задиспатчили событие и тут возникает следующий вопрос. У нас же есть транзакции и брокер сообщений и нам нужно в какой-то момент преобразовать доменное событие в сообщение и отправить его в наш брокер. Как это сделать, чтобы ничего не сломалось? Есть несколько вариантов.

Отправка событий и транзакции

Варианты отправки событий в брокер сообщений:

  1. До закрытия транзакции.
    Тогда у нас события преобразуются в сообщения и отправляются в брокер до того как мы закрыли транзакцию. Очевидные минусы — если с брокером соединение упало, то у нас падает весь кейс и надо откатить всю транзакцию, то есть фактически доменная логика зависит от брокера. Также событие может быть обработано внешним сервисом до того, как транзакция будет закрыта, что может привести к потере консистентности. Или транзакция может быть в целом отменена, а событие уже отправлено в брокер, тогда у нас данные могут в контекстах совсем разъехаться.
  2. После закрытия транзакции.
    Здесь тоже минус — если мы доменное событие диспатчим после закрытия транзакции, то есть вероятность, что возникнет ошибка при отправке, а транзакция закроется. Так можно потерять событие навсегда.
  3. Сохранение события в event store в хранилище событий в той же самой транзакции, в которой сохраняются наши модели, и отправка в брокер после транзакции. Самый хороший вариант, известный как transactional outbox. Именно этот вариант мы реализовали у себя.
Рассмотрим третий вариант на простой схеме.

64b2c9aa1dab0c5a337ea71aa3ddbf0b.png

В данном случае алгоритм обработки доменного события:

  1. После изменения модели User генерится доменное событие (в нашем случае в доменном сервисе)
  2. Это доменное событие из Event Dispatcher’a синхронно попадает в Event Handler’ы
  3. Первый Event Handler синхронно вызывает методы модели Group
  4. Второй Event Handler синхронно передаёт доменное событие в Event Store
  5. Это событие преобразуется в stored event в удобном виде для хранения в БД
  6. Дальше происходит закрытие транзакции и в БД сохраняются изменения моделей User, Group и модели Stored Event
  7. Stored Event Listener на событие коммита транзакции в базу выбирает все новые Stored Event’ы и отправляет в брокер сообщений
Таким образом мы сохраняем события в базу в одной транзакции с основной моделью, что гарантирует, что события не будут потеряны.

7ой пункт у нас в таком виде реализован для PHP монолита. В БД есть отдельная таблица, хранящая ID последнего отправленного сообщения. При отправке считываются все неотправленные сообщения, при этом с блокировкой, чтобы не слать одно сообщение много раз. В рамках микросервисов на go мы не подписываемся на событие коммита, а проверяем с заданным интервалом наличие новых сообщений в горутине.

Подробнее про transactional outbox можно почитать в книге Chris Richardson «Microservices Patterns».


Следующий нюанс нашего приложения в том, что оно multi tenant. Клиентами для нас являются организации, для каждой из которых организована отдельная «песочница» с отдельной базой и отдельным набором пользователей и других сущностей внутри.

ce312026300c520d809665a3f99ca2ea.png

Один из интересных моментов, с которым мы столкнулись — один tenant мог нагенерить так много событий, что их обработка могла занять минуты, а иногда и часы. Ясно, что нам необходимо было работать над уменьшением времени обработки отдельного события. Но в рамках общей архитектуры такие наплывы всё равно могли произойти и мы решили сначала разобраться с основной проблемой - не дать наплыву событий из одного tenant повлиять время обработки событий из других. В multi tenant модели нам надо обрабатывать события последовательно в рамках одного tenant и параллельно для разных tenant, а также иметь регулируемое ограничение на количество одновременно обрабатываемых событий.

Мы искали, что есть в мире, ничего не нашли и написали свой маленький сервис, который является прослойкой до подписчиков.

4356a9f24dd805604ee7118352f7b9e8.png

У каждого tenant своя очередь событий в event store, которые попадают в брокер, и есть сервис, который на эти события подписан (в рамках монолитной архитектуры генерить и подписываться может один и тот же монолит). В изначальной реализации сервис сам подписывался на брокер и обрабатывал события.

В текущей нашей реализации появилась прослойка, которая сама подписывается на брокер, сохраняет все сообщения себе локально в хранилище и говорит брокеру, что сообщение обработано. Дальше этот сервис сам следит за всеми очередями в своей базе и для каждого tenant отправляет сообщение последовательно, потому что нам нужно гарантировать последовательность обработки событий. При этом для всех tenant оно отправляет параллельно. Мы ограничиваем и пишем в настройках, сколько нужно параллельно отправлять сообщений. Например, если установить лимит в 10, то только для 10 tenant будут параллельно отправляться сообщение.

a99b56c244d83bb42c25e47221c01ce6.png

Использование данного message limiter не является обязательным при подключении новых сервисов к брокеру, но если возникают инциденты, описанные выше, мы рекомендуем его использовать командам. Модульный монолит использует message limiter по умолчанию для всех модулей.

В итоге после реализации описанных выше решений мы:

  • не теряем события (они сохраняются в одной транзакции с моделью);
  • имеем единую шину событий на несколько контекстов и сервисов;
  • обрабатываем события в multi tenant модели.
Полезные ссылки:


Источник статьи: https://habr.com/ru/company/ispring/blog/569648/
 
Сверху