SAGA на golang

Kate

Administrator
Команда форума

Проблематика​

Проекты Каруны, как и многие современные приложения, следуют парадигме микросервисной архитектуры. Поэтому паттерн в настоящее время актуален для нас как никогда. Для начала рассмотрим какой-нибудь простой кейс на примере интернет-магазина. Предположим, что некоторая часть нашей архитектуры разделена на микросервисы: order, который отвечает за создание самого заказа, и сервис goods, который отвечает за создание товаров в этом заказе. Создание заказа можно представить следующей схемой:

cc57da3808c748515ee9b5d8789db5e3.jpg

Клиент обращается непосредственно к сервису order, в котором создаётся сущность заказа. Сервис order создает в своей БД сущность заказа и посылает запрос на создание товаров в этом заказе в сервис goods. Если в сервисе goods произойдет ошибка, то нарушается согласованность данных в нашей системе — заказ уже записан в базу данных сервиса order, но товары не будут записаны в БД сервиса goods.

Методы решения​

Многие слышали о так называемых распределённых транзакциях, которые, как предполагается, должны решать описанную выше проблему. Для управления такими транзакциями используется стандарт X/Open XA, следование которому гарантирует как сохранение всей транзакции, так и её откат. Одной из главных проблем распределённых транзакций является то, что их не поддерживают многие современные инструменты — такие как MongoDB, Kafka и т. д.

Другая проблема связана с жёстким требованием: 100-процентная доступность всех сервисов во время исполнения такой транзакции. А так как каждый новый сервис понижает доступность системы в целом (доступность системы = произведению доступности каждого сервиса), использование распределённых транзакций будет хуже и хуже сказываться на приложении в целом при масштабировании. По этим причинам использование данного метода в современных приложениях не желательно.

Для решения нашей задачи отлично подходит паттерн saga. Он обеспечивает согласованность данных между сервисами, используя локальные транзакции и асинхронные сообщения. Логика исполнения паттерна размазана на несколько сервисов: после фиксации транзакции в одном сервисе он публикует асинхронное сообщение о завершении транзакции. Это сообщение инициализирует следующий этап выполнения паттерна, и так до конца "повествования".

Для отката изменений на N-ом этапе повествование использует компенсирующие транзакции, которые откатывают изменения и устраняют последствия выполнения предыдущих N-1 транзакций. Выполнение таких транзакций тоже происходит последовательно, но в обратном порядке. Паттерн также оперирует такими понятиями как поворотная транзакция (дальнейшие транзакции никогда не отказывают) и доступная для повторения транзакция (всегда заканчивается успешно).

Управление (координация) повествований может осуществляться 2 способами:

  • Хореография — решения о следующих шагах и откатах принимаются распределённо каждым сервисом по отдельности.
  • Оркестрация — логика координации централизуется в одном месте, из которого шлются сообщения в другие сервисы.

Проектирование​

В нашем случае реализация паттерна будет выглядеть так:

Сервис​
Транзакция​
Компенсирующая транзакция​
orderCreateOrder()Доступная для компенсацииRemoveOrder()
goodsCreateGoods()Поворотная-
orderAcceptOrder()Доступная для повторения-
Реализуем saga паттерн, используя хореографию. Она проще в реализации, чем оркестрация, и хорошо ложится на наш случай небольшого количества сервисов. Применительно к описанной задаче, повествование можно представить следующей схемой:

b6ec24dcb03e9d58c074582b152cf131.jpg

Рассмотрим две возможных ситуации для нашего функционала . В первом случае клиент посылает запрос в сервис order POST /orders, и всё проходит отлично:

  1. Успешное создание сущности заказа в сервисе order инициализирует событие order_created_v1, на которое подписан сервис goods.
  2. После чтения этого события и успешного создания сущностей товаров в сервисе goods посылается событие goods_created_v1 в сервис order.
  3. При получении этого события сервис order меняет статус заказа в своей БД на approved.
Другой случай, это когда во время заказа что-то пошло не так:

  1. Снова создаётся заказ в сервисе order, и генерируется событие order_created_v1.
  2. Сервис goods, получив событие, пытается создать товары для этого заказа. Но что-то идёт не так, и товары не заносятся в БД. Посылается событие goods_rejected_v1 в сервис order.
  3. При получении этого события сервис order откатывает свои изменения из шага 1 — удаляет заказ.

Реализация​

Реализовывать проект будем с помощью golang как языка сервисов, postgresql для баз данных и kafka для обмена сообщениями между сервисами. Создадим локальное рабочее окружение, а именно — сервисы order, goods, БД для каждого сервиса и брокер kafka:

docker-compose.yml
version: '3.9'

