Проблематика
Проекты Каруны, как и многие современные приложения, следуют парадигме микросервисной архитектуры. Поэтому паттерн в настоящее время актуален для нас как никогда. Для начала рассмотрим какой-нибудь простой кейс на примере интернет-магазина. Предположим, что некоторая часть нашей архитектуры разделена на микросервисы: order, который отвечает за создание самого заказа, и сервис goods, который отвечает за создание товаров в этом заказе. Создание заказа можно представить следующей схемой:Клиент обращается непосредственно к сервису order, в котором создаётся сущность заказа. Сервис order создает в своей БД сущность заказа и посылает запрос на создание товаров в этом заказе в сервис goods. Если в сервисе goods произойдет ошибка, то нарушается согласованность данных в нашей системе — заказ уже записан в базу данных сервиса order, но товары не будут записаны в БД сервиса goods.
Методы решения
Многие слышали о так называемых распределённых транзакциях, которые, как предполагается, должны решать описанную выше проблему. Для управления такими транзакциями используется стандарт X/Open XA, следование которому гарантирует как сохранение всей транзакции, так и её откат. Одной из главных проблем распределённых транзакций является то, что их не поддерживают многие современные инструменты — такие как MongoDB, Kafka и т. д.Другая проблема связана с жёстким требованием: 100-процентная доступность всех сервисов во время исполнения такой транзакции. А так как каждый новый сервис понижает доступность системы в целом (доступность системы = произведению доступности каждого сервиса), использование распределённых транзакций будет хуже и хуже сказываться на приложении в целом при масштабировании. По этим причинам использование данного метода в современных приложениях не желательно.
Для решения нашей задачи отлично подходит паттерн saga. Он обеспечивает согласованность данных между сервисами, используя локальные транзакции и асинхронные сообщения. Логика исполнения паттерна размазана на несколько сервисов: после фиксации транзакции в одном сервисе он публикует асинхронное сообщение о завершении транзакции. Это сообщение инициализирует следующий этап выполнения паттерна, и так до конца "повествования".
Для отката изменений на N-ом этапе повествование использует компенсирующие транзакции, которые откатывают изменения и устраняют последствия выполнения предыдущих N-1 транзакций. Выполнение таких транзакций тоже происходит последовательно, но в обратном порядке. Паттерн также оперирует такими понятиями как поворотная транзакция (дальнейшие транзакции никогда не отказывают) и доступная для повторения транзакция (всегда заканчивается успешно).
Управление (координация) повествований может осуществляться 2 способами:
- Хореография — решения о следующих шагах и откатах принимаются распределённо каждым сервисом по отдельности.
- Оркестрация — логика координации централизуется в одном месте, из которого шлются сообщения в другие сервисы.
Проектирование
В нашем случае реализация паттерна будет выглядеть так:Сервис | Транзакция | Компенсирующая транзакция | |
order | CreateOrder() | Доступная для компенсации | RemoveOrder() |
goods | CreateGoods() | Поворотная | - |
order | AcceptOrder() | Доступная для повторения | - |
Рассмотрим две возможных ситуации для нашего функционала . В первом случае клиент посылает запрос в сервис order POST /orders, и всё проходит отлично:
- Успешное создание сущности заказа в сервисе order инициализирует событие order_created_v1, на которое подписан сервис goods.
- После чтения этого события и успешного создания сущностей товаров в сервисе goods посылается событие goods_created_v1 в сервис order.
- При получении этого события сервис order меняет статус заказа в своей БД на approved.
- Снова создаётся заказ в сервисе order, и генерируется событие order_created_v1.
- Сервис goods, получив событие, пытается создать товары для этого заказа. Но что-то идёт не так, и товары не заносятся в БД. Посылается событие goods_rejected_v1 в сервис order.
- При получении этого события сервис order откатывает свои изменения из шага 1 — удаляет заказ.
Реализация
Реализовывать проект будем с помощью golang как языка сервисов, postgresql для баз данных и kafka для обмена сообщениями между сервисами. Создадим локальное рабочее окружение, а именно — сервисы order, goods, БД для каждого сервиса и брокер kafka:docker-compose.yml
version: '3.9'
services:
order:
build:
dockerfile: .docker/app.Dockerfile
context: ./
args:
SERVICE_NAME: order
environment:
- HTTP_BIND=8080
- POSTGRES_DB=orders
- POSTGRES_USER=orders_user
- POSTGRES_PASSWORD=orders_password
- HOST_DB=db-order
- PORT_DB=5432
- KAFKA_ADDR=kafka:9092
- ORDER_CREATED_TOPIC=order_created_v1
- ORDER_CREATED_TOPIC=goods_created_v1
- GOODS_REJECTED_TOPIC=goods_rejected_v1
depends_on:
- db-order
- kafka
volumes:
- ./order:/app/order:delegated
- ./.docker/entrypoint.sh:/entrypoint.sh:ro
entrypoint: /entrypoint.sh
ports:
- "8080:8080"
networks:
- saga
db-order:
image: postgres:14
environment:
- POSTGRES_DB=orders
- POSTGRES_USER=orders_user
- POSTGRES_PASSWORD=orders_password
ports:
- "5441:5432"
volumes:
- data:/var/lib/postgresql
networks:
- saga
goods:
build:
dockerfile: .docker/app.Dockerfile
context: ./
args:
SERVICE_NAME: goods
environment:
- HTTP_BIND=8081
- POSTGRES_DB=goods
- POSTGRES_USER=goods_user
- POSTGRES_PASSWORD=goods_password
- HOST_DB=db-goods
- PORT_DB=5432
- KAFKA_ADDR=kafka:9092
- ORDER_CREATED_TOPIC=order_created_v1
- GOODS_CREATED_TOPIC=goods_created_v1
- GOODS_REJECTED_TOPIC=goods_rejected_v1
volumes:
- ./goods:/app/goods:delegated
- ./.docker/entrypoint.sh:/entrypoint.sh:ro
entrypoint: /entrypoint.sh
depends_on:
- db-goods
- kafka
ports:
- "8081:8081"
networks:
- saga
db-goods:
image: postgres:14
environment:
- POSTGRES_DB=goods
- POSTGRES_USER=goods_user
- POSTGRES_PASSWORD=goods_password
ports:
- "5442:5432"
volumes:
- data:/var/lib/postgresql
networks:
- saga
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- '2181:2181'
networks:
- saga
kafka:
image: wurstmeister/kafka
depends_on:
- zookeeper
ports:
- '9092:9092'
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: order_created_v1:1:1,goods_created_v1:1:1,goods_rejected_v1:1:1
networks:
- saga
volumes:
data:
networks:
saga:
Сервис order включает в себя таблицы со статусами заказов и самими заказами:
CREATE TABLE statuses (
id BIGSERIAL PRIMARY KEY,
name TEXT NOT NULL
);
INSERT INTO statuses (id, name) VALUES (1, 'PENDING'), (2, 'CREATED');
CREATE TABLE orders (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
status_id BIGINT NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
FOREIGN KEY (status_id) REFERENCES statuses (id)
);
И реализует эндпоинт создания заказа:
...
server.router.HandleFunc("/v1/orders", s.CreateOrderV1).Methods(http.MethodPost)
...
В котором сохраняется сущность заказа в БД и продьюсится сообщение в kafka:
обработчик CreateOrderV1
func (s Server) CreateOrderV1(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Error().Err(err).Msg("Data hasn't been read.")
w.WriteHeader(http.StatusBadRequest)
return
}
orderData := model.OrderData{}
err = json.Unmarshal(body, &orderData)
if err != nil {
log.Error().Err(err).Msg("Data hasn't been parsed.")
w.WriteHeader(http.StatusBadRequest)
return
}
var orderID int64
err = s.db.QueryRow(context.Background(), `INSERT INTO orders (user_id, status_id, created_at) VALUES ($1, 1, NOW()) RETURNING id`, orderData.UserID).Scan(&orderID)
if err != nil {
log.Error().Err(err).Msg("Order hasn't been created.")
w.WriteHeader(http.StatusInternalServerError)
return
}
msg := model.CreatedOrderMsg{Data: model.Order{
ID: orderID,
GoodsIds: orderData.GoodsIds,
}}
msgStr, err := json.Marshal(msg)
if err != nil {
log.Error().Err(err).Msg("Message hasn't been marshaled.")
w.WriteHeader(http.StatusInternalServerError)
return
}
producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("ORDER_CREATED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
_, _, err = s.kafkaProducer.SendMessage(producerMsg)
if err != nil {
log.Error().Err(err).Msg("Message hasn't been sent.")
w.WriteHeader(http.StatusInternalServerError)
return
}
}
Cервис слушает топики goods_created_v1 и goods_rejected_v1 и в зависимости от полученного изменяет созданную сущность заказа:
обработка события goods_created_v1
func (gch GoodsCreatedHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
gce := GoodsCreatedEvent{}
err := json.Unmarshal(msg.Value, &gce)
if err != nil {
log.Error().Err(err).Msg("Event hasn't been handled.")
session.MarkMessage(msg, "")
continue
}
_, err = gch.db.Exec(context.Background(), `UPDATE orders SET status_id = 2 WHERE id = $1`, gce.Data.OrderID)
if err != nil {
log.Error().Err(err).Msg("Event hasn't been inserted.")
}
session.MarkMessage(msg, "")
}
return nil
}
обрабокта события goods_rejected_v1
Сервис goods содержит таблицу с заказами:
CREATE TABLE goods (
id BIGSERIAL PRIMARY KEY,
goods_id BIGINT NOT NULL,
order_id BIGINT NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
);
И слушает событие order_created_v1, которое обрабатывает следующим образом:
обработrа события order_created_v1
func (och OrderCreatedHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
oce := OrderCreatedEvent{}
err := json.Unmarshal(msg.Value, &oce)
if err != nil {
log.Error().Err(err).Msg("Event hasn't been handled.")
session.MarkMessage(msg, "")
continue
}
ctx := context.Background()
tx, err := och.db.Begin(ctx)
for _, goodsID := range oce.Data.GoodsIds {
_, err = och.db.Exec(context.Background(), `INSERT INTO goods (goods_id, order_id, created_at) VALUES ($1, $2, NOW())`, goodsID, oce.Data.ID)
if err != nil {
tx.Rollback(ctx)
log.Error().Err(err).Msg("Inserting error.")
err := och.sendRejected(oce.Data.ID)
if err != nil {
log.Error().Err(err).Msg("Event hasn't been sent.")
}
session.MarkMessage(msg, "")
continue
}
}
err = tx.Commit(ctx)
if err != nil {
err := tx.Rollback(ctx)
log.Error().Err(err).Msg("Transaction commit error.")
rErr := och.sendRejected(oce.Data.ID)
if rErr != nil {
log.Error().Err(rErr).Msg("Event hasn't been sent.")
}
session.MarkMessage(msg, "")
continue
}
err = och.sendCreated(oce.Data.ID)
if err != nil {
log.Error().Err(err).Msg("Event hasn't been sent.")
}
session.MarkMessage(msg, "")
}
return nil
}
func (och OrderCreatedHandler) sendRejected(orderID int64) error {
msg := model.RejectedGoodsMsg{Data: model.Goods{
OrderID: orderID,
}}
msgStr, err := json.Marshal(msg)
if err != nil {
return err
}
producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("GOODS_REJECTED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
_, _, err = och.producer.SendMessage(producerMsg)
return err
}
func (och OrderCreatedHandler) sendCreated(orderID int64) error {
msg := model.CreatedGoodsMsg{Data: model.Goods{
OrderID: orderID,
}}
msgStr, err := json.Marshal(msg)
if err != nil {
return err
}
producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("GOODS_CREATED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
_, _, err = och.producer.SendMessage(producerMsg)
return err
}
Теперь мы может сделать запрос к сервису и создать заказ:
curl --request POST \
--header "Content-Type: application/json" \
--data '{"user_id":1,"goods_ids":[1,2]}' \
'http://localhost:8080/v1/orders'
В успешном варианте развития событий мы получаем полностью сформированную согласованную сущность заказа с соответствующими товарами. В противном случае неудача в сервисе goods влечёт за собой откат транзакции в order. Таким образом мы добились, чего хотели — согласованности данных в распределённой системе. На моем github представлен полный листинг описанной архитектуры. Для реализации повествования на основе оркестрации есть готовая библиотека здесь, заодно можно подробнее ознакомиться с принципами её работы.
Заключение
- Паттерн saga хорошо решает проблему согласованности в распределённых системах сервисов.
- Одна из главных проблем повествований — что они являются ACD. У них нет изолированности, это вызывает аномалии (по аналогии с аномалиями в СУБД). Одни повествования могут влиять на данные, с которыми работают другие повествования. Для компенсации этого недостатка паттерн должен реализовывать контрмеры.
- Если сравнивать повествование на основе оркестрации и хореографии, то второй метод проще в реализации, сложнее для понимания и подходит лучше для простых кейсов. Среди явных недостатков можно выделить возможность возникновения жёсткого связывания, т.к каждый сервис подписывается на все события, которые на него влияют.
- Повествование на основе оркестрации не создает циклических зависимостей, и бизнес-логика значительно проще, чем в хореографии. Меньше связывания — у каждого сервиса своё API, которое вызывается оркестратором. Лучше подходит для сложных повествований.
SAGA на golang
После того, как я написал статью про паттерн CQRS , мне захотелось описать ещё один интересный шаблон для микросервисной архитектуры, а именно saga (он же повествование ). Проблематика Проекты Каруны,...
habr.com