Что лучше: Spark Structured Streaming или полное прекращение работы прода?

Kate

Administrator
Команда форума
Правильное построение ETL-процессов (преобразования данных) — сложная задача, а при большом объёме обрабатываемых данных неизбежно возникают проблемы с ресурсами. Поэтому нам требуется выискивать новые архитектурные решения, способные обеспечить стабильность расчётов и доступность данных, а при необходимости и масштабируемость — с минимальными усилиями.

Когда я пришел в Ozon, мне пришлось столкнуться с огромным количеством ETL-джоб. Прежде чем применить модель машинного обучения, сырые данные проходят множество этапов обработки. А само применение модели (то, ради чего существует команда) занимает всего 5% времени.

4489f7ee7704092b2614fb2c1f0106ba.jpg

Всем привет! Меня зовут Алексей, и в Ozon я занимаюсь матчингом. Что такое матчинг и зачем он нужен, мой коллега @alex_golubev13 объяснил в статье «Векторное представление товаров Prod2Vec».

Ежедневно у нас добавляются сотни тысяч новых товаров, а также меняются те, которые уже есть на сайте. Это могут быть изменения картинок, описаний, названий или цен. Процесс ETL в данном случае заключается в извлечении признаков из товаров, которые появились или обновились в течение заданного промежутка времени (на данный момент за день). Данные мы забираем из HDFS и Hive, а для работы с ними используем PySpark.

Сразу скажу, что большую часть ресурсов и времени в ETL занимает обработка изображений и текстовых данных. Так, каждое изображение проходит через несколько свёрточных нейронных сетей, которые возвращают векторное представление для картинки (эмбеддинг). Для текста — та же схема.

Сначала ETL-процесс состоял из batch-джоб, которые брали партиции данных за конкретную дату и целиком её обрабатывали. Понятно, что с ростом числа товаров они будут работать всё дольше и дольше, а объём потребляемых ресурсов будет только расти. Особенно заметно это во время действия акций и сезонных распродаж — тогда часто меняется цена и добавляется много новых товаров. В такие моменты приходилось значительно поднимать память для приложения. К тому же процесс стал занимать слишком много времени — и весь остальной пайплайн был вынужден ждать завершения ETL. Всё закончилось тем, что на количество товаров, проходящих через ETL, выставлялся лимит, и максимально туда шла треть всех обновившихся товаров. Понятно, что при таком подходе очередь товаров, которые не проходят через пайплайн, будет стремительно расти.

Для того чтобы избежать большой очереди, мы решили никогда не останавливать наш пайплайн ETL — он теперь работает постоянно. Так мы пришли к Spark Structured Streaming.

Как у нас всё работает​

Spark Structured Streaming позволяет работать с потоковыми данными, при этом можно использовать все преимущества Spark SQL. Теперь все обновления едут в Kafka-топик, а Streaming Session читает данные из него, обрабатывает и складывает в HDFS. Затем раз в день мы забираем эти данные и обновляем таблицы, которые являются результатом ETL. Таким образом, можем не выставлять лимит на количество обрабатываемых товаров в день, получая обновления равномерно в течение суток. Эмпирическим путём выяснили, что за день стриминг способен обрабатывать около 20 млн изображений и 50 млн текстовых объектов.

b6918c4487d1a282a1464f9031d7a236.png

В качестве нейросетей для получения эмбеддингов используем BERT, ResNet50, fastText, NFNet, а с недавнего времени также считаем эмбеддинги для модели Prod2Vec.

Если вы работаете с PySpark (или просто со Spark), то наверняка знаете о возможности создания пользовательских функций (user-defined functions, UDF). На данный момент PySpark позволяет использовать три вида таких функций: Python UDF, Pandas UDF, Scala UDF. Об основных отличиях и бенчмарках можно прочитать в этой статье, а я лишь скажу, что для инференса моделей используем Pandas UDF.

