Как обучить миллионы моделей прогнозирования временными сериями

Kate

Administrator
Команда форума

Постановка задачи​

Не буду вдаваться в подробности о том, откуда берутся миллионы временных серий и почему они умудряются изменяться еженедельно. Просто возникла задача еженедельно сделать прогноз на 2-8 недель по паре миллионов временных серий. Причем не просто прогноз, а с кроссвалидацией и выбором наиболее оптимальной модели (ARIMA, нейронная сеть, и т.п.).

Имеется свыше терабайта исходных данных и достаточно сложные алгоритмы трансформации и чистки данных. Чтобы не гонять большие массивы данных по сети решено было реализовать прототип на одном сервере.

Первый блин комом​

На 32-х ядерный виртуальный сервер с 256 ГБ оперативной памяти был установлен MS SQL 2016 Standart и R service. В принципе, вместо R вполне можно было использовать Python или даже CLR. Но аналога auto.arima в Python тогда найдено не было (может появился уже?). А так как сервер должен был использоваться не только для этой задачи, рисковать стабильностью его работы вынося код в CLR не хотелось.

Трансформация и очистка данных заняла на нем порядка 6 часов. После чего, средствами sp_execute_external_script вызывался код на R, выполняющий уже восстановление пропусков в временных сериях фильтром Калмана, обучающий несколько моделей и сравнивающий результаты их прогнозов кроссвалидацией. Возвращался результат прогноза выигравшей модели.

При этом возник целый ряд проблем с производительностью.

  1. Если вызывать sp_execute_external_script, передавая ему по одной временной серии, то только на вызов любой функции сервиса внешних языков уходило около 30 мс. Вроде бы немного, но для пары миллионов вызовов это получалось уже свыше 16 часов. При этом редакция MS SQL Standard поддерживает только 2 ядра при интеграции с R.
  2. Можно вызывать sp_execute_external_script, передавая ему множество временных серий. Но для этого надо объединять их через PIVOT в матрицу, что MS SQL делал очень и очень медленно.
  3. Попытка реализовать параллельное обучение моделей в R так и не увенчалась успехом. Какими бы средствами я не пользовался, периодически один из рабов просто отваливался. А на миллионах вызовов - отваливался всегда. Даже просто включение более одного ядра в функции auto.arima опять таки, раз в ~100 тыс. вызовов, приводило к молчаливому возврату не обученной модели.
Как результат, на двух ядрах процесс прогнозирования пары миллионов временных серий не успевал уложиться в неделю, при том что прогнозировать надо еженедельно.

Сложности выбора​

Первой возникшей мыслью был переход на MS SQL Enterprise. Было произведено даже исследование, показавшее, что на MS SQL Developer результат можно получить, примерно, за трое суток. Вот только столь существенное увеличение бюджета (Enterprise стоит намного дороже Standard) согласовать не удалось.

Решение проблемы​

Так как уже были готовы алгоритмы трансформации и очистки данных на T-SQL и код обучения моделей и кроссвалидации на R, то логичным вариантом решения оказался PostgreSQL, напрямую, в своей среде, поддерживающий выполнение функций на R. Поэтому на ту же виртуальную машину был установлен Oracle Linux, PostgreSQL и R.

И действительно, временные затраты на вызов R функций у PostgreSQL оказались ниже 1 мс. При этом проблемы параллельных вычислений средствами R сохранились. Поэтому решено было передавать функции на R по одной временной серии при каждом вызове.

Просто так выполнять запрос с функцией на R на множестве ядер PostgreSQL не захотел. Поэтому, средствами dblink() запрос вручную параллелится на заданное количество ядер.

Подробности реализации
Была написана довольно простая процедура
CREATE OR REPLACE PROCEDURE perform_query_async_sp (
slave_query text,
cursor_query text,
slaves smallint=16,
batch_size int=64 ) AS $proc$
<<proc>>
DECLARE
dbname text=current_database();
batches_cur refcursor;
slave_list text[];
slave_connection text;
conn_status text;
is_not_dispatched boolean;
in_list text;
BEGIN
FOR i IN 1..slaves LOOP
slave_connection='Slave'||i::text;
conn_status=dblink_connect(slave_connection,'dbname='||dbname);
IF conn_status<>'OK' THEN
INSERT INTO PBD_ExecutionLog (LogLevel, SourceName, LogMessage, LogCharData)
VALUES (32, 'perform_query_async_sp', format('Slave %s connection error',slave_connection), slave_connection);
COMMIT AND CHAIN;
RAISE EXCEPTION 'Slave % connection error', slave_connection;
END IF;
slave_list=array_append(slave_list,slave_connection);
END LOOP;

