Работаем с большими наборами данных в Spark3.2.0 с использованием Pandas

Kate

Administrator
Команда форума
Благодаря недавнему релизу spark3.2.0 у нас появилась возможность масштабировать данные с помощью pandas.

e5deb05d498f292d29a26794f13437cb.jpeg

1. Введение​

13 октября 2021 г. команда Apache Spark зарелизила spark3.2.0. На этот раз в Spark, помимо все прочего, был добавлен API Pandas. Pandas — мощный и хорошо известный среди дата-сайентистов пакет. Однако у Pandas есть свои ограничения при работе с большими объемами данных, потому что он обрабатывает данные на одной машине. Несколько лет назад databricks выпустили библиотеку ‘Koalas’, чтобы решить эту проблему.

Добавление API Pandas в spark3.2.0 избавляет нас от необходимости использовать сторонние библиотеки. Пользователи Pandas теперь могут не отказывать себе в удовольствии использовать Pandas и масштабировать процессы до многоузловых кластеров Spark.

2. Цель​

В этой статье говорится непосредственно о способах использования API Pandas в Spark:

  • Чтение данных в виде датафреймов pandas-spark;
  • Чтение данных в виде датафреймов spark и преобразование в датафреймы pandas-spark;
  • Создание датафреймов pandas-spark;
  • Применение SQL-запросов непосредственно к датафреймам pandas-spark;
  • Построение графиков на основе датафреймов pandas-spark;
  • Переход от koalas к API pandas в Spark.

3. Данные​

CSV-файл и Jupyter Notebook, упомянутые в этой статье, можно найти на моей странице GitHub. Наборы данных там небольшие, однако проиллюстрированные здесь подходы могут быть применимы в больших наборах.

4. Требуется установка​

Прежде чем продолжить, сначала скачайте spark3.2.0 (установочный файл можно найти здесь) и правильно настройте PySpark. Вам также понадобятся библиотеки pyarrow и plotly, которые можно установить через интерфейс jupyter notebook, как показано ниже:

  • pyarrow (!conda install -c conda-forge — yes pyarrow)
  • plotly (!conda install — yes plotly)
Прекрасно! Если ваш PySpark готов к труду и обороне, то давайте перейдем к следующему разделу.

5. Импорт библиотек и запуск сессий Spark​

Теперь начнем импортировать PySpark и запустим сессию с помощью блока кода, приведенного ниже.

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
spark
Spark info сообщает нам, что используется версия 3.2.0.

Изображение 1. Spark info.
Изображение 1. Spark info.
Также будет не лишним проверить версию python и pyspark, как показано ниже. В моем случае, я использовал Spark версии 3.2.0 и python версии 3.8.8.

print('Version of python: ')
!python -V
print('Version of pyspark :', pyspark.__version__)
Изображение 2: Используемые версии.
Изображение 2: Используемые версии.
Хорошо! Теперь с помощью pyspark.pandas импортируем функцию read_csv для чтения данных CSV файла в виде датафрейма pandas-spark.

Если появится варнинг, как показано на изображении 3, то можно перед запуском pyspark.pandas import read_csv установить для переменной среды (т.е. PYARROW_IGNORE_TIMEZON) значение 1.

from pyspark.pandas import read_csv
Изображение 3: Варнинг при импорте pyspark.pandas.
Изображение 3: Варнинг при импорте pyspark.pandas.import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
sparkprint('Version of python: ')
!python -V
print('Version of pyspark :', pyspark.__version__)from pyspark.pandas import read_csv
# To get rid of error set the environ variable as below
os.environ["PYARROW_IGNORE_TIMEZONE"]="1"from pyspark.pandas import read_csv


# Чтобы избавиться от сообщения об ошибке, установите значение для переменной среды,как показано ниже
os.environ["PYARROW_IGNORE_TIMEZONE"]="1"

from pyspark.pandas import read_csv

6.1 Чтение данных из csv-файла в виде датафрейма pandas-spark​

Для того, чтобы продемонстрировать различные варианты использования API pandas spark, мы воспользуемся файлом ‘example_csv.csv’. Функция read_csv возвращает датафрейм pandas-spark (назовите его: psdf).

