Потоковая передача данных с помощью Apache Spark и MongoDB

Kate

Administrator
Команда форума
MongoDB объявила о выпуске 10.0 версии коннектора MongoDB для Apache Spark. В этой версии используется новый API Spark Data Sources второй версии (V2) с поддержкой структурированной потоковой передачи данных Spark (Spark Structured Streaming).

Зачем нужна новая версия?​


Текущая версия MongoDB Spark Connector была первоначально написана в 2016 году и основана на API Spark Data Sources первой версии (V1). Хотя эта версия API по-прежнему поддерживается, компания Databricks выпустила обновлённую версию API, облегчающую работу таких источников данных (Data Sources), как MongoDB, со Spark. Благодаря тому, что MongoDB Spark коннектор использует V2 API, непосредственным преимуществом является более тесная интеграция со структурированной потоковой передачей данных Spark.
Примечание: Что касается предыдущей версии MongoDB Spark коннектора, которая работала с API V1, то MongoDB будет продолжать поддерживать этот выпуск до тех пор, пока Databricks полностью не откажется от API Data Source первой версии. Хотя новых функций внедряться уже не будет, обновления коннектора будут включать исправления ошибок и поддержку только актуальных версий Spark.

Какую версию использовать мне?​


Новая 10.0 версия коннектора MongoDB Spark не предназначена для прямой замены ваших приложений, использующих предыдущую версию коннектора MongoDB Spark.

Новый коннектор использует другое пространство имён с коротким названием mongodb (полный путь com.mongodb.spark.sql.connector.MongoTableProvider), в отличие от mongo (полный путь com.mongodb.spark.DefaultSource). Наличие разных пространств имён позволяет использовать обе версии коннектора в одном приложении Spark! Это полезно для модульного тестирования приложения с новым коннектором и осуществления перехода на новую версию в соответствии с вашими временными рамками.

Кроме того, мы меняем способ версионирования MongoDB Spark коннектора. Предыдущие версии коннектора MongoDB Spark соответствовали версии поддерживаемой Spark — например, версия 2.4 MongoDB Spark коннектора работала со Spark 2.4. Имейте в виду, что в последующих версиях этого уже не будет. В документации MongoDB будет чётко указано, какие версии Spark поддерживает коннектор.

Структурированная потоковая передача данных в базу данных MongoDB​


Apache Spark поставляется с механизмом обработки потоков под названием: «Структурированная потоковая обработка данных» (Structured Streaming), которая основана на SQL-движке Spark и API DataFrame. Spark Structured Streaming рассматривает каждый входящий поток данных как микропакет, постоянно добавляя каждый микропакет к целевому датасету (Dataset). Это позволяет легко преобразовать существующие пакетные задания Spark в потоковые задания. Структурированная потоковая обработка данных развивалась на протяжении всех выпусков Spark, и в версии Spark 2.3 появился режим непрерывной обработки, который позволил сократить время задержки микропакетов с более чем 100 мс до примерно 1 мс. В следующем примере мы покажем вам, как передавать данные между MongoDB и Spark с помощью структурированных потоков и непрерывной обработки. Сначала мы рассмотрим чтение данных из MongoDB.

Чтение потоковых данных из MongoDB​


Вы можете передавать данные из MongoDB в Spark с помощью нового Spark коннектора. В данном примере рассматривается потоковая передача данных о биржевых активах из кластера MongoDB Atlas. Образец документа в MongoDB выглядит следующим образом:

{
_id: ObjectId("624767546df0f7dd8783f300"),
company_symbol: 'HSL',
company_name: 'HUNGRY SYNDROME LLC',
price: 45.74,
tx_time: '2022-04-01T16:57:56Z'
}


В этом примере кода мы будем использовать новый MongoDB Spark коннектор и с его помощью считывать данные из коллекции StockData. Когда Spark коннектор открывает соединение для потокового чтения с MongoDB, он открывает соединение и создаёт поток изменений MongoDB (MongoDB Change Stream) для заданной базы данных и коллекции. Поток изменений используется для подписки на изменения в MongoDB. При вставке, обновлении и удалении данных создаются события потока изменений. Именно эти события изменений передаются обратно клиенту, а в данном случае — приложению Spark. Существуют параметры конфигурации, которые могут изменить структуру сообщения о событии. Например, если вы хотите вернуть только сам документ и не включать метаданные события потока изменений, установите для параметра spark.mongodb.change.stream.publish.full.document.only значение true.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.\
builder.\
appName("streamingExampleRead").\
config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector:10.0.0').\
getOrCreate()