Давайте на примере рассмотрим, как можно выполнять инференс ML-моделей с использованием PySpark Structured Streaming и Pandas UDF, а в качестве источника сообщения используем Kafka. Весь код ниже актуален для PySpark 3.X.

Для начала — небольшой ликбез по основной терминологии Kafka.
Kafka — это инструмент, который позволяет работать с потоками событий. Например, есть приложение, которое пишет много логов. Хочется иметь к ним быстрый доступ, чтобы анализировать и делать какие-то выводы или просто сохранять в базу данных. Kafka в этом случае — посредник между приложением, которое пишет логи, и приложением или человеком, читающим эти логи.
Чтобы ориентироваться в Kafka-терминологии из данной статьи, необходимо знать про три вещи. На примере приложения с логами определим Producer, Consumer и Topic:
  1. Topic показывает, где будут храниться логи в Kafka. Можно представить, что Topic — это папка, а каждый лог в нём —файл из этой папки. У каждого объекта (лога, сообщения) есть свой индекс (offset). Kafka также партиционирует топик, разбивая его на несколько частей и раскидывая по Kafka-кластеру.
  2. Producer в данном случае будет записывать логи в Topic. Он создаётся в логируемом приложении и пишет всё, что ему скажет пользователь.
    1. log = get_log()
    2. producer.produce(log, topic)
  3. Consumer будет читать сообщения, которые находятся в топике. Он создаётся в приложении для чтения и обработки логов. При этом один топик могут читать сразу несколько консьюмеров.

Начинаем​

Представим, что есть Kafka-топик, куда поступает событие изменения описания товара на сайте или добавления нового. Необходимо обрабатывать все такие события и извлекать необходимую информацию из текстов. Скажем, что известна схема сообщений (protobuf-схема), которые находятся в топике (ID и описание товара), а к текстам мы хотим применять какую-то ML-модель, которая возвращает эмбеддинг из текста.

syntax = "proto3";

message ItemText{
int64 item_id = 1;
string item_text = 2;
}
Чтобы десериализовывать proto-сообщения в Python, необходимо создать .py-файл из .proto-файла. Я это делаю командой protoc --python_out=. <filename>.proto.

Если вы не знакомы с Protobuf, то можно перейти по ссылке — и буквально за 30 секунд понять, что это :)

Объявим функцию, которая будет создавать и возвращать текстовую модель:

class TextModel:
...
def predict(self, x):
# your code
return model_prediction
...

def get_text_model(**kwargs) -> TextModel:
# your code
return text_model
Определим функции process_text, которая будет добавлять колонку “embedding” к входным данным, и get_dataframe_from_messages, которая десериализует сообщения.

# types_pb2 как раз создается из .proto
from types_pb2 import ItemText

def get_dataframe_from_messages(messages: pd.Series) -> pd.DataFrame:
proto_buffer = ItemText()
schema = [
"item_id",
"item_text",
]
columns = {col: [] for col in schema}
for msg in data:
data = proto_buffer.FromString(msg)
for col in columns:
columns[col].append(getattr(data, col))
return pd.DataFrame(columns)

@F.pandas_udf("item_id int, item_text string, embedding array<float>")
def process_text(data: pd.Series) -> pd.DataFrame:
model = get_text_model(**kwargs)
data = get_dataframe_from_messages(data)
data["embedding"] = data["item_text"].apply(lambda x: model.predict(x))
return data

Функция обработки сообщения моделью готова. Теперь нужно научиться работать с топиком и поднять Spark Session для стриминга.

Создадим сессию и подпишемся на конкретный топик:

spark = SparkSession.builder.getOrCreate()
df = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_servers)
.option("subscribe", topic)
)

Для контроля сессии также рекомендуется выставлять дополнительные параметры. Список таких параметров доступен в документации. Наиболее важными, на мой взгляд, являются:

  • maxOffsetsPerTrigger — отвечает за количество сообщений, которые попадают в батч;
  • minPartitions — указывает число партиций, на которое разбивается этот самый батч. Неправильный выбор этого параметра может значительно замедлять стриминг в micro-batch режиме.