services:
order:
build:
dockerfile: .docker/app.Dockerfile
context: ./
args:
SERVICE_NAME: order
environment:
- HTTP_BIND=8080
- POSTGRES_DB=orders
- POSTGRES_USER=orders_user
- POSTGRES_PASSWORD=orders_password
- HOST_DB=db-order
- PORT_DB=5432
- KAFKA_ADDR=kafka:9092
- ORDER_CREATED_TOPIC=order_created_v1
- ORDER_CREATED_TOPIC=goods_created_v1
- GOODS_REJECTED_TOPIC=goods_rejected_v1
depends_on:
- db-order
- kafka
volumes:
- ./order:/app/order:delegated
- ./.docker/entrypoint.sh:/entrypoint.sh:ro
entrypoint: /entrypoint.sh
ports:
- "8080:8080"
networks:
- saga

db-order:
image: postgres:14
environment:
- POSTGRES_DB=orders
- POSTGRES_USER=orders_user
- POSTGRES_PASSWORD=orders_password
ports:
- "5441:5432"
volumes:
- data:/var/lib/postgresql
networks:
- saga

goods:
build:
dockerfile: .docker/app.Dockerfile
context: ./
args:
SERVICE_NAME: goods
environment:
- HTTP_BIND=8081
- POSTGRES_DB=goods
- POSTGRES_USER=goods_user
- POSTGRES_PASSWORD=goods_password
- HOST_DB=db-goods
- PORT_DB=5432
- KAFKA_ADDR=kafka:9092
- ORDER_CREATED_TOPIC=order_created_v1
- GOODS_CREATED_TOPIC=goods_created_v1
- GOODS_REJECTED_TOPIC=goods_rejected_v1
volumes:
- ./goods:/app/goods:delegated
- ./.docker/entrypoint.sh:/entrypoint.sh:ro
entrypoint: /entrypoint.sh
depends_on:
- db-goods
- kafka
ports:
- "8081:8081"
networks:
- saga

db-goods:
image: postgres:14
environment:
- POSTGRES_DB=goods
- POSTGRES_USER=goods_user
- POSTGRES_PASSWORD=goods_password
ports:
- "5442:5432"
volumes:
- data:/var/lib/postgresql
networks:
- saga

zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- '2181:2181'
networks:
- saga

kafka:
image: wurstmeister/kafka
depends_on:
- zookeeper
ports:
- '9092:9092'
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: order_created_v1:1:1,goods_created_v1:1:1,goods_rejected_v1:1:1
networks:
- saga

volumes:
data:

networks:
saga:

Сервис order включает в себя таблицы со статусами заказов и самими заказами:

CREATE TABLE statuses (
id BIGSERIAL PRIMARY KEY,
name TEXT NOT NULL
);

INSERT INTO statuses (id, name) VALUES (1, 'PENDING'), (2, 'CREATED');

CREATE TABLE orders (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
status_id BIGINT NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,

FOREIGN KEY (status_id) REFERENCES statuses (id)
);
И реализует эндпоинт создания заказа:

...
server.router.HandleFunc("/v1/orders", s.CreateOrderV1).Methods(http.MethodPost)
...
В котором сохраняется сущность заказа в БД и продьюсится сообщение в kafka:

обработчик CreateOrderV1
func (s Server) CreateOrderV1(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Error().Err(err).Msg("Data hasn't been read.")
w.WriteHeader(http.StatusBadRequest)
return
}

orderData := model.OrderData{}
err = json.Unmarshal(body, &orderData)
if err != nil {
log.Error().Err(err).Msg("Data hasn't been parsed.")
w.WriteHeader(http.StatusBadRequest)
return
}

var orderID int64
err = s.db.QueryRow(context.Background(), `INSERT INTO orders (user_id, status_id, created_at) VALUES ($1, 1, NOW()) RETURNING id`, orderData.UserID).Scan(&orderID)
if err != nil {
log.Error().Err(err).Msg("Order hasn't been created.")
w.WriteHeader(http.StatusInternalServerError)
return
}

msg := model.CreatedOrderMsg{Data: model.Order{
ID: orderID,
GoodsIds: orderData.GoodsIds,
}}
msgStr, err := json.Marshal(msg)
if err != nil {
log.Error().Err(err).Msg("Message hasn't been marshaled.")
w.WriteHeader(http.StatusInternalServerError)
return
}

producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("ORDER_CREATED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
_, _, err = s.kafkaProducer.SendMessage(producerMsg)
if err != nil {
log.Error().Err(err).Msg("Message hasn't been sent.")
w.WriteHeader(http.StatusInternalServerError)
return
}
}
Cервис слушает топики goods_created_v1 и goods_rejected_v1 и в зависимости от полученного изменяет созданную сущность заказа:

