В этой статье я хочу остановиться на том как получает и обрабатываются сообщения spring-kafka.
Стоит оговориться, что сейчас мы рассматриваем ситуацию с enable.auto.commit = true. Согласно документации начиная с версии 2.3 настройка auto.commit по-умолчанию выставлена в false, хотя раньше это значение было аналогично значению по-умолчанию в kafka-clients, т.е. true. Это связанно с тем что контейнер KafkaMessageListenerContainer имеет собственные механизмы управления коммитом. Насколько это удобнее и какие тут есть плюсы и минусы - пожалуй тема отдельной статьи.
@Configuration
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
myListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
...
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, “true”);
...
return props;
}
}
И код который будет совершать какую-то работу с полученными сообщениями:
@KafkaListener(topics = "myTopic", containerFactory=”myListenerContainerFactory”)
public void listen(String data) {
...
}
Сперва остановимся подробнее на конфигурации. Тут в игру вступают две сущности:
protected Consumer<K, V> createRawConsumer(Map<String, Object> configProps) {
return new KafkaConsumer<>(configProps, this.keyDeserializerSupplier.get(),
this.valueDeserializerSupplier.get());
}
ConcurrentKafkaListenerContainerFactory также прост по своей сути. Он создаёт объект ConcurrentKafkaListenerContainer, который в свою очередь создает KafkaMessageListenerContainer в количестве указанном в поле concurrent. Если наш топик имеет партиций меньше чем указанно в поле concurrent, то значение поля изменяется на количество партиций. Это сделано ввиду бесполезности создания большего числа слушателей, чем у топика есть партиций, т.к. kafka на своей стороне позволяет подключиться к одной партиции только одному слушателю в пределах одной группы слушателей, все остальные слушатели из этой группы будут распределены по другим партициям либо останутся незадействоваными. Все это можно наглядно увидеть в методе doStart()
Итак, мы добрались наконец добрались до KafkaMessageListenerContainer.
В методе doStart() этого класса мы получаем ссылку на метод в классе на который мы повесили аннотацию @KafkaListener и указали в параметре containerFactory название нашего ConcurrentKafkaListenerContainerFactory из конфигурации.
Object messageListener = containerProperties.getMessageListener();
Далее в коде мы видим получение объекта AsyncListenableTaskExecutor и если этого объекта не существует будет создан объект с типом SimpleAsyncTaskExecutor, который создаст отдельный поток для нашего слушателя.
AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
if (consumerExecutor == null) {
consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
В конце концов мы создаем обертку над полученным слушателем из класса ListenerConsumer() объявленного тут же в KafkaMessageListenerContainer, а вот он уже в свою очередь создаст в конструкторе экземпляр KafkaConsumer с помощью фабрики DefaultKafkaConsumerFactory объявленной нами в конфигурации
this.consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
this.consumerGroupId,
this.containerProperties.getClientId(),
KafkaMessageListenerContainer.this.clientIdSuffix,
consumerProperties);
Мы установили связь между кодом, который мы пометили аннотацией @KafkaListener и непосредственно классом KafkaConsumer и разобрались для чего используются классы в конфигурации.
Теперь посмотрим как же происходит получение и обработка сообщения.
Итак у ListenerConsumer есть метод pollAndInvoke() в котором происходит вызов метода в котором в свою очередь непосредственно происходит вызов метода poll() у KafkaConsumer для получения новых сообщений (и коммита offset в случае enable.auto.commit = true) Полученный сообщения передаются в метод invokeListener() для непосредственной обработки.
private void invokeListener(final ConsumerRecords<K, V> records) {
if (this.isBatchListener) {
invokeBatchListener(records);
} else {
invokeRecordListener(records);
}
}
В методе помеченным @KafkaListener мы можем обрабатывать сообщения как по одному так и пачкой (все полученные из топика при вызове KafkaConsumer->poll()) Если мы остановились на первом варианте и обрабатываем сообщения в нашем @KafkaListener по одному, то ListenerConsumer будет просто с помощью итератора идти по всему полученному набору передавая сообщения на обработку (метод doInvokeWithRecords(...)), который в свою очередь через цепочку вызовов передает сообщение на обработку нашему методу помеченному аннотацией @KafkaListener.
А что же происходит если в нашем коде при обработке сообщения выбрасывается исключение?
При получении и обработки сообщений ListenerConsumer отлавливает все возможные типы исключений в методе doRun() При обработке неспецифичных исключений используются два разных механизма в зависимости от версии spring-kafka. В версиях младше 2.5 мы можем наблюдать следующее поведение: если мы самостоятельно не настроили обработчик ошибок, то будет создан LoggingErrorHandler, который просто напросто залогирует ошибку и обработка продолжится.
Начиная с версии 2.5 обработчиком по-умолчанию становится SeekToCurrentErrorHandler в котором произойдет 10 попыток обработать сообщение без задержки и если все они закончатся неудачей ошибка также будет залогирована и мы перейдем к обработке следующего сообщения.
Это необходимо учитывать при оценки гарантий обработки сообщений, которые поддерживает наше приложение, т.к. со своей стороны kafka выполнила все обязательства по доставке сообщения.
У нас остался еще один момент, который стоит прояснить: как часто spring-kafka будет вызывать метод poll() у KafkaConsumer ? В нашей конфигурации при создании KafkaListenerContainerFactory мы можем указать следующий параметр:
factory.getContainerProperties().setPollTimeout(3000);
что будет означать следующее: при вызове метода poll() у KafkaConsumer ему в качестве аргумента будет передаваться это значение (само значение задает время в миллисекундах, т.е. в нашем случае 3 секунды). Именно это происходит в методе doPoll()
this.consumer.poll(this.pollTimeout);
KafkaConsumer же в свою очередь при вызове метода poll будет ждать переданное ему количество времени пока не наберется столько сообщений сколько мы указали в параметре max.poll.records (значение по-умолчанию 500 записей). Если pollTimeout будет равен 0, то вызов метода poll будет происходить без задержек возвращая пустой результат.
Подведем итог.
Стоит оговориться, что сейчас мы рассматриваем ситуацию с enable.auto.commit = true. Согласно документации начиная с версии 2.3 настройка auto.commit по-умолчанию выставлена в false, хотя раньше это значение было аналогично значению по-умолчанию в kafka-clients, т.е. true. Это связанно с тем что контейнер KafkaMessageListenerContainer имеет собственные механизмы управления коммитом. Насколько это удобнее и какие тут есть плюсы и минусы - пожалуй тема отдельной статьи.
Я постараюсь ответить на следующие вопросы:Because the listener container has it’s own mechanism for committing offsets, it prefers the Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to be false. Starting with version 2.3, it unconditionally sets it to false unless specifically set in the consumer factory or the container’s consumer property overrides.
- Как в spring-kafka построена работа с KafkaConsumer?
- Какие есть возможности для параллельной обработки сообщений?
- Что происходит при возникновении ошибок при обработки сообщений?
@Configuration
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
myListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
...
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, “true”);
...
return props;
}
}
И код который будет совершать какую-то работу с полученными сообщениями:
@KafkaListener(topics = "myTopic", containerFactory=”myListenerContainerFactory”)
public void listen(String data) {
...
}
Сперва остановимся подробнее на конфигурации. Тут в игру вступают две сущности:
- DefaultKafkaConsumerFactory
- ConcurrentKafkaListenerContainerFactory
protected Consumer<K, V> createRawConsumer(Map<String, Object> configProps) {
return new KafkaConsumer<>(configProps, this.keyDeserializerSupplier.get(),
this.valueDeserializerSupplier.get());
}
ConcurrentKafkaListenerContainerFactory также прост по своей сути. Он создаёт объект ConcurrentKafkaListenerContainer, который в свою очередь создает KafkaMessageListenerContainer в количестве указанном в поле concurrent. Если наш топик имеет партиций меньше чем указанно в поле concurrent, то значение поля изменяется на количество партиций. Это сделано ввиду бесполезности создания большего числа слушателей, чем у топика есть партиций, т.к. kafka на своей стороне позволяет подключиться к одной партиции только одному слушателю в пределах одной группы слушателей, все остальные слушатели из этой группы будут распределены по другим партициям либо останутся незадействоваными. Все это можно наглядно увидеть в методе doStart()
Итак, мы добрались наконец добрались до KafkaMessageListenerContainer.
В методе doStart() этого класса мы получаем ссылку на метод в классе на который мы повесили аннотацию @KafkaListener и указали в параметре containerFactory название нашего ConcurrentKafkaListenerContainerFactory из конфигурации.
Object messageListener = containerProperties.getMessageListener();
Далее в коде мы видим получение объекта AsyncListenableTaskExecutor и если этого объекта не существует будет создан объект с типом SimpleAsyncTaskExecutor, который создаст отдельный поток для нашего слушателя.
AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
if (consumerExecutor == null) {
consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
В конце концов мы создаем обертку над полученным слушателем из класса ListenerConsumer() объявленного тут же в KafkaMessageListenerContainer, а вот он уже в свою очередь создаст в конструкторе экземпляр KafkaConsumer с помощью фабрики DefaultKafkaConsumerFactory объявленной нами в конфигурации
this.consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
this.consumerGroupId,
this.containerProperties.getClientId(),
KafkaMessageListenerContainer.this.clientIdSuffix,
consumerProperties);
Мы установили связь между кодом, который мы пометили аннотацией @KafkaListener и непосредственно классом KafkaConsumer и разобрались для чего используются классы в конфигурации.
Теперь посмотрим как же происходит получение и обработка сообщения.
Итак у ListenerConsumer есть метод pollAndInvoke() в котором происходит вызов метода в котором в свою очередь непосредственно происходит вызов метода poll() у KafkaConsumer для получения новых сообщений (и коммита offset в случае enable.auto.commit = true) Полученный сообщения передаются в метод invokeListener() для непосредственной обработки.
private void invokeListener(final ConsumerRecords<K, V> records) {
if (this.isBatchListener) {
invokeBatchListener(records);
} else {
invokeRecordListener(records);
}
}
В методе помеченным @KafkaListener мы можем обрабатывать сообщения как по одному так и пачкой (все полученные из топика при вызове KafkaConsumer->poll()) Если мы остановились на первом варианте и обрабатываем сообщения в нашем @KafkaListener по одному, то ListenerConsumer будет просто с помощью итератора идти по всему полученному набору передавая сообщения на обработку (метод doInvokeWithRecords(...)), который в свою очередь через цепочку вызовов передает сообщение на обработку нашему методу помеченному аннотацией @KafkaListener.
А что же происходит если в нашем коде при обработке сообщения выбрасывается исключение?
При получении и обработки сообщений ListenerConsumer отлавливает все возможные типы исключений в методе doRun() При обработке неспецифичных исключений используются два разных механизма в зависимости от версии spring-kafka. В версиях младше 2.5 мы можем наблюдать следующее поведение: если мы самостоятельно не настроили обработчик ошибок, то будет создан LoggingErrorHandler, который просто напросто залогирует ошибку и обработка продолжится.
Начиная с версии 2.5 обработчиком по-умолчанию становится SeekToCurrentErrorHandler в котором произойдет 10 попыток обработать сообщение без задержки и если все они закончатся неудачей ошибка также будет залогирована и мы перейдем к обработке следующего сообщения.
Это необходимо учитывать при оценки гарантий обработки сообщений, которые поддерживает наше приложение, т.к. со своей стороны kafka выполнила все обязательства по доставке сообщения.
У нас остался еще один момент, который стоит прояснить: как часто spring-kafka будет вызывать метод poll() у KafkaConsumer ? В нашей конфигурации при создании KafkaListenerContainerFactory мы можем указать следующий параметр:
factory.getContainerProperties().setPollTimeout(3000);
что будет означать следующее: при вызове метода poll() у KafkaConsumer ему в качестве аргумента будет передаваться это значение (само значение задает время в миллисекундах, т.е. в нашем случае 3 секунды). Именно это происходит в методе doPoll()
this.consumer.poll(this.pollTimeout);
KafkaConsumer же в свою очередь при вызове метода poll будет ждать переданное ему количество времени пока не наберется столько сообщений сколько мы указали в параметре max.poll.records (значение по-умолчанию 500 записей). Если pollTimeout будет равен 0, то вызов метода poll будет происходить без задержек возвращая пустой результат.
Подведем итог.
- Под капотом spring-kafka использует все тот же KafkaConsumer из библиотеки kafka-clients и работа с ним осуществляется в отдельном потоке.
- Мы можем смело использовать механизм auto-commit, но этот параметр лучше всего явно прописывать в конфигурации.
- Следует внимательно отнестись к обработчикам ошибок по-умолчанию и учитывать их поведение при расчетах надежности нашей системы.
Как spring-kafka обрабатывает сообщения и не мешает ли этому auto-commit?
В предыдущей статье мы рассмотрели как работает KafkaConsumer и как реализован механизм auto-commit. В этой статье я хочу остановиться на том как получает и обрабатываются сообщения...
habr.com