# Читаем в качестве датафрейма pandas-spark
datapath = '/Users/...../'
psdf = read_csv(datapath+'example_csv.csv')
psdf.head(2)
Изображение 4: Датафрейм Pandas-Spark.
Изображение 4: Датафрейм Pandas-Spark.
Замечательно! Мы только что создали с вами датафрейм pandas-spark и теперь можем перейти к использованию функций pandas для выполнения последующих задач. Например, psdf.head(2) и psdf.shape можно использовать для получения двух верхних строк и размерности данных соответственно. В отличие от стандартного датафрейма pandas в python, здесь мы располагаем очень крутой возможностью распараллеливания, что является неоспоримым преимуществом.

# получаем тип данных
# получаем размерность данных
# получаем имена столбцов данных

print('Data type :', type(psdf))
print('Data shape :', psdf.shape)
print('Data columns : \n ', psdf.columns)
Изображение 5. Использование функций pandas на датафремах pandas-spark.
Изображение 5. Использование функций pandas на датафремах pandas-spark.
Более того, если вы хотите преобразовать датафрейм pandas-spark в датафрейм spark, это можно осуществить с помощью функции to_spark(). На выходе мы получим датафрейм spark (назовите его: sdf), и теперь можем использовать все функции pyspark на этом датафрейме. Например, sdf.show(5) и sdf.printSchema() выводят пять верхних строк и схему данных датафрейма spark соответственно.

# Преобразование из датафрейма pandas-spark в датафрейм spark

# Вывод пяти верхних строк датафрейма spark
sdf = psdf.to_spark()
sdf.show(5)
Изображение 6: Вывод пяти верхних строк датафрейма spark.
Изображение 6: Вывод пяти верхних строк датафрейма spark. # Вывод схемы
sdf.printSchema()
Изображение 7: Вывод схемы датафрейма spark.
Изображение 7: Вывод схемы датафрейма spark.

6.2. Чтение данных из csv-файла в виде датафрейма spark и их преобразование в датафрейм pandas-spark​

Мы можем преобразовать датафрейм spark в датафрейм pandas-spark с помощью команды to_pandas_on_spark(). Она принимает на вход датафрейм spark, на выходе мы получаем датафрейм pandas-spark (как вы могли и сами догадаться). Ниже, мы читаем данные в виде датафрейма spark (назовем его: sdf1). Чтобы подтвердить, что это датафрейм spark, мы можем использовать type(sdf1), который определяет, что это точно датафhейм spark, т.е. ‘pyspark.sql.dataframe.DataFrame’.

# Чтение данных с помощью spark
sdf1 = spark.read.csv(datapath+'example_csv.csv', header=True,inferSchema=True)
type(sdf1)
Изображение 8. Результирующий тип - датафрейм spark.
Изображение 8. Результирующий тип - датафрейм spark.
После преобразования в датафрейм pandas-spark (psdf1) результирующим типом будет “pyspark.pandas.frame.DataFrame”. Мы можем использовать функцию pandas, например, .head(), чтобы убедиться, что это все таки датафрейм pandas-spark.

# Преобразование в датафрейм pandas-spark
psdf1 = sdf1.to_pandas_on_spark()
# Вывод двух верхних строк
psdf1.head(2)
Изображение 9. Вывод двух верхних строк датафрейма pandas-spark.
Изображение 9. Вывод двух верхних строк датафрейма pandas-spark. # Проверка типа psdf1
type(psdf1)
Изображение 10. Результирующий тип - датафрейм pandas-spark.
Изображение 10. Результирующий тип - датафрейм pandas-spark.

6.3 Создание датафрейма pandas-spark​

В этом разделе мы разберем, как вместо создания датафрейма pandas-spark из CSV-файла сделать это напрямую, импортировав pyspark.pandas как ps. Ниже с помощью ps.DataFrame() мы создали датафрейм pandas-spark (psdf2). У psdf2 два признака и три строки.

import pandas as pd
import pyspark.pandas as ps
# Создание датафрейма pandas-spark
psdf2 = ps.DataFrame({'id': [1,2,3], 'score': [89, 97, 79]})
psdf2.head()
Изображение 11. Созданный нами датафрейм pandas -spark.
Изображение 11. Созданный нами датафрейм pandas -spark.
Если мы хотим преобразовать датафрейм pandas-spark (psdf2) обратно в датафрейм spark, то для этого у нас есть функция to_spark(), о которой мы ранее уже упоминали. Синтаксис обеспечивает гибкость при смене типов датафрейм, что может оказаться довольно полезным в зависимости от функций (pandas или spark), которые вы хотите использовать в своем анализе.