Рассмотрим пример, когда у топика есть три партиции. Тогда по умолчанию в параметрах контекста будет minPartitions = 3. Это значит, что на каждый экзекутор прилетит по одной партиции, и текущий батч будет состоять из трёх задач.

Так, а что делать, если хочется повысить производительность стриминг-джобы? Какой параметр нужно изменить?

c41df970bd19d84d8610acd2ba0919bd.png

Окей, вы добавляете больше экзекуторов в Spark-приложение в надежде, что оно ускорится. К сожалению, оно не ускоряется ☹️ Что же происходит в действительности?

4da137ed64a8d8e7a13166ee8a02ebd1.png

Добавляется один экзекутор, для которого просто нет данных, так как в параметрах контекста значение minPartitions выставлено по умолчанию. В итоге такой экзекутор простаивает. Можно ли тогда изменить число партиций в Kafka, чтобы оно совпадало с количеством экзекуторов? Да, так тоже можно сделать, но только если у вас есть доступ к настройкам топика 🙂

Поэтому лучше сделать по-другому: выставить значение minPartitions = 6 и количество экзекуторов тоже 6 — тогда каждая партиция в Kafka будет разбиваться Spark’ом на две подпартиции и спокойно скейлиться на шесть экзекуторов, которые смогут параллельно выполнять задачи. Здесь важно отметить, что сами сообщения из топика не шафлятся. Драйвер просто отдаёт офсеты каждому экзекутору, где создаётся консьюмер для их чтения.

020de3755b1c9a76ea9154fdc6cb6a4e.png

Параметр maxOffsetPerTrigger позволяет контролировать, сколько сообщений (офсетов) из топика попадёт в текущий батч. Здесь тоже важно соблюдать баланс, так как слишком большой батч может долго обрабатываться (и в случае падения придётся его пересчитывать), а слишком маленький батч создаст много маленьких файлов, что не очень хорошо для HDFS и нагружает неймноду.

Что за неймнода?
Неймнода на Hadoop-кластере хранит информацию о дереве файлов и директорий, а также знает, где лежит тот или иной файл. В случае если неймнода перестаёт работать, падает весь кластер :)
Зададим и применим указанные параметры:

kafka_params = {
"maxOffsetPerTrigger": 100_000,
"minPartitions": 6
}
for k, v in kafka_params.items():
df = df.option(k, v)
df = df.load()

После выполнения load() можно работать с df как с обычным DataFrame, применяя к нему привычные SQL-трансформации.

Посмотрим на схему df:

root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
В данный момент нас интересуют сами сообщения. Они лежат в поле value. К этому полю мы применяем Pandas UDF. UDF вернёт структуру, которую можно распаковать через звёздочку, — и получить готовый для записи DataFrame.

query = (
df.withColumn("result", process_text(F.col("value").cast("binary")))
.select("result.*")
.writeStream
.foreachBatch(do_smth)
)
Чтобы записывать данные в HDFS или просто увидеть результат в логах, можно создать функцию, которая будет вызываться для каждого кусочка данных, и передать её в foreachBatch.

def do_smth(df, *args):
df.show(10)
В этой функции просто смотрим на десять строк данных, которые приезжают из Pandas UDF. Теперь метод start() начинает чтение из потокового источника информации (в данном случае — из Kafka-топика).

query = query.start()
Существует множество способов завершения Spark Structured Streaming — от перезагрузки кернела (в случае Jupyter Notebook) до graceful shutdown c реализацией на Scala (тык). В этой статье я не буду подробно разбирать каждый, а просто покажу тот, который используем мы.

is_stopped = False
while not is_stopped:
is_stopped = query.awaitTermination(timeout=timeout)
if not is_stopped and exists(indicator_path):
logger.info("Received stop indicator, stopping query...")
query.stop()
Что здесь происходит? Каждые timeout секунд идём в HDFS и смотрим, есть ли там файл-индикатор indicator_path. Как только он появляется, стриминг завершается.