query=(spark.readStream.format("mongodb")
.option('spark.mongodb.connection.uri', '<CONNECTION STRING>')
.option('spark.mongodb.database', 'Stocks') \
.option('spark.mongodb.collection', 'StockData') \
.option('spark.mongodb.change.stream.publish.full.document.only','true') \
.option("forceDeleteTempCheckpointLocation", "true") \
.load())

query.printSchema()


Схема выводится из коллекции MongoDB. Из команды printSchema видно, что структура нашего документа выглядит следующим образом:

root |-- _id: string (nullable = true) |-- company_name: string (nullable = true) |-- company_symbol: string (nullable = true) |-- price: double (nullable = true) |-- tx_time: string (nullable = true)

Мы можем проверить, передаётся ли датасет в потоковом режиме, с помощью команды isStreaming.

query.isStreaming


Далее, давайте прочитаем данные из консоли, как только они будут вставлены в MongoDB.

query2=(query.writeStream \
.outputMode("append") \
.option("forceDeleteTempCheckpointLocation", "true") \
.format("console") \
.trigger(continuous="1 second")
.start().awaitTermination());


Когда приведённый выше код был запущен через spark-submit, результат выглядел следующим образом:

Сокращено для краткости

Пакет данных #2​


+--------------------+--------------------+--------------+-----+-------------------+
| _id| company_name|company_symbol|price| tx_time| +--------------------+--------------------+--------------+-----+-------------------+ |62476caa6df0f7dd8...| HUNGRY SYNDROME LLC| HSL|45.99|2022-04-01 17:20:42| |62476caa6df0f7dd8...|APPETIZING MARGIN...| AMP|12.81|2022-04-01 17:20:42| |62476caa6df0f7dd8...|EMBARRASSED COCKT...| ECC|38.18|2022-04-01 17:20:42| |62476caa6df0f7dd8...|PERFECT INJURY CO...| PIC|86.85|2022-04-01 17:20:42| |62476caa6df0f7dd8...|GIDDY INNOVATIONS...| GMI|84.46|2022-04-01 17:20:42| +--------------------+--------------------+--------------+-----+-------------------+

Сокращено для краткости

Пакет данных #3​


+--------------------+--------------------+--------------+-----+-------------------+
| _id| company_name|company_symbol|price| tx_time| +--------------------+--------------------+--------------+-----+-------------------+ |62476cab6df0f7dd8...| HUNGRY SYNDROME LLC| HSL|46.04|2022-04-01 17:20:43| |62476cab6df0f7dd8...|APPETIZING MARGIN...| AMP| 12.8|2022-04-01 17:20:43| |62476cab6df0f7dd8...|EMBARRASSED COCKT...| ECC| 38.2|2022-04-01 17:20:43| |62476cab6df0f7dd8...|PERFECT INJURY CO...| PIC|86.85|2022-04-01 17:20:43| |62476cab6df0f7dd8...|GIDDY INNOVATIONS...| GMI|84.46|2022-04-01 17:20:43| +--------------------+--------------------+--------------+-----+-------------------+

Запись потоковых данных в MongoDB​


Далее рассмотрим пример, в котором мы передаём данные из Apache Kafka в MongoDB. Здесь источником является топик kafka stockdata.Stocks.StockData. По мере поступления данных в этот топик они проходят через Spark, содержимое сообщения разбирается, преобразуется и записывается в MongoDB. Вот листинг кода с комментариями в строке:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType,TimestampType, DoubleType, StringType, StructField

spark = SparkSession.\
builder.\
appName("streamingExampleWrite").\
config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector:10.0.0').\
config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0').\
getOrCreate()

df = spark \
.readStream \
.format("kafka") \
.option("startingOffsets", "earliest") \
.option("kafka.bootstrap.servers", "KAFKA BROKER HOST HERE") \
.option("subscribe", "stockdata.Stocks.StockData") \
.load()