OPEN batches_cur FOR EXECUTE cursor_query USING batch_size;
<<cursor_loop>>
LOOP
FETCH batches_cur INTO in_list;
EXIT cursor_loop WHEN NOT FOUND;

is_not_dispatched=TRUE;
WHILE is_not_dispatched LOOP
<<slaves_loop>>
FOREACH slave_connection IN ARRAY slave_list LOOP
IF dblink_is_busy(slave_connection)=0 THEN
conn_status='';
WHILE conn_status IS NOT NULL LOOP
SELECT res FROM dblink_get_result(slave_connection) AS R (res text) INTO conn_status;
END LOOP;
IF dblink_send_query(slave_connection,format(slave_query,in_list))=0 THEN
INSERT INTO PBD_ExecutionLog (LogLevel, SourceName, LogMessage, LogCharData)
VALUES (32, `{OBJECT_NAME`}, format('Slave %s send query error',slave_connection), slave_connection);
COMMIT AND CHAIN;
RAISE EXCEPTION 'Slave % send query error', slave_connection;
END IF;
is_not_dispatched=FALSE;
EXIT slaves_loop;
END IF;
END LOOP;
IF is_not_dispatched THEN
PERFORM pg_sleep(0.01);
END IF;
END LOOP;
END LOOP;
CLOSE batches_cur;

FOREACH slave_connection IN ARRAY slave_list LOOP
WHILE dblink_is_busy(slave_connection)=1 LOOP
PERFORM pg_sleep(0.01);
END LOOP;
conn_status='';
WHILE conn_status IS NOT NULL LOOP
SELECT res FROM dblink_get_result(slave_connection) AS R (res text) INTO conn_status;
END LOOP;
SELECT res FROM dblink_get_result(slave_connection) AS R (res text) INTO conn_status;
conn_status=dblink_disconnect(slave_connection);
END LOOP;
END; $proc$ LANGUAGE plpgsql;
В параметре slave_query процедуре передается произвольный SQL запрос, обязанный содержать один и только один placeholder %s. Обычно, в виде конструкции IN (%s). В параметре cursor_query передается текст запроса для курсора, который обязан возвращать одну строку (varchar), которая будет подставляться вместо %s при передаче запроса из slave_query очередному рабу. Текст курсора обязан содержать placeholder $1, в который подставляется значение параметра batch_size. Таким образом проще управлять количеством элементов списка, возвращаемых курсором в строке.
Пример строки cursor_query:
SELECT array_to_string(array_agg(Q.VectorId),',') AS in_list
FROM (
SELECT V.VectorId, ROW_NUMBER() OVER (ORDER BY V.VectorId) AS rn
FROM tmp_global_statistics_data_time_series_arrays V
ORDER BY V.VectorId) Q
GROUP BY Q.rn/$1 ORDER BY Q.rn/$1 DESC;

Следует отметить, что права на вызов процедуры perform_query_async_sp остаются только у административной роли. Соответственно, вызывать эту процедуру могут только процедуры созданные с правами администратора и с опцией SECURITY DEFINER. Все же через dblink() без аутентификации исполнение запросов происходит под административными правами и это надо учитывать.
Кроме того, благодаря поддержке массивов в PostgreSQL удалось существенно упростить трансформацию и очистку данных, а так же в разы уменьшить объемы промежуточных таблиц.

В результате сейчас свыше двух с половиной миллионов прогнозов вычисляются, примерно, за 16 часов с ограничением на использование 24 ядер. Так как за воскресенье, когда нагрузка от пользователей минимальна и 8 оставшихся ядер им достаточно, один сервер вполне управляется с этой задачей, то прототип как то сам собой превратился в продуктивное решение.

 
Сверху