Таким образом, получаем работающее Spark Structured Streaming приложение, где в качестве источника данных выступает Kafka-топик с обновлениями, а для обработки используется текстовая ML-модель.


Daemon Module​

Данный кусок относится не только к Spark Structured Streaming, но и к работе с Pandas UDF в целом.

Внимательный читатель может заметить следующее: Pandas UDF работает батчами, размер которых контролируется через spark.sql.execution.arrow.maxRecordsPerBatch. Неужели на каждом батче приходится инициализировать модель заново? Хорошо, если загрузка модели занимает одну-две секунды. Но при работе с тяжёлыми моделями (например, из Hugging Face) или загрузке больших структур данных инициализация может занимать гораздо больше времени.

Да, в этом месте возникает большой оверхед, но его можно избежать :)

Было бы хорошо единожды создать объект на экзекуторе, а потом передавать ссылку на него в каждый дочерний процесс (на воркеры). Такая стратегия называется copy-on-write. Как понятно из названия, копирование объекта происходит, только когда дочерний процесс пытается изменить объект по ссылке. Также у Spark Session есть параметр spark.python.daemon.module. Он указывает на модуль, который стартует при появлении нового экзекутора. Данный модуль запустится один раз на экзекуторах и форкнет процесс для запуска воркеров.

Для загрузки ML-модели как раз используем свой daemon-модуль. Чтобы организовать copy-on-write, необходимо ограничить запуск дочернего процесса форком, а также заимпортить модуль, в котором лежит загрузка модели.

Например, так:

multiprocessing.set_start_method("fork")
module = importlib.import_module(module_path)
Важно, чтобы при импорте модуля сама загрузка модели триггернулась.

Затем кладём в кэш импортированный модуль через sys.modules:

sys.modules[module_name] = module
Теперь можно запустить стандартный daemon manager, который будет форкать родительский процесс. При этом пространство sys.modules у дочерних процессов будет идентично родительскому.

После этого можно импортировать нужную модель прямо из модуля, избегая процесса её инициализации (благодаря системе импортов в Python, когда первым делом проверяется sys.modules).

@F.pandas_udf("item_id int, item_text string, embedding array<float>")
def process_text(data: pd.Series) -> pd.DataFrame:
from daemon_module import model
data = get_dataframe_from_messages(data)
data["embedding"] = data["item_text"].apply(lambda x: model.predict(x))
return data
Таким образом можно реализовать потоковую обработку данных в PySpark с использованием Pandas UDF. Такой подход позволяет не только ускорить инициализацию модели, но и уменьшить потребление памяти (!), поскольку все дочерние процессы используют объект из родительского процесса без копирования.


Что в итоге​

Так, обработка данных происходит постоянно, а затраты памяти не зависят от количества новых и обновившихся товаров, поскольку есть ограничение на число объектов сверху в одном батче (maxOffsetsPerTrigger). При возникновении большого лага (в топик идёт слишком много товаров, стриминг не успевает их обработать) можно увеличить количество экзекуторов, тем самым ускорив обработку батча.

Но как определить, что лаг увеличивается или уменьшается? :/ Можно воспользоваться стандартными средствами Spark UI. Идём в Application Master и видим такую картину:

41fd1f55d2760370ebf060de4f1960b1.png

Окей, вроде понятно, че-то считаем :)

Но как быть, если хотим сделать мониторинг стриминга в Grafana или, например, алерты, которые скажут о критическом значении лага для консьюмера в топике? Об этом поговорим во второй части 👋

Полезные материалы​

  1. Дока по Spark Structured Streaming
  2. Интеграция с Kafka
  3. Про память в Spark приложениях (немного оффтоп, но если часто работаете со Спарком — тут много полезной инфы)

 
Сверху