Мы команда мониторинга и анализа данных биотехнологической компании BIOCAD. Хотим рассказать вам о том, как мы собираем данные для аналитики из практически всех сервисов компании и при этом вполне успешно справляемся без полноценного дата-инженера. Пост будет интересен как тем, кто только ищет решение для ETL, так и тем, кто уже работает с NiFi или другими аналогичными инструментами и желает познакомиться с наработками, идеями и опытом других команд.
Этот аспект (вкупе с большим количеством бизнес-направлений) неизбежно приводит к постоянному нашему развитию в различных областях — со временем учишься работать с промышленными данными SCADA, собирать данные из облачного SalesForce, редактировать объекты через API Jira и ещё массе всего.
В итоге это привело к тому, что мы самостоятельно построили аналитическое хранилище данных (PostgreSQL + MongoDB) для предоставления BI отчётности, быстрой отработки различных ad-hoc запросов и решения Data Science задач. Но выделенного дата-инженера у нас нет, и команда самостоятельно выстраивает пайплайны сбора и обработки необходимых данных из 30+ различных сервисов. При таком подходе есть свои плюсы и минусы, нам нравится, как о них написали в блоге Skyeng
В рамках этого материала мы не будем говорить о базовой функциональности Apache NiFi, поэтому, если вы не знакомы с инструментом, рекомендуем изучить другие материалы по теме.
Первый процессор — служит для запуска всей цепочки. Он формирует FlowFile, который поступает на вход следующего процессора и выполняет роль «триггера». GenerateFlowFile может запускаться по требованию или по расписанию, задаваемому выражением Сron. У нас это происходит в ночное время, чтобы не мешать работе наших сервисов днем. Например, на скриншоте ниже видно, что запуск задачи назначен на 02:50. Мы учитываем специфику работы БД, из которых будут загружаться данные. Такой подход позволяет нам равномерно распределять нагрузку и поддерживать стабильную работу всех задействованных систем.
Каждый FlowFile содержит данные (content) и метаданные (attributes), то есть полезную нагрузку для последующих процессоров. В качестве полезной нагрузки FlowFile’а может выступать необходимая нам информация.
Для нашего случая передадим в содержимом FlowFile SQL-скрипт, с помощью которого следующий процессор запросит в БД нужные данные.
SELECT id, title, code, status, result
FROM lims_test_active;
Для этого используем поле CustomText во вкладке Properties в окне настроек процессора GenerateFlowFile.
Если данных становится слишком много для ежедневной перезаписи, следует организовать инкрементальный сбор (хорошо подходит для различных логов, событий, показаний датчиков и т. д). Тогда не придется каждый раз заново собирать все данные, а можно «дособирать» только свежие — например, появившиеся после определённой даты или отсортированные по последнему ключу.
Второй процессор — ExecuteSQL — исполняет передаваемый в него запрос и возвращает полученные данные в формате Avro (формат хранения файлов, применяемый во многих продуктах Apache). Стоит отметить, что вместо ExecuteSQL можно задействовать процессор ExecuteSQLRecord. В зависимости от выбранного Controller Service в качестве RecordWriter он может отдавать FlowFile с содержимым в форматах Avro, JSON, CSV и XML, однако его настройка будет несколько сложнее.
Чтобы процессор ExecuteSQL понял, куда он должен отправить запрос, нужно настроить специальный Controller Service для подключения к базе данных. В настройках (поле Compatible Controller Services) выберем Database Connection Pooling Service. Он управляет открытыми соединениями к БД, создает новые и закрывает старые.
В свойствах Controller Service указываем URL-адрес целевой БД (Connection URL), имя класса драйвера (Database Driver Class Name) и путь до драйвера в файловой системе (Database Driver Location). Понять, что нужно писать в поле Class Name, можно по различным инструкциям в сети. В нашем случае мы указали Class Name для JDBC-драйвера — com.mysql.jdbc.Driver — и прописали его расположение на сервере NiFi. Этот драйвер позволяет выполнять SQL-запросы к базе данных и зависит от её типа, поэтому его нужно предварительно загрузить с официального сайта разработчиков [вот ссылка для MySQL, а вот — для PSQL].
Далее, остается выбрать настроенный сервис в свойствах процессора и заполнить остальные обязательные (выделены жирным) атрибуты.
Далее, объединим процессоры, указав при этом тип соединения с помощью отношения success или failure. Так, процессор будет понимать, куда направлять FlowFile c результатами работы или ошибкой. Теперь осталось запустить цепочку, и если все сделано правильно, то на схеме красные квадратики сменятся на зеленые треугольники.
В итоге мы сформировали цепочку из двух процессоров, которые получают таблицу целиком из внешней базы данных MySQL. Остается записать эти данные в новую базу данных для дальнейшей обработки и использования.
CREATE TABLE lims_test_active (
id int,
title text,
code text,
status text,
result int
);
Для случаев в будущем (когда наша цепочка уже записала данные в таблицу), нужно предусмотреть удаление старых данных, перед перезаписью используем процессор PutSQL для выполнения TRUNCATE lims_test_active.
Далее, обратимся к процессору PutDatabaseRecord для записи новых данных. Требуется создать еще один Database Connection Pooling Service для подключения к БД, в которую мы будем записывать данные. Также пару слов тут нужно сказать про Record Reader. Это Controller Service для получения схемы данных из FlowFile (его контента или атрибутов).
Перейдем в настройки Record Reader, где в качестве значения для свойства Schema Access Strategy выберем встроенную в Avro схему данных (Use Embedded Avro Schema) и пропишем строку ${avro.schema} в блок Schema Text. Мы выбираем Use Embedded Avro Schema, так как до этого использовали процессор, получающий данные в формате Avro. На выбор стратегии влияет то, куда эта схема записана в пришедшем FlowFile (почитать про настройку можно в официальной документации, например, в описании AvroReader, или на тематических площадках). В свою очередь, содержание строки Schema Text (или Schema Name) зависит от того, в какой из этих атрибутов ранее была помещена схема.
Соединяем процессоры между собой с помощью отношения success, не забывая при этом «выключить» другие отношения (auto terminate). Это нужно сделать для того, чтобы процессор не выдавал ошибку типа: «Realationship X is invalid because Relationship X is not connected to any component and is not auto-terminated» (не все активные соединения знают, что им делать).
Отключить отношения можно вот так:
Схема принимает такой вид:
Если все прошло успешно, то мы увидим в нашей БД таблицу с данными, готовыми для дальнейшей обработки и анализа.
Конечно, если таблиц и сервисов много, то не стоит делать такую цепочку под каждую таблицу, это займет много времени и будет очень затратно при поддержке. Поэтому далее расскажем, как можно масштабировать этот пайплайн, для работы с десятками и даже сотнями таблиц.
Основная идея масштабирования состоит в том, чтобы держать список таблиц в одном месте, а при его обработке разделять и распараллеливать сборы. Мы храним список таблиц и запросов в XML, но подойдут и другие структуры данных вроде JSON или простого текста с разделителями. Вот так выглядит наш журнал для сбора таблиц из БД:
Структура документа следующая: теги <sources> задают начало и конец списка таблиц, а теги <source> — структуру одной таблицы. Теги <query> определяют SQL скрипт, а теги <out> хранят в себе наименование таблицы для сохранения данных, полученных в результате выполнения SQL запроса.
Двумя важными компонентами масштабируемого пайплайна являются процессоры SplitXML и EvaluateXPath.
Первый нужен, чтобы разделить FlowFile с изначальным XML на несколько файлов — по одному на каждую таблицу (теги <source>). Второй выполняет XPath-запросы (язык запросов к элементам XML-документа) по отношению к содержимому FlowFile. Их результаты записывает в соответствующие атрибуты FlowFile. Другими словами, процессор нужен для того, чтобы определенную информацию из контента переложить в атрибуты collection и query, которые теперь можно использовать в любом месте.
После того как EvaluateXPath выполнил свою задачу, в дело вступает ExecuteSQL, который реализует полученный на предыдущем шаге SQL-запрос (атрибут query) для загрузки нужных данных из БД.
На этом шаге можно настроить количество параллельно исполняемых задач процессором. Это указывается во вкладке Scheduling в настройках процессора ExecuteSQL.
Далее, используем PutSQL для очистки старых данных в нужной таблице БД, исполняя через него команду TRUNCATE, ее передаем в атрибуте SQL Statement, используя Expression Language для подстановки нужного имени таблицы.
Для вставки полученных данных в нужную таблицу используем процессор PutDatabaseRecord с настройками INSERT и указанием целевой таблицы ${collection}.
В итоге мы получили масштабируемую EL-цепочку, которую можно использовать для сбора данных из множества таблиц.
Каждый день мы обновляем список серий, ошибок, измерений, калибровок, событий и других параметров с различных сервисов, систем на производстве и в лабораториях. Для получения этих данных можно использовать различные промышленные протоколы например OPC UA, но благодаря усилиям коллег, занимающихся разработкой АСУ ТП, мы можем забирать их через REST API. Если интересно, то можно почитать их пост о системе управления биореакторами на базе openSCADA и отечественных контроллеров.
Далее — общий вид ETL-цепочки для этого кейса:
В этом случае GenerateFlowFile задает список параметров в XML — так же, как и в предыдущем кейсе.
Далее, в работу вступают уже знакомые процессоры SplitXML и EvaluateXPath. Мы разбиваем XML на разные FlowFile и записываем их содержимое в атрибуты device_name и parameter.
Следующий шаг — получить последнюю дату успешного сбора данных по конкретному датчику. Выполним этот запрос с помощью ExecuteSQLRecord и получим последнее значение end_time из нашей таблицы с логами.
Далее, передаем эту информацию в EvaluateJsonPath, который запишет её в атрибут start.
Значение даты в start пригодится нам для реализации нового запроса к данным. Для формирования его тела используем процессор ReplaceText.
Как вы можете видеть, в параметр end_time мы подставляем текущую дату, используя now() из Expression language.
POST-запрос в REST API исполняет процессор InvokeHTTP.
Чтобы привести полученные данные в необходимый вид перед записью в БД, используем процессор JoltTransformJSON. Отладить Jolt-трансформации удобно в песочнице на сайте Jolt Transform Demo.
Дальнейшие шаги по добавлению новых полей, логированию и записи в БД тут опустим, думаем, что принцип уже понятен. Такая ETL-цепочка позволяет нам ежедневно собирать данные в PSQL через REST API и довольно легко масштабируется на новые сервисы и датчики.
Приходилось ли вам решать аналогичные задачи? Если у вас есть предложения, как можно улучшить наши пайплайны или есть вопросы — напишите, пожалуйста, в комментарии, интересно узнать о вашем опыте работы с дата-инженерами в вашей команде.
Пост для вас готовили Василий Вологдин и Александр Тимаков.
О команде
Наверное, мы похожи на стандартную команду аналитиков данных (BI + DS) в IT-департаменте производственной компании. Работаем в Jira, делаем прототипы с заказчиком в Miro, ведём базу знаний в Confluence, используем Python и SQL для решения ежедневных задач, а итоговую отчётность предоставляем клиентам в Power BI. Исторически сложилось так, что мы сами общаемся с заказчиком, составляем ТЗ, подготавливаем данные, разрабатываем модели, внедряем ML в производство, ну и, конечно, строим дашборды (и не только их).Этот аспект (вкупе с большим количеством бизнес-направлений) неизбежно приводит к постоянному нашему развитию в различных областях — со временем учишься работать с промышленными данными SCADA, собирать данные из облачного SalesForce, редактировать объекты через API Jira и ещё массе всего.
В итоге это привело к тому, что мы самостоятельно построили аналитическое хранилище данных (PostgreSQL + MongoDB) для предоставления BI отчётности, быстрой отработки различных ad-hoc запросов и решения Data Science задач. Но выделенного дата-инженера у нас нет, и команда самостоятельно выстраивает пайплайны сбора и обработки необходимых данных из 30+ различных сервисов. При таком подходе есть свои плюсы и минусы, нам нравится, как о них написали в блоге Skyeng
О задачах
Четыре задачи, которые являются самыми распространенными в нашей практике, и самостоятельно решаются любым аналитиком данных с помощью NiFi:- Забрать данные из MySQL (или любой другой реляционной БД);
- Записать данные в PSQL;
- Масштабировать цепочку для сбора-записи данных;
- Инкрементально собрать данные с оборудования по REST API.
В рамках этого материала мы не будем говорить о базовой функциональности Apache NiFi, поэтому, если вы не знакомы с инструментом, рекомендуем изучить другие материалы по теме.
Забрать данные из MySQL
Одним из самых распространенных источников данных являются различные реляционные БД. Чтобы извлечь из них информацию с помощью Apache NiFi, понадобится всего два процессора: GenerateFlowFile и ExecuteSQL.Первый процессор — служит для запуска всей цепочки. Он формирует FlowFile, который поступает на вход следующего процессора и выполняет роль «триггера». GenerateFlowFile может запускаться по требованию или по расписанию, задаваемому выражением Сron. У нас это происходит в ночное время, чтобы не мешать работе наших сервисов днем. Например, на скриншоте ниже видно, что запуск задачи назначен на 02:50. Мы учитываем специфику работы БД, из которых будут загружаться данные. Такой подход позволяет нам равномерно распределять нагрузку и поддерживать стабильную работу всех задействованных систем.
Каждый FlowFile содержит данные (content) и метаданные (attributes), то есть полезную нагрузку для последующих процессоров. В качестве полезной нагрузки FlowFile’а может выступать необходимая нам информация.
Для нашего случая передадим в содержимом FlowFile SQL-скрипт, с помощью которого следующий процессор запросит в БД нужные данные.
SELECT id, title, code, status, result
FROM lims_test_active;
Для этого используем поле CustomText во вкладке Properties в окне настроек процессора GenerateFlowFile.
Если данных становится слишком много для ежедневной перезаписи, следует организовать инкрементальный сбор (хорошо подходит для различных логов, событий, показаний датчиков и т. д). Тогда не придется каждый раз заново собирать все данные, а можно «дособирать» только свежие — например, появившиеся после определённой даты или отсортированные по последнему ключу.
Второй процессор — ExecuteSQL — исполняет передаваемый в него запрос и возвращает полученные данные в формате Avro (формат хранения файлов, применяемый во многих продуктах Apache). Стоит отметить, что вместо ExecuteSQL можно задействовать процессор ExecuteSQLRecord. В зависимости от выбранного Controller Service в качестве RecordWriter он может отдавать FlowFile с содержимым в форматах Avro, JSON, CSV и XML, однако его настройка будет несколько сложнее.
Чтобы процессор ExecuteSQL понял, куда он должен отправить запрос, нужно настроить специальный Controller Service для подключения к базе данных. В настройках (поле Compatible Controller Services) выберем Database Connection Pooling Service. Он управляет открытыми соединениями к БД, создает новые и закрывает старые.
В свойствах Controller Service указываем URL-адрес целевой БД (Connection URL), имя класса драйвера (Database Driver Class Name) и путь до драйвера в файловой системе (Database Driver Location). Понять, что нужно писать в поле Class Name, можно по различным инструкциям в сети. В нашем случае мы указали Class Name для JDBC-драйвера — com.mysql.jdbc.Driver — и прописали его расположение на сервере NiFi. Этот драйвер позволяет выполнять SQL-запросы к базе данных и зависит от её типа, поэтому его нужно предварительно загрузить с официального сайта разработчиков [вот ссылка для MySQL, а вот — для PSQL].
Далее, остается выбрать настроенный сервис в свойствах процессора и заполнить остальные обязательные (выделены жирным) атрибуты.
Далее, объединим процессоры, указав при этом тип соединения с помощью отношения success или failure. Так, процессор будет понимать, куда направлять FlowFile c результатами работы или ошибкой. Теперь осталось запустить цепочку, и если все сделано правильно, то на схеме красные квадратики сменятся на зеленые треугольники.
В итоге мы сформировали цепочку из двух процессоров, которые получают таблицу целиком из внешней базы данных MySQL. Остается записать эти данные в новую базу данных для дальнейшей обработки и использования.
Записать данные в PSQL
Теперь, когда у нас есть собранные данные в формате Avro, запишем их в нашу БД PostgreSQL. Сначала создадим таблицу:CREATE TABLE lims_test_active (
id int,
title text,
code text,
status text,
result int
);
Для случаев в будущем (когда наша цепочка уже записала данные в таблицу), нужно предусмотреть удаление старых данных, перед перезаписью используем процессор PutSQL для выполнения TRUNCATE lims_test_active.
Далее, обратимся к процессору PutDatabaseRecord для записи новых данных. Требуется создать еще один Database Connection Pooling Service для подключения к БД, в которую мы будем записывать данные. Также пару слов тут нужно сказать про Record Reader. Это Controller Service для получения схемы данных из FlowFile (его контента или атрибутов).
Перейдем в настройки Record Reader, где в качестве значения для свойства Schema Access Strategy выберем встроенную в Avro схему данных (Use Embedded Avro Schema) и пропишем строку ${avro.schema} в блок Schema Text. Мы выбираем Use Embedded Avro Schema, так как до этого использовали процессор, получающий данные в формате Avro. На выбор стратегии влияет то, куда эта схема записана в пришедшем FlowFile (почитать про настройку можно в официальной документации, например, в описании AvroReader, или на тематических площадках). В свою очередь, содержание строки Schema Text (или Schema Name) зависит от того, в какой из этих атрибутов ранее была помещена схема.
Соединяем процессоры между собой с помощью отношения success, не забывая при этом «выключить» другие отношения (auto terminate). Это нужно сделать для того, чтобы процессор не выдавал ошибку типа: «Realationship X is invalid because Relationship X is not connected to any component and is not auto-terminated» (не все активные соединения знают, что им делать).
Отключить отношения можно вот так:
Схема принимает такой вид:
Если все прошло успешно, то мы увидим в нашей БД таблицу с данными, готовыми для дальнейшей обработки и анализа.
Конечно, если таблиц и сервисов много, то не стоит делать такую цепочку под каждую таблицу, это займет много времени и будет очень затратно при поддержке. Поэтому далее расскажем, как можно масштабировать этот пайплайн, для работы с десятками и даже сотнями таблиц.
Масштабировать цепочку для сбора-записи данных
В рамках этого поста мы опустим ряд моментов, связанных с логированием, мониторингом, разделением и слиянием больших файлов, а также некоторыми настройками процессоров. Если у вас возникнут вопросы, мы постараемся ответить на них в комментариях.Основная идея масштабирования состоит в том, чтобы держать список таблиц в одном месте, а при его обработке разделять и распараллеливать сборы. Мы храним список таблиц и запросов в XML, но подойдут и другие структуры данных вроде JSON или простого текста с разделителями. Вот так выглядит наш журнал для сбора таблиц из БД:
Структура документа следующая: теги <sources> задают начало и конец списка таблиц, а теги <source> — структуру одной таблицы. Теги <query> определяют SQL скрипт, а теги <out> хранят в себе наименование таблицы для сохранения данных, полученных в результате выполнения SQL запроса.
Двумя важными компонентами масштабируемого пайплайна являются процессоры SplitXML и EvaluateXPath.
Первый нужен, чтобы разделить FlowFile с изначальным XML на несколько файлов — по одному на каждую таблицу (теги <source>). Второй выполняет XPath-запросы (язык запросов к элементам XML-документа) по отношению к содержимому FlowFile. Их результаты записывает в соответствующие атрибуты FlowFile. Другими словами, процессор нужен для того, чтобы определенную информацию из контента переложить в атрибуты collection и query, которые теперь можно использовать в любом месте.
После того как EvaluateXPath выполнил свою задачу, в дело вступает ExecuteSQL, который реализует полученный на предыдущем шаге SQL-запрос (атрибут query) для загрузки нужных данных из БД.
На этом шаге можно настроить количество параллельно исполняемых задач процессором. Это указывается во вкладке Scheduling в настройках процессора ExecuteSQL.
Далее, используем PutSQL для очистки старых данных в нужной таблице БД, исполняя через него команду TRUNCATE, ее передаем в атрибуте SQL Statement, используя Expression Language для подстановки нужного имени таблицы.
Для вставки полученных данных в нужную таблицу используем процессор PutDatabaseRecord с настройками INSERT и указанием целевой таблицы ${collection}.
В итоге мы получили масштабируемую EL-цепочку, которую можно использовать для сбора данных из множества таблиц.
Инкрементально собрать данные с оборудования по REST API
Основная идея инкрементального сбора заключается в том, чтобы собирать только новые данные и копить их у себя в БД. Например, ночью мы получаем данные за прошлый день и сохраняем их.Каждый день мы обновляем список серий, ошибок, измерений, калибровок, событий и других параметров с различных сервисов, систем на производстве и в лабораториях. Для получения этих данных можно использовать различные промышленные протоколы например OPC UA, но благодаря усилиям коллег, занимающихся разработкой АСУ ТП, мы можем забирать их через REST API. Если интересно, то можно почитать их пост о системе управления биореакторами на базе openSCADA и отечественных контроллеров.
Далее — общий вид ETL-цепочки для этого кейса:
В этом случае GenerateFlowFile задает список параметров в XML — так же, как и в предыдущем кейсе.
Далее, в работу вступают уже знакомые процессоры SplitXML и EvaluateXPath. Мы разбиваем XML на разные FlowFile и записываем их содержимое в атрибуты device_name и parameter.
Следующий шаг — получить последнюю дату успешного сбора данных по конкретному датчику. Выполним этот запрос с помощью ExecuteSQLRecord и получим последнее значение end_time из нашей таблицы с логами.
Далее, передаем эту информацию в EvaluateJsonPath, который запишет её в атрибут start.
Значение даты в start пригодится нам для реализации нового запроса к данным. Для формирования его тела используем процессор ReplaceText.
Как вы можете видеть, в параметр end_time мы подставляем текущую дату, используя now() из Expression language.
POST-запрос в REST API исполняет процессор InvokeHTTP.
Чтобы привести полученные данные в необходимый вид перед записью в БД, используем процессор JoltTransformJSON. Отладить Jolt-трансформации удобно в песочнице на сайте Jolt Transform Demo.
Дальнейшие шаги по добавлению новых полей, логированию и записи в БД тут опустим, думаем, что принцип уже понятен. Такая ETL-цепочка позволяет нам ежедневно собирать данные в PSQL через REST API и довольно легко масштабируется на новые сервисы и датчики.
Приходилось ли вам решать аналогичные задачи? Если у вас есть предложения, как можно улучшить наши пайплайны или есть вопросы — напишите, пожалуйста, в комментарии, интересно узнать о вашем опыте работы с дата-инженерами в вашей команде.
Пост для вас готовили Василий Вологдин и Александр Тимаков.
Как мы собираем данные для аналитики с помощью Apache NiFi
Привет, Хабр! Мы команда мониторинга и анализа данных биотехнологической компании BIOCAD. Хотим рассказать вам о том, как мы собираем данные для аналитики из практически всех сервисов компании и при...
habr.com