Введение
Я работаю на проекте, цель которого это некая пост-обработка финансовых операций. Это большая система, со множеством модулей и различными интеграциями с другими системами из которых к нам и попадают эти операции. У нас есть АРМ, он же веб-версия, он же фронтенд, для пользователей, которые как раз и занимаются этой пост-обработкой. Где-то около года назад появилась потребность оповещать пользователей о появлении новых операций в системе. SSE(Server-Sent Events), в данном случае, казался самым логичным и простым вариантом. Мне выпала честь реализовывать эту фичу на стороне сервера. Как оказалось, в русскоязычном сегменте достаточно мало информации об SSE в стеке java, поэтому я решил оформить свой опыт в эту статью, в ней я постараюсь осветить процесс и особенности реализации технологии в стеке java. Итогом данной статьи будет простой MVP, который умеет работать с SSE.Я предполагаю, что вы уже знаете, что такое SSE, зачем он нужен и его отличия от аналогичных технологий(WebSocket, Long Pulling). А также знакомы со стандартом Reactive Streams, о том, что из себя представляет реактивное приложение и в чем его отличия от сервелтного.
Особенности
Технология имеет ряд особенностей и ограничений, которые я, по каким-то причинам упустил на старте, но которые сильно повлияли на итоговые трудозатраты. Натыкаться на каждую из нижеперечисленных особенностей было достаточно больно, в моем случае иногда приходилось много переписывать и дописывать. Предполагаю, что в некоторых случаях это может и вовсе стать камнем преткновения, непреодолимым препятствием на пути к цели. Будет обидно узнать это где-то в середине или в конце пути, поэтому я решил вынести этот раздел на самый верх.Не поддерживает custom headers.
По каким-то причинам SSE не поддерживает собственные заголовки, есть только куки. В моем случае, через заголовки передавался csrf-токен, который дублировался в куки и затем проверялся внутри приложения. Хоть для get-запросов(коим является sse-стрим) это и не нужно, но, так уж вышло, что эта проверка работала везде. Пришлось доделывать авторизацию.
Браузеры сознательно ограничивают количество, одновременно открытых, стримов.
У всех браузеров на данный момент имеются лимиты на, одновременно открытые, стримы. Почитать можно здесь. Как с этим бороться, можно почитать здесь.
Корректно работает только в реактивном стеке.
Сервлетное приложение не умеет обрабатывать событие закрытия соединения со стороны клиента. Этого не было написано в туториалах, по которым я разрабатывал, поэтому для меня это стало очень неприятной неожиданностью. Основная часть нашего приложения - сервлетный монолит. SSE-стрим должен был стать его частью и в первой версии так и было. Но, на этапе тестирования выяснилось, что проблема мертвых стримов сильно актуальна для нас, потому что перед этим монолитом стоит прокси, который сознательно ограничивает количество одновременных соединений для пользователя, которые очень быстро заканчивались. Это можно решить, например реализовав механизм HEARTBEAT или сделать связку один пользователь - один стрим. Но, подобные решения, накладывают определенные ограничения и добавляют лишней сложности, поэтому подробно останавливаться на реализации в сервлетном стеке я не буду. Если нужно, примеры можно посмотреть здесь или здесь. В реактивном приложении эта проблема была решена, например в spring-webflux это работает, начиная с версии 2.x.x
Инфраструктура важна.
Будьте готовы к тому, что все ключевые узлы пути вашего sse-запроса потребуют настройки. Ниже я приведу пример своей инфраструктуры и трудности, с которыми столкнулся.
Разработка
Итак, мы пробежались по некоторым ограничениям технологии, теперь можно приступить к реализации. Создадим синтетический пример, в котором у нас будет реализована подписка, с привязкой к nickname и методы рассылки произвольной строки для всех текущих подписчиков или для конкретного подписчика, по его nickname. Для простоты я постараюсь использовать минимум классов.Я буду использовать следующие инструменты и библиотеки: gradle, spring-webflux, Lombok. Готовый код доступен на GitHub.
Создадим новый Gradle проект. Обновим наш build.gradle:
plugins {
id 'org.springframework.boot' version '2.4.7'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
}
sourceCompatibility = '11'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation "org.springframework.boot:spring-boot-starter-validation"
compileOnly 'org.projectlombok:lombok:1.18.8'
annotationProcessor 'org.projectlombok:lombok:1.18.8'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
}
test {
useJUnitPlatform()
}
Итак, что мы сделали:
1. Добавили spring-boot
2. Добавили spring-webflux
3. Добавили lombok.
Добавим main() метод, здесь все стандартно:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
Далее создадим класс SubscriptionData, он будет содержать в себе информацию о подписке:
@Getter
@Setter
@AllArgsConstructor
public class SubscriptionData {
private String nickName;
private FluxSink<ServerSentEvent> fluxSink;
}
И реализуем наш контроллер с методом подписки:
@RestController
@Slf4j
public class SseRestController {
Map<UUID, SubscriptionData> subscriptions = new ConcurrentHashMap<>(); // 1
@GetMapping(path = "/open-sse-stream/{nickName}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) // 2
public Flux<ServerSentEvent> openSseStream(@PathVariable String nickName) {
return Flux.create(fluxSink -> { // 3
log.info("create subscription for " + nickName);
UUID uuid = UUID.randomUUID();
fluxSink.onCancel( // 4
() -> {
subscriptions.remove(uuid);
log.info("subscription " + nickName + " was closed");
}
);
SubscriptionData subscriptionData = new SubscriptionData(nickName, fluxSink);
subscriptions.put(uuid, subscriptionData);
// 5
ServerSentEvent<String> helloEvent = ServerSentEvent.builder("Hello " + nickName).build();
fluxSink.next(helloEvent);
});
}
Здесь остановимся поподробнее:
- В subscriptions будем хранить информацию о подписчиках. Для каждой новой подписки будем генерировать какой-то uid, и объект класса SubscriptionData, который мы описали ранее.
- Метод будет ожидать какой-то nickName. Будем использовать его в дальнейшем для рассылки сообщений конкретным подписчикам. Обратите внимание на Content-Type. По нему браузер будет понимать, что это SSE-стрим.
- Flux.create() - Метод создает реактивный поток данных. На вход принимает реализацию Consumer<FluxSink>. В данном случае мы использовали lambda-выражение для этого. fluxSink - объект класса FluxSink, для взаимодействия с вышестоящим подписчиком. С помощью этого объекта мы как раз и отправляем данные в поток, вызовом метода next().
- Переопределяем реакцию на закрытие соединения. Удаляем подписку из списка subscriptions
- Spring уже имеет встроенное представление для sse-события. Класс org.springframework.http.codec.ServerSentEvent. Все, что от нас требуется, это заполнить нужные поля объекта этого класса. В данном случае заполняем только поле data, нам его достаточно и отправляем первое событие в стрим, вызовом fluxSink.next(helloEvent).
@Getter
@Setter
@NoArgsConstructor
public class SendMessageRequest {
private String message;
}
И теперь создадим непосредственно сам метод, добавим его в наш, уже созданный, контроллер:
@PutMapping(path = "/send-message-for-all")
public void sendMessageForAll(@RequestBody SendMessageRequest request) {
// 1
ServerSentEvent<String> event = ServerSentEvent
.builder(request.getMessage())
.build();
// 2
subscriptions.forEach((uuid, subscriptionData) ->
subscriptionData.getFluxSink().next(event)
);
}
- Создаем объект ServerSentEvent, его поле data заполняем строкой из тела нашего запроса(поле message)
- Делаем рассылку нашего сообщения во все открытые стримы
@PutMapping(path = "/send-message-by-name/{nickName}")
public void sendMessageByName(
@PathVariable String nickName,
@RequestBody SendMessageRequest request
) {
ServerSentEvent<String> event = ServerSentEvent
.builder(request.getMessage())
.build();
subscriptions.forEach((uuid, subscriptionData) -> {
if (nickName.equals(subscriptionData.getNickName())) {
subscriptionData.getFluxSink().next(event);
}
}
);
}
Этот метод принципиально ничем не отличается от предыдущего, разве что в нем есть проверка имени пользователя, его также можно объединить с предыдущим методом, при желании. В данном случае, для лучшей читаемости, решил их разделить на 2. Итак, наш сервис готов, он умеет открывать стримы и отправлять в них события, как же его протестировать?
Тестирование
После разработки хотелось бы как-то убедиться, что наше приложение работает. Я привык проверять свою работу через Postman, но, к сожалению, там до сих пор там до сих пор нет поддержки SSE . Есть другие сервисы, которые умеют работать с этой технологией. К примеру вот инструкция для Fiddler. Так это будет выглядеть:Можно также использовать curl(с флагом -N), с недавних пор он появился в Windows.
Или можно сделать свою веб-страницу для проверок. Клиентская часть для работы с sse реализуется достаточно просто. Пример страницы для текущей задачи на GitHub
Инфраструктура
Как я и писал выше, инфраструктура важна, стоит брать это во внимание и быть готовым к тому, что это может сильно повлиять на итоговые трудозатраты. Не все современные технологии умеют корректно работать с SSE. Приведу, несколько упрощенный, пример инфраструктуры проекта, над которым работаю:Приложение развернуто в кластере k8s, взаимодействие с которым осуществляется через nginx. Перед кластером стоит haproxy, с которым уже взаимодействуют браузеры пользователей.
Nginx настраивается относительно просто, в интернете полно готовых конфигов и объяснений, ни на какие подводные грабли я здесь не наступал. Например вот.
С haproxy все оказалось интереснее. Вот пример рабочей конфигурации:
defaults
mode http
option abortonclose
frontend test
bind *:80
use_backend test-backend
backend test-backend
server test_server sse:8080
С одной стороны, она выглядит достаточно просто, с другой стороны, в своем проекте, я хотел бы избавиться от секции defaults. Так как опция abortonclose, в данном случае, поддерживается только в секции backend, было бы логично перенести ее туда:
defaults
mode http
frontend test
bind *:80
use_backend test-backend
backend test-backend
option abortonclose
server test_server sse:8080
Но, если так сделать, то перестает закрываться соединение haproxy_backend -> your_backend, когда оно закрывается на фронтенде. Я поисследовал tcp-пакеты, в первом случае(рабочий конфиг) haproxy шлет rst-сигнал для разрыва соединения, во втором случае haproxy просто перестает слать этот сигнал. Таким образом остается висеть мертвое соединение до тех пор, пока в него не полетят какие-то данные и оно не закроется с ошибкой. Происходит тоже самое, что и с сервлетным стэком джава. Решение есть: перевести секцию frontend в режим tcp и можно избавиться от секции defaults. Тогда конфиг будет выглядеть так:
frontend test
bind *:80
mode tcp
use_backend test-backend
backend test-backend
mode http
option abortonclose
server test_server sse:8080
Почему так происходит? Мне не удалось найти ответа на этот вопрос. Я экспериментировал с последней версий, такое поведение сохраняется, предполагаю, что это не баг, а фича. Возможно в комментариях кто-то сможет пролить свет на это. На этом все, подведем итоги.
Заключение
Итак, мы пробежались по особенностям технологии, сделали небольшой сервис для работы с ней и разобрали как его проверять.Server-Sent Events в java. От любви до ненависти…
Введение Я работаю на проекте, цель которого это некая пост-обработка финансовых операций. Это большая система, со множеством модулей и различными интеграциями с другими системами из которых к нам и...
habr.com