Благодаря недавнему релизу spark3.2.0 у нас появилась возможность масштабировать данные с помощью pandas.
Добавление API Pandas в spark3.2.0 избавляет нас от необходимости использовать сторонние библиотеки. Пользователи Pandas теперь могут не отказывать себе в удовольствии использовать Pandas и масштабировать процессы до многоузловых кластеров Spark.
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
spark
Spark info сообщает нам, что используется версия 3.2.0.
Изображение 1. Spark info.
Также будет не лишним проверить версию python и pyspark, как показано ниже. В моем случае, я использовал Spark версии 3.2.0 и python версии 3.8.8.
print('Version of python: ')
!python -V
print('Version of pyspark :', pyspark.__version__)
Изображение 2: Используемые версии.
Хорошо! Теперь с помощью pyspark.pandas импортируем функцию read_csv для чтения данных CSV файла в виде датафрейма pandas-spark.
Если появится варнинг, как показано на изображении 3, то можно перед запуском pyspark.pandas import read_csv установить для переменной среды (т.е. PYARROW_IGNORE_TIMEZON) значение 1.
from pyspark.pandas import read_csv
Изображение 3: Варнинг при импорте pyspark.pandas.import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
sparkprint('Version of python: ')
!python -V
print('Version of pyspark :', pyspark.__version__)from pyspark.pandas import read_csv
# To get rid of error set the environ variable as below
os.environ["PYARROW_IGNORE_TIMEZONE"]="1"from pyspark.pandas import read_csv
# Чтобы избавиться от сообщения об ошибке, установите значение для переменной среды,как показано ниже
os.environ["PYARROW_IGNORE_TIMEZONE"]="1"
from pyspark.pandas import read_csv
# Читаем в качестве датафрейма pandas-spark
datapath = '/Users/...../'
psdf = read_csv(datapath+'example_csv.csv')
psdf.head(2)
Изображение 4: Датафрейм Pandas-Spark.
Замечательно! Мы только что создали с вами датафрейм pandas-spark и теперь можем перейти к использованию функций pandas для выполнения последующих задач. Например, psdf.head(2) и psdf.shape можно использовать для получения двух верхних строк и размерности данных соответственно. В отличие от стандартного датафрейма pandas в python, здесь мы располагаем очень крутой возможностью распараллеливания, что является неоспоримым преимуществом.
# получаем тип данных
# получаем размерность данных
# получаем имена столбцов данных
print('Data type :', type(psdf))
print('Data shape :', psdf.shape)
print('Data columns : \n ', psdf.columns)
Изображение 5. Использование функций pandas на датафремах pandas-spark.
Более того, если вы хотите преобразовать датафрейм pandas-spark в датафрейм spark, это можно осуществить с помощью функции to_spark(). На выходе мы получим датафрейм spark (назовите его: sdf), и теперь можем использовать все функции pyspark на этом датафрейме. Например, sdf.show(5) и sdf.printSchema() выводят пять верхних строк и схему данных датафрейма spark соответственно.
# Преобразование из датафрейма pandas-spark в датафрейм spark
# Вывод пяти верхних строк датафрейма spark
sdf = psdf.to_spark()
sdf.show(5)
Изображение 6: Вывод пяти верхних строк датафрейма spark. # Вывод схемы
sdf.printSchema()
Изображение 7: Вывод схемы датафрейма spark.
# Чтение данных с помощью spark
sdf1 = spark.read.csv(datapath+'example_csv.csv', header=True,inferSchema=True)
type(sdf1)
Изображение 8. Результирующий тип - датафрейм spark.
После преобразования в датафрейм pandas-spark (psdf1) результирующим типом будет “pyspark.pandas.frame.DataFrame”. Мы можем использовать функцию pandas, например, .head(), чтобы убедиться, что это все таки датафрейм pandas-spark.
# Преобразование в датафрейм pandas-spark
psdf1 = sdf1.to_pandas_on_spark()
# Вывод двух верхних строк
psdf1.head(2)
Изображение 9. Вывод двух верхних строк датафрейма pandas-spark. # Проверка типа psdf1
type(psdf1)
Изображение 10. Результирующий тип - датафрейм pandas-spark.
import pandas as pd
import pyspark.pandas as ps
# Создание датафрейма pandas-spark
psdf2 = ps.DataFrame({'id': [1,2,3], 'score': [89, 97, 79]})
psdf2.head()
Изображение 11. Созданный нами датафрейм pandas -spark.
Если мы хотим преобразовать датафрейм pandas-spark (psdf2) обратно в датафрейм spark, то для этого у нас есть функция to_spark(), о которой мы ранее уже упоминали. Синтаксис обеспечивает гибкость при смене типов датафрейм, что может оказаться довольно полезным в зависимости от функций (pandas или spark), которые вы хотите использовать в своем анализе.
# Обратное преобразование датафрейма pandas-spark в датафрейм spark
sdf2 = psdf2.to_spark()
sdf2.show(2)
Изображение 12. Датафрейм spark, преобразованный из датафрейма pandas-spark
# Реализация SQL-запроса. Входные данные: датафрейм pandas-spark (psdf)
ps.sql("SELECT count(*) as num FROM {psdf2}")
Изображение 13. Отображение результатов sql-запроса: всего три записи.# Возвращает датафрейм pandas-spark
selected_data = ps.sql("SELECT id, score FROM {psdf2} WHERE score>80")
selected_data.head()
Изображение 14. Отображение результатов sql-запроса: score> 80
# Чтение данных в виде датафрейма pandas
pddf = pd.read_csv(datapath+'example_csv.csv')
type(pddf)
#pandas.core.frame.DataFrame
pddf.head(2)
Изображение 15. Чтение данные в виде датафрейма pandas.
На изображении ниже показана гистограмма зарплаты из датафрейма pandas.
# Чтение данных в виде датафрейма pandas-spark
pdsdf = read_csv(datapath+'example_csv.csv')
type(pdsdf)
# pyspark.pandas.frame.DataFrame
# постороение гистограмма по датафрейму pandas
pddf['salary'].hist(bins=3)
Изображение 16. Дефолтная python-гистограмма датафрейма pandas
Ниже пример гистограммы по той же переменной на основе датафрейма pandas-spark, которая в сущности является интерактивным графиком.
Примечание: Приведенный ниже график вставлен как изображение, поэтому он статичен. Если вы запустите приведенный ниже синтаксис в jupyter notebook у вас будет возможность увеличивать/уменьшать масштаб (сделать его интерактивным).
# построение гистограммы по датафрейму pandas-spark
import plotly
pdsdf['salary'].hist(bins=3)
Изображение 17. Скрин интерактивного графика по датафрему pandas-pyspark.
Таблица 1. Переход от Koalas в API pandas-spark
1. Введение
13 октября 2021 г. команда Apache Spark зарелизила spark3.2.0. На этот раз в Spark, помимо все прочего, был добавлен API Pandas. Pandas — мощный и хорошо известный среди дата-сайентистов пакет. Однако у Pandas есть свои ограничения при работе с большими объемами данных, потому что он обрабатывает данные на одной машине. Несколько лет назад databricks выпустили библиотеку ‘Koalas’, чтобы решить эту проблему.Добавление API Pandas в spark3.2.0 избавляет нас от необходимости использовать сторонние библиотеки. Пользователи Pandas теперь могут не отказывать себе в удовольствии использовать Pandas и масштабировать процессы до многоузловых кластеров Spark.
2. Цель
В этой статье говорится непосредственно о способах использования API Pandas в Spark:- Чтение данных в виде датафреймов pandas-spark;
- Чтение данных в виде датафреймов spark и преобразование в датафреймы pandas-spark;
- Создание датафреймов pandas-spark;
- Применение SQL-запросов непосредственно к датафреймам pandas-spark;
- Построение графиков на основе датафреймов pandas-spark;
- Переход от koalas к API pandas в Spark.
3. Данные
CSV-файл и Jupyter Notebook, упомянутые в этой статье, можно найти на моей странице GitHub. Наборы данных там небольшие, однако проиллюстрированные здесь подходы могут быть применимы в больших наборах.4. Требуется установка
Прежде чем продолжить, сначала скачайте spark3.2.0 (установочный файл можно найти здесь) и правильно настройте PySpark. Вам также понадобятся библиотеки pyarrow и plotly, которые можно установить через интерфейс jupyter notebook, как показано ниже:- pyarrow (!conda install -c conda-forge — yes pyarrow)
- plotly (!conda install — yes plotly)
5. Импорт библиотек и запуск сессий Spark
Теперь начнем импортировать PySpark и запустим сессию с помощью блока кода, приведенного ниже.import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
spark
Spark info сообщает нам, что используется версия 3.2.0.
Также будет не лишним проверить версию python и pyspark, как показано ниже. В моем случае, я использовал Spark версии 3.2.0 и python версии 3.8.8.
print('Version of python: ')
!python -V
print('Version of pyspark :', pyspark.__version__)
Хорошо! Теперь с помощью pyspark.pandas импортируем функцию read_csv для чтения данных CSV файла в виде датафрейма pandas-spark.
Если появится варнинг, как показано на изображении 3, то можно перед запуском pyspark.pandas import read_csv установить для переменной среды (т.е. PYARROW_IGNORE_TIMEZON) значение 1.
from pyspark.pandas import read_csv
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
sparkprint('Version of python: ')
!python -V
print('Version of pyspark :', pyspark.__version__)from pyspark.pandas import read_csv
# To get rid of error set the environ variable as below
os.environ["PYARROW_IGNORE_TIMEZONE"]="1"from pyspark.pandas import read_csv
# Чтобы избавиться от сообщения об ошибке, установите значение для переменной среды,как показано ниже
os.environ["PYARROW_IGNORE_TIMEZONE"]="1"
from pyspark.pandas import read_csv
6.1 Чтение данных из csv-файла в виде датафрейма pandas-spark
Для того, чтобы продемонстрировать различные варианты использования API pandas spark, мы воспользуемся файлом ‘example_csv.csv’. Функция read_csv возвращает датафрейм pandas-spark (назовите его: psdf).# Читаем в качестве датафрейма pandas-spark
datapath = '/Users/...../'
psdf = read_csv(datapath+'example_csv.csv')
psdf.head(2)
Замечательно! Мы только что создали с вами датафрейм pandas-spark и теперь можем перейти к использованию функций pandas для выполнения последующих задач. Например, psdf.head(2) и psdf.shape можно использовать для получения двух верхних строк и размерности данных соответственно. В отличие от стандартного датафрейма pandas в python, здесь мы располагаем очень крутой возможностью распараллеливания, что является неоспоримым преимуществом.
# получаем тип данных
# получаем размерность данных
# получаем имена столбцов данных
print('Data type :', type(psdf))
print('Data shape :', psdf.shape)
print('Data columns : \n ', psdf.columns)
Более того, если вы хотите преобразовать датафрейм pandas-spark в датафрейм spark, это можно осуществить с помощью функции to_spark(). На выходе мы получим датафрейм spark (назовите его: sdf), и теперь можем использовать все функции pyspark на этом датафрейме. Например, sdf.show(5) и sdf.printSchema() выводят пять верхних строк и схему данных датафрейма spark соответственно.
# Преобразование из датафрейма pandas-spark в датафрейм spark
# Вывод пяти верхних строк датафрейма spark
sdf = psdf.to_spark()
sdf.show(5)
sdf.printSchema()
6.2. Чтение данных из csv-файла в виде датафрейма spark и их преобразование в датафрейм pandas-spark
Мы можем преобразовать датафрейм spark в датафрейм pandas-spark с помощью команды to_pandas_on_spark(). Она принимает на вход датафрейм spark, на выходе мы получаем датафрейм pandas-spark (как вы могли и сами догадаться). Ниже, мы читаем данные в виде датафрейма spark (назовем его: sdf1). Чтобы подтвердить, что это датафрейм spark, мы можем использовать type(sdf1), который определяет, что это точно датафhейм spark, т.е. ‘pyspark.sql.dataframe.DataFrame’.# Чтение данных с помощью spark
sdf1 = spark.read.csv(datapath+'example_csv.csv', header=True,inferSchema=True)
type(sdf1)
После преобразования в датафрейм pandas-spark (psdf1) результирующим типом будет “pyspark.pandas.frame.DataFrame”. Мы можем использовать функцию pandas, например, .head(), чтобы убедиться, что это все таки датафрейм pandas-spark.
# Преобразование в датафрейм pandas-spark
psdf1 = sdf1.to_pandas_on_spark()
# Вывод двух верхних строк
psdf1.head(2)
type(psdf1)
6.3 Создание датафрейма pandas-spark
В этом разделе мы разберем, как вместо создания датафрейма pandas-spark из CSV-файла сделать это напрямую, импортировав pyspark.pandas как ps. Ниже с помощью ps.DataFrame() мы создали датафрейм pandas-spark (psdf2). У psdf2 два признака и три строки.import pandas as pd
import pyspark.pandas as ps
# Создание датафрейма pandas-spark
psdf2 = ps.DataFrame({'id': [1,2,3], 'score': [89, 97, 79]})
psdf2.head()
Если мы хотим преобразовать датафрейм pandas-spark (psdf2) обратно в датафрейм spark, то для этого у нас есть функция to_spark(), о которой мы ранее уже упоминали. Синтаксис обеспечивает гибкость при смене типов датафрейм, что может оказаться довольно полезным в зависимости от функций (pandas или spark), которые вы хотите использовать в своем анализе.
# Обратное преобразование датафрейма pandas-spark в датафрейм spark
sdf2 = psdf2.to_spark()
sdf2.show(2)
7. Применение SQL-запросов непосредственно к датафреймам pandas-spark
Еще одна замечательная тема для обсуждения в рамках pandas-spark API — это функция sql. Давайте используем эту функцию на созданном ранее датафрейме pandas-spark (psdf2) для извлечения некоторой информации. По сути для выполнения SQL-запроса нам просто нужно запустить функцию ps.sql() поверх датафрейма pandas-spark. Как показано ниже, функция count(*) для данных psdf2 результат равный трем. Точно так же второй запрос выводит отфильтрованные данные со score болеьше 80.# Реализация SQL-запроса. Входные данные: датафрейм pandas-spark (psdf)
ps.sql("SELECT count(*) as num FROM {psdf2}")
selected_data = ps.sql("SELECT id, score FROM {psdf2} WHERE score>80")
selected_data.head()
8. Графики датафреймов pandas и pandas-spark
Супер! Рад, что вы дошли до этого момента. Теперь предлагаю кратко затронуть возможности построения графиков нашего нового API pandas-spark. В отличие от статического графика по умолчанию в стандартном API Python pandas, график по умолчанию в API pandas-spark является интерактивным, поскольку по умолчанию он использует plotly. Сейчас мы импортируем данные в виде датафрейма pandas и pandas-spark и построим гистограмму по переменной зарплаты (salary) для каждого из типов данных.# Чтение данных в виде датафрейма pandas
pddf = pd.read_csv(datapath+'example_csv.csv')
type(pddf)
#pandas.core.frame.DataFrame
pddf.head(2)
На изображении ниже показана гистограмма зарплаты из датафрейма pandas.
# Чтение данных в виде датафрейма pandas-spark
pdsdf = read_csv(datapath+'example_csv.csv')
type(pdsdf)
# pyspark.pandas.frame.DataFrame
# постороение гистограмма по датафрейму pandas
pddf['salary'].hist(bins=3)
Ниже пример гистограммы по той же переменной на основе датафрейма pandas-spark, которая в сущности является интерактивным графиком.
Примечание: Приведенный ниже график вставлен как изображение, поэтому он статичен. Если вы запустите приведенный ниже синтаксис в jupyter notebook у вас будет возможность увеличивать/уменьшать масштаб (сделать его интерактивным).
# построение гистограммы по датафрейму pandas-spark
import plotly
pdsdf['salary'].hist(bins=3)
9. Переход от Koalas в API Pandas
Напоследок давайте поговорим о том, какие изменения требуются при переходе от библиотеки Koalas в API pandas-spark. В таблице ниже показаны некоторые изменения синтаксиса: что было в Koalas, и как это выглядит в новом API pandas-spark.10. Заключение
В этой статье вы узнали о способах использования недавно добавленной API pandas в spark3.2.0 с целью чтения данных, создания датафрейма, использования SQL непосредственно во фреймворке pandas-spark и перехода от существующей библиотеки Koalas в API pandas-spark.Работаем с большими наборами данных в Spark3.2.0 с использованием Pandas
Благодаря недавнему релизу spark3.2.0 у нас появилась возможность масштабировать данные с помощью pandas. 1. Введение 13 октября 2021 г. команда Apache Spark зарелизила spark3.2.0. На этот раз в...
habr.com