Cloudera Streaming Analytics: унификация пакетной и потоковой обработки в SQL

Kate

Administrator
Команда форума
В октябре 2020 года Cloudera приобрела компанию Eventador, а в начале 2021 года был выпущен продукт Cloudera Streaming Analytics (CSA) 1.3.0. Это был первый релиз, который включал в себя SQL Stream Builder (SSB), полученный в результате интеграции наработок Eventador в продукт для аналитики потоквых данных на базе Apache Flink.

SQL Stream Builder (SSB) - это новый компонент со своим дружелюбным веб-интерфейсом, позволяющий анализировать потоковые и исторические данные в режиме реального времени в SQL, под капотом которого работает Apache Flink. На сегодняшний день около 3 миллиардов событий анализируется в Apache Fink ежедневно самыми прогрессивными компаниями по всему миру.

Первоначально разработчики сконцентрировались на внедрении в SSB языка определения данных Flink (DDL) и интерфейса для подключения к пакетным данным. Мы хотели использовать разработки последних версий Flink, а также добавить в SSB важные функции. Для заказчиков это открывает огромные новые возможности для интеграции существующих массивов данных с источниками потоковых данных в стеке Cloudera.

Уже в последних версиях (1.4 & 1.5) мы рады анонсировать CSA с унифицированным интерфейсом для работы с потоковыми и пакетными данными . Мы считаем, что это откроет новые возможности для использования в приложениях IoT, финансовой, производственной сфере и многих других отраслях. Это позволяет создавать уникальные потоки ETL, хранилища данных в реальном времени и формировать ценные потоки данных без масштабной модернизации инфраструктуры.

ff3e75559f722b4df612348a2222ee64.png

Почему пакетная + потоковая обработка?​

Долгое время считалось, что пакетная обработка и потоковая передача (системы с фиксированными и свободными границами) являются ортогональными технологиями - эталонной архитектурой, в которой потоки наполняют озера данных, и не более того.

Но, как работающие с данными специалисты-практики, мы хотели большего. Нам нужна была возможность обращаться к пакетным данным и источникам потоковой передачи данных в логике приложений. Мы хотели иметь инструменты и грамматику SQL, чтобы легко работать с ними. Нам необходимо было легко объединять существующие корпоративные источники данных и потоки данных с высокой скоростью доставки и малой задержкой. Нам нужна была гибкость для решения задач как с помощью пакетных, так и потоковых API, а также возможности прямого подключения к источникам для беспрепятственного чтения и записи данных. Нам нужно было экспериментировать с приложениями, повторять и затем развертывать процессоры, которые можно масштабировать и восстанавливать без массового воспроизведения данных. Мы хотели, чтобы схемы автоматически определялись там, где это возможно, и существовал богатый инструментарий для их создания там, где это необходимо.

В конце концов, бизнесу не важно, каков формат исходных данных. и нам нужен был фреймворк, который позволял бы быстро и легко предоставлять данные как продукт без масштабного наращивания инфраструктуры или использования баз данных нижестоящего уровня. У этой архитектуры нет причудливого названия - в основном потому, что так все так и должно было работать. CSA упрощает создание этих продуктов-данных.

27d4bbfd5574f7219b8be6ffc40cd8cc.png

Исторические данные в Kudu/Hive и др.; высокоскоростные потоковые данные в Kafka; непрерывная обработка потоковых данных с помощью SQL (через Flink); информационные панели BI-приложений, микросервисы, временные ряды, одностраничные приложения, блокнот и пр. (протоколы REST или PostgreSQL); материализованные представления в виде ценных и готовых к использованию наборов данных (PostgreSQL с потоковым обновлением по ключу).

Немного из истории Flink​

Cloudera Streaming Analytics работает на базе Apache Flink и включает в себя SQL Stream Builder и ядро движка Flink. Возможно, вы не этого знали, но Apache Flink с самого начала был фреймворком для пакетной обработки. Вместе с тем Flink изначально использовал пакетную и потоковую передачу с двумя отдельными API. Предложение 131 по улучшению Flink (Flink Improvement Proposal 131) переопределило API Flink с акцентом на объединение ограниченной/неограниченной обработки в рамках одного и того же API. А раньше нужно было выбирать тот или иной API. С появлением Flip-131 режим обработки будет полностью абстрагирован от программы в табличном API, что позволит разработчику писать программы, которые аккуратно и просто объединяют две парадигмы обработки. Flink всегда ориентировался на корректные результаты и поддерживал однократную обработку. Сочетание мощности движка с грамматикой соединения с ограничением по времени дает нам возможность запрашивать ограниченные (пакетные) и неограниченные (потоковые) данные с использованием простого синтаксиса join. Это полностью меняет правила игры.

SQL Stream Builder и пакетные данные​

В отличие от самого Flink, SQL Stream Builder зародился как чисто потоковый интерфейс. Начиная с CSA 1.4, SSB позволяет выполнять запросы для объединения и обогащения потоковых данных с историческими. Для обогащения потоков SSB может присоединять данные из таких источников как Kudu, Hive/Impala и JDBC. Со временем мы продолжим добавление источников и приемников ограниченных данных. SSB всегда мог объединять несколько потоков данных, но теперь он позволяет также обогащать их с помощью источников пакетных данных.

Data Definition Language (DDL)

В основе новой функциональности лежит включение Flink DDL в SSB. Таблицы определяются с помощью схемы (подразумеваемой или указанной), и затем к ним можно применять Continuous SQL, как к любому другому источнику. Кроме того, автоматически доступны все источники данных на платформе Cloudera Data Platform.