# Обратное преобразование датафрейма pandas-spark в датафрейм spark
sdf2 = psdf2.to_spark()
sdf2.show(2)
Изображение 12. Датафрейм spark, преобразованный из датафрейма pandas-spark
Изображение 12. Датафрейм spark, преобразованный из датафрейма pandas-spark

7. Применение SQL-запросов непосредственно к датафреймам pandas-spark​

Еще одна замечательная тема для обсуждения в рамках pandas-spark API — это функция sql. Давайте используем эту функцию на созданном ранее датафрейме pandas-spark (psdf2) для извлечения некоторой информации. По сути для выполнения SQL-запроса нам просто нужно запустить функцию ps.sql() поверх датафрейма pandas-spark. Как показано ниже, функция count(*) для данных psdf2 результат равный трем. Точно так же второй запрос выводит отфильтрованные данные со score болеьше 80.

# Реализация SQL-запроса. Входные данные: датафрейм pandas-spark (psdf)
ps.sql("SELECT count(*) as num FROM {psdf2}")
Изображение 13. Отображение результатов sql-запроса: всего три записи.
Изображение 13. Отображение результатов sql-запроса: всего три записи.# Возвращает датафрейм pandas-spark
selected_data = ps.sql("SELECT id, score FROM {psdf2} WHERE score>80")
selected_data.head()
Изображение 14. Отображение результатов sql-запроса: score> 80
Изображение 14. Отображение результатов sql-запроса: score> 80

8. Графики датафреймов pandas и pandas-spark​

Супер! Рад, что вы дошли до этого момента. Теперь предлагаю кратко затронуть возможности построения графиков нашего нового API pandas-spark. В отличие от статического графика по умолчанию в стандартном API Python pandas, график по умолчанию в API pandas-spark является интерактивным, поскольку по умолчанию он использует plotly. Сейчас мы импортируем данные в виде датафрейма pandas и pandas-spark и построим гистограмму по переменной зарплаты (salary) для каждого из типов данных.

# Чтение данных в виде датафрейма pandas
pddf = pd.read_csv(datapath+'example_csv.csv')

type(pddf)
#pandas.core.frame.DataFrame

pddf.head(2)
Изображение 15. Чтение данные в виде датафрейма pandas.
Изображение 15. Чтение данные в виде датафрейма pandas.
На изображении ниже показана гистограмма зарплаты из датафрейма pandas.

# Чтение данных в виде датафрейма pandas-spark
pdsdf = read_csv(datapath+'example_csv.csv')

type(pdsdf)
# pyspark.pandas.frame.DataFrame

# постороение гистограмма по датафрейму pandas
pddf['salary'].hist(bins=3)
Изображение 16. Дефолтная python-гистограмма датафрейма pandas
Изображение 16. Дефолтная python-гистограмма датафрейма pandas
Ниже пример гистограммы по той же переменной на основе датафрейма pandas-spark, которая в сущности является интерактивным графиком.

Примечание: Приведенный ниже график вставлен как изображение, поэтому он статичен. Если вы запустите приведенный ниже синтаксис в jupyter notebook у вас будет возможность увеличивать/уменьшать масштаб (сделать его интерактивным).

# построение гистограммы по датафрейму pandas-spark
import plotly
pdsdf['salary'].hist(bins=3)
Изображение 17. Скрин интерактивного графика по датафрему pandas-pyspark.
Изображение 17. Скрин интерактивного графика по датафрему pandas-pyspark.

9. Переход от Koalas в API Pandas​

Напоследок давайте поговорим о том, какие изменения требуются при переходе от библиотеки Koalas в API pandas-spark. В таблице ниже показаны некоторые изменения синтаксиса: что было в Koalas, и как это выглядит в новом API pandas-spark.

Таблица 1. Переход от Koalas в API pandas-spark
Таблица 1. Переход от Koalas в API pandas-spark

10. Заключение​

В этой статье вы узнали о способах использования недавно добавленной API pandas в spark3.2.0 с целью чтения данных, создания датафрейма, использования SQL непосредственно во фреймворке pandas-spark и перехода от существующей библиотеки Koalas в API pandas-spark.

 
Сверху