schemaStock = StructType([ \
StructField("_id",StringType(),True), \
StructField("company_name",StringType(), True), \
StructField("company_symbol",StringType(), True), \
StructField("price",StringType(), True), \
StructField("tx_time",StringType(), True)])

schemaKafka = StructType([ \
StructField("payload",StringType(),True)])


Обратите внимание, что сообщение топика Kafka приходит в таком формате -> ключ key (двоичное binary), значение value (двоичное binary), топик topic (строка string), раздел partition (целое int), смещение offset (длительность long), временная метка timestamp (длительность long), тип временной метки timestamptype (целое int). См. руководство по интеграции структурированного потока + Kafka (брокер Kafka версии 0.10.0 или выше) для получения дополнительной информации об интеграции Kafka и Spark.

Чтобы обработать сообщение для использования в MongoDB, мы хотим выбрать значение, которое находится в двоичном формате, и преобразовать его в JSON.

stockDF=df.selectExpr("CAST(value AS STRING)")


Для сравнения, вот пример события (значение, преобразованное в строку), которое находится в топике Kafka:

{
"schema": {
"type": "string",
"optional": false
},
"payload": "{\"_id\": {\"$oid\": \"6249f8096df0f7dd8785d70a\"}, \"company_symbol\": \"GMI\", \"company_name\": \"GIDDY INNOVATIONS\", \"price\": 87.57, \"tx_time\": \"2022-04-03T15:39:53Z\"}"
}


Мы хотим изолировать поле полезной нагрузки и преобразовать его в представление JSON с помощью shcemaStock, определённого выше. Для наглядности мы разбили операцию на несколько этапов, чтобы лучше разъяснить процесс. Во-первых, мы хотим преобразовать значение в JSON.

stockDF=stockDF.select(from_json(col('value'),schemaKafka).alias("json_data")).selectExpr('json_data.*')


Теперь датасет содержит знакомые данные:


{
_id: ObjectId("624c6206e152b632f88a8ee2"),
payload: '{"_id": {"$oid": "6249f8046df0f7dd8785d6f1"},
"company_symbol": "GMI", "company_name": "GIDDY MONASTICISM
INNOVATIONS", "price": 87.62, "tx_time":
"2022-04-03T15:39:48Z"}'
}, …

Далее мы хотим захватить только значение поля payload и преобразовать его в JSON, поскольку оно хранится в виде строки.

stockDF=stockDF.select(from_json(col('payload'),schemaStock).alias("json_data2")).selectExpr('json_data2.*')


Теперь мы можем выполнять любые преобразования данных. В данном случае давайте преобразуем tx_time в метку времени timestamp.

stockDF=stockDF.withColumn("tx_time",col("tx_time").cast("timestamp"))


Датасет уже имеет формат, готовый к обработке в MongoDB, поэтому давайте выгрузим его в MongoDB. Для этого воспользуемся методом writeStream. Имейте в виду, что в нём нужно будет задать несколько параметров. Например, при наличии опции trigger результаты обрабатываются пакетно. В данном примере это происходит каждые 10 секунд. Удаление поля trigger приведёт к непрерывной записи. Для получения дополнительной информации об опциях и параметрах ознакомьтесь с руководством по структурированной потоковой записи.

dsw = (
stockDF.writeStream
.format("mongodb")
.queryName("ToMDB")
.option("checkpointLocation", "/tmp/pyspark7/")
.option("forceDeleteTempCheckpointLocation", "true")
.option('spark.mongodb.connection.uri', ‘<CONNECTION STRING>')
.option('spark.mongodb.database', 'Stocks')
.option('spark.mongodb.collection', 'Sink')
.trigger(continuous="10 seconds")
.outputMode("append")
.start().awaitTermination());


Идите вперёд и несите поток!​


Потоковые данные являются важным компонентом многих типов приложений. MongoDB развивалась на протяжении многих лет, постоянно добавляя возможности и функциональность для поддержки таких типов рабочих нагрузок. С помощью MongoDB Spark Connector версии 10.0 вы можете быстро передавать потоковые данные в MongoDB и из неё с помощью нескольких строк кода.


 
Сверху