-- auto-inferred from CDP catalog import
CREATE TABLE `CDP_Hive_Catalog`.`airplanes`.`faa_aircraft` (
`tailnumber` VARCHAR(255),
`model` VARCHAR(255),
`serial` VARCHAR(255),
`icao` VARCHAR(255),
`owner` VARCHAR(255)
) WITH (
...
)
Чтение и обогащение потоковых данных историческими

Например, здесь мы обогащаем поток данных, который измеряет статус тестирования производственных систем. Мы дополняем поток (a) данными о сотрудниках из (b). Мы используем грамматику Flink для указания времени для таблицы (proctime ()) и задаем ключ соединения.

SELECT
a.stationid, a.test, b.managername
FROM
mfgrline AS a
JOIN `CDP_Kudu_Catalog`.`HR_db`.`impala::mfgr.stations`
FOR SYSTEM_TIME AS OF PROCTIME() AS b
ON a.stationid = b.stationid
Также возможно для объединения потоков скомбинировать в одном запросе несколько источников, в том числе потоковых:

ab6e91327c44e653ee9ae46435bdf8ce.JPG

Запись данных в системы-приемники​

SSB также может писать результаты обработки в различные системы хранения данных. Это мощный инструмент не только для хранения результатов некоторых вычислений, но и для сохранения логического состояния вычислений. Например, можно вести бухгалтерскую книгу для учетных записей, которые вы отключили из-за опасности мошенничества, так что не нужно будет отправлять запросы повторно. Чтобы записать в приемник, достаточно определить таблицу и выбрать ее в качестве приемника.

создание таблицы, в которую будем записывать результаты
создание таблицы, в которую будем записывать результаты
Запуск запроса с записью результата в таблицу sensor_history
Запуск запроса с записью результата в таблицу sensor_history

Новые архитектуры и сценарии использования​

Благодаря новым возможностям, которые предоставляет CSA в последней версии 1.5, возможны новые сценарии использования, а также новые реализации, снижающие задержку и ускоряющие выход на рынок.

  • Распределенное хранилище данных в реальном времени - обогащение потоковых данных историческими с сохранением результата в виде материализованных представлений, доступных по API. Например, возможен расширенный анализ потока кликов или объединение данных с датчиков с историческими измерениями. Новый интерфейс DDL в SSB предоставляет возможность определять источники потоковой передачи и пакетной обработки из любой точки стека CDP и объединять их с помощью Continuous SQL.
  • Data Science - качественный анализ требует контекста. Например, внедрение персонализированного клиентского опыта в режиме реального времени с обогащением онлайн потоков данных о поведении клиентов исторической информацией о прошлых событиях для ML моделей на Python. SQL Stream Builder предоставляет простой интерфейс REST для материализованных представлений, который легко интегрируется с Python и Pandas внутри блокнотов, поэтому специалисты по данным могут сосредоточиться на небольших, но ценных наборах данных в собственных инструментах, вместо того, чтобы анализировать весь поток данных.
  • Производственные операции в реальном времени - на производстве важна возможность беспрепятственно обращаться к источникам данных по всему предприятию, а затем материализовать представления для информационных панелей. Это может помочь сократить отходы, контролировать затраты и повысить качество. Пример - объединение исторической частоты отказов станций с текущей телеметрией для отображения прогнозных результатов в Cloudera Dataviz или Grafana.

Что ещё новенького и полезного есть в CSA 1.5?​

  • Импорт Change Data Capture - мы добавили поддержку импорта с Change Data Capture из реляционных баз данных на основе open source проекта Debezium, который оборачивает Flink как среду выполнения вокруг логики, импортированной из Debezium. Этот подход не требует внесения изменений в таблицы репликации базы данных, вместо этого он напрямую подключается к потоку репликации базы данных.
    Например, следующая таблица может быть определена для подключения к потоку CDC из СУБД Oracle:
24aa15775c242eb847d2be8e1d0e34be.png

  • Java UDF - в SQL Stream Builder уже была поддержка UDF на Javascript, определенных в графическом интерфейсе. Теперь мы добавили возможность использовать Flink SQL Java UDF-функции , добавляя их в classpath.
    Например, следующая простая функция инкремента реализована как Flink Java Function:
package com.cloudera;
import org.apache.flink.table.functions.ScalarFunction;

public class FlinkTestJavaUDF extends ScalarFunction {
public Integer eval(Integer i) {
return i + 1;
}
}
  • RESTful API для SQL Stream Builder - в этом релизе мы представляем RESTful API для всех операций SQL Stream Builder. Это обеспечивает программный доступ и автоматизацию заданий в SQL Stream Builder. Сопровождающая страница Swagger доступна как часть нашей документации. Например, следующий вызов создает самостоятельное новое задание:
curl --location --request POST '<streaming_sql_engine_host>:<streaming_sql_engine_port>/api/v1/ssb/sql/execute' \

--header 'Content-Type: application/json' \
--data-raw '{
"sql": "CREATE TABLE IF NOT EXISTS datagen_sample (col_int INT, col_ts TIMESTAMP(3), WATERMARK FOR col_ts AS col_ts - INTERVAL '\''5'\'' SECOND) WITH ('\''connector'\'' = '\''datagen'\'');\nSELECT * FROM datagen_sample;",
"job_parameters": {
"job_name": "production_job"
}
}'

 
Сверху