обработка события goods_created_v1
func (gch GoodsCreatedHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
gce := GoodsCreatedEvent{}
err := json.Unmarshal(msg.Value, &gce)
if err != nil {
log.Error().Err(err).Msg("Event hasn't been handled.")
session.MarkMessage(msg, "")
continue
}

_, err = gch.db.Exec(context.Background(), `UPDATE orders SET status_id = 2 WHERE id = $1`, gce.Data.OrderID)
if err != nil {
log.Error().Err(err).Msg("Event hasn't been inserted.")
}

session.MarkMessage(msg, "")
}

return nil
}
обрабокта события goods_rejected_v1
Сервис goods содержит таблицу с заказами:

CREATE TABLE goods (
id BIGSERIAL PRIMARY KEY,
goods_id BIGINT NOT NULL,
order_id BIGINT NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
);
И слушает событие order_created_v1, которое обрабатывает следующим образом:

обработrа события order_created_v1
func (och OrderCreatedHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
oce := OrderCreatedEvent{}
err := json.Unmarshal(msg.Value, &oce)
if err != nil {
log.Error().Err(err).Msg("Event hasn't been handled.")
session.MarkMessage(msg, "")
continue
}

ctx := context.Background()
tx, err := och.db.Begin(ctx)
for _, goodsID := range oce.Data.GoodsIds {
_, err = och.db.Exec(context.Background(), `INSERT INTO goods (goods_id, order_id, created_at) VALUES ($1, $2, NOW())`, goodsID, oce.Data.ID)
if err != nil {
tx.Rollback(ctx)
log.Error().Err(err).Msg("Inserting error.")

err := och.sendRejected(oce.Data.ID)
if err != nil {
log.Error().Err(err).Msg("Event hasn't been sent.")
}
session.MarkMessage(msg, "")
continue
}
}

err = tx.Commit(ctx)
if err != nil {
err := tx.Rollback(ctx)
log.Error().Err(err).Msg("Transaction commit error.")
rErr := och.sendRejected(oce.Data.ID)
if rErr != nil {
log.Error().Err(rErr).Msg("Event hasn't been sent.")
}
session.MarkMessage(msg, "")
continue
}

err = och.sendCreated(oce.Data.ID)
if err != nil {
log.Error().Err(err).Msg("Event hasn't been sent.")
}
session.MarkMessage(msg, "")
}

return nil
}

func (och OrderCreatedHandler) sendRejected(orderID int64) error {
msg := model.RejectedGoodsMsg{Data: model.Goods{
OrderID: orderID,
}}
msgStr, err := json.Marshal(msg)
if err != nil {
return err
}
producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("GOODS_REJECTED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
_, _, err = och.producer.SendMessage(producerMsg)
return err
}

func (och OrderCreatedHandler) sendCreated(orderID int64) error {
msg := model.CreatedGoodsMsg{Data: model.Goods{
OrderID: orderID,
}}
msgStr, err := json.Marshal(msg)
if err != nil {
return err
}
producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("GOODS_CREATED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
_, _, err = och.producer.SendMessage(producerMsg)
return err
}
Теперь мы может сделать запрос к сервису и создать заказ:

curl --request POST \
--header "Content-Type: application/json" \
--data '{"user_id":1,"goods_ids":[1,2]}' \
'http://localhost:8080/v1/orders'
В успешном варианте развития событий мы получаем полностью сформированную согласованную сущность заказа с соответствующими товарами. В противном случае неудача в сервисе goods влечёт за собой откат транзакции в order. Таким образом мы добились, чего хотели — согласованности данных в распределённой системе. На моем github представлен полный листинг описанной архитектуры. Для реализации повествования на основе оркестрации есть готовая библиотека здесь, заодно можно подробнее ознакомиться с принципами её работы.

Заключение​

  • Паттерн saga хорошо решает проблему согласованности в распределённых системах сервисов.
  • Одна из главных проблем повествований — что они являются ACD. У них нет изолированности, это вызывает аномалии (по аналогии с аномалиями в СУБД). Одни повествования могут влиять на данные, с которыми работают другие повествования. Для компенсации этого недостатка паттерн должен реализовывать контрмеры.
  • Если сравнивать повествование на основе оркестрации и хореографии, то второй метод проще в реализации, сложнее для понимания и подходит лучше для простых кейсов. Среди явных недостатков можно выделить возможность возникновения жёсткого связывания, т.к каждый сервис подписывается на все события, которые на него влияют.
  • Повествование на основе оркестрации не создает циклических зависимостей, и бизнес-логика значительно проще, чем в хореографии. Меньше связывания — у каждого сервиса своё API, которое вызывается оркестратором. Лучше подходит для сложных повествований.

 
Сверху