Масштабирование глубокого обучения с помощью Horovod и Kubernetes

Kate

Administrator
Команда форума
Horovod — это фреймворк для распределенного глубокого обучения, изначально разработанный в Uber. Он позволяет масштабировать обучение моделей на сотни и тысячи GPU, сокращая время тренировки с недель до часов. Horovod поддерживает такие фреймворки, как TensorFlow, Keras, PyTorch и Apache MXNet, и легко интегрируется с существующими кодовыми базами, требуя минимум изменений.

В статье как раз и пойдет речь о том, как масштабировать модельки с помощью Horovod и Kubernetes.

Установим и настроим все необходимое​

Docker необходим для контейнеризации приложений и их переносимости.

  1. Установка Docker на Ubuntu:
    sudo apt-get update
    sudo apt-get install -y apt-transport-https ca-certificates curl software-properties-common
    curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
    sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
    sudo apt-get update
    sudo apt-get install -y docker-ce
    sudo usermod -aG docker ${USER}
Kubernetes используется для оркестрации контейнеров.

  1. Установка Minikube (локал версия Kubernetes) на Ubuntu:
    curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
    chmod +x minikube
    sudo mv minikube /usr/local/bin/
    minikube start --driver=docker
  2. Установка kubectl:
    curl -LO "https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl"
    chmod +x kubectl
    sudo mv kubectl /usr/local/bin/
А теперь установим сам Horovod:

  1. Установка Horovod с поддержкой GPU:
    FROM tensorflow/tensorflow:2.16.1-gpu

    RUN apt-get update && apt-get install -y \
    libgl1-mesa-glx \
    libglib2.0-0

    RUN pip install --upgrade pip
    RUN pip install horovod

    COPY . /app
    WORKDIR /app

    CMD ["python", "train.py"]
Для развертывания распределенного обучения с Horovod на Kubernetes нужно создать и настроить соответствующие манифесты Kubernetes.

  1. Создадим Docker-образ:
    sudo apt-get update
    sudo apt-get install -y libopenmpi-dev openmpi-bin
  2. Построение и пуш Docker-образа:
    sudo apt-get install -y build-essential devscripts debhelper fakeroot
    sudo apt-get install -y libnccl2 libnccl-dev
  3. Создание манифеста Kubernetes для MPIJob:
    export HOROVOD_GPU_ALLREDUCE=NCCL
    export HOROVOD_GPU_BROADCAST=NCCL
  4. Запуск MPIJob на Kubernetes:
    import horovod.tensorflow as hvd
    import tensorflow as tf

    hvd.init()

    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

    if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

    # код построения модели и обучения
Для адекватной работы Horovod необходимо плюсом настроить MPI и NCCL.

  1. Установка MPI:
    sudo apt-get update
    sudo apt-get install -y libopenmpi-dev openmpi-bin
  2. Установка NCCL:
    sudo apt-get install -y build-essential devscripts debhelper fakeroot
    sudo apt-get install -y libnccl2 libnccl-dev
  3. Конфигурация NCCL:
    Убеждаемся, что среда готова к использованию NCCL:
    export HOROVOD_GPU_ALLREDUCE=NCCL
    export HOROVOD_GPU_BROADCAST=NCCL

Основные возможности horovod​

Horovod использует концепции, основанные на MPI. Основные шаги инициализации:

  • hvd.init(): вызов инициализирует Horovod и подготавливает его к работе. Внутри hvd.init() Horovod вызывает MPI_Init для инициализации среды MPI.
  • hvd.size(): возвращает количество процессов (или воркеров), участвующих в распределенном обучении.
  • hvd.rank(): возвращает текущий ранг процесса. Это уникальный идентификатор для каждого воркера.
  • hvd.local_rank(): возвращает ранг процесса на текущей машине.
Horovod использует несколько коллективных операций для синхронизации данных между процессами:

  • hvd.broadcast(): синхронизирует начальное состояние переменных между всеми процессами, передавая данные из одного процесса (обычно с рангом 0) всем остальным.
  • hvd.allreduce(): выполняет операцию сокращения (например, суммирование или нахождение среднего) и распространяет результат обратно всем процессам. Это ключевая операция для усреднения градиентов при распределенном обучении.
  • hvd.allgather(): собирает данные от всех процессов и распространяет объединенный результат каждому процессу. Используется для сбора информации, такой как метрики или веса модели.
Horovod тесно интегрируется с NCCL для того, чтобы коммуникации между GPU были особо высокопроизводительными.

  • HOROVOD_GPU_ALLREDUCE :переменная окружения, указывающая на использование NCCL для операций allreduce.
  • HOROVOD_GPU_BROADCAST: переменная окружения, указывающая на использование NCCL для операций broadcast.
Horovod поддерживает асинхронные коммуникации и конвейеры для минимизации времени простоя GPU. Это достигается через использование неблокирующих вызовов MPI и стратегий оверлапа вычислений и передачи данных:

# пример использования асинхронной передачи градиентов
hvd.allreduce_async(tensor, name)
В Horovod также есть механизмы для обнаружения сбоев и повторной инициализации процессов. При сбое одного из процессов Horovod может автоматом перезапустить его, сохранив текущее состояние обучения:

try:
hvd.init()
except Exception as e:
hvd.shutdown()
hvd.init()

Примеры применения​

Horovod можно легко интегрировать в код TensorFlow для организации распределенного обучения. Простой пример обучения модельки на нескольких GPU:

import tensorflow as tf
import horovod.tensorflow as hvd

# инициализация Horovod
hvd.init()

# установка видимых устройств GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)

if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# построение модели
model = tf.keras.models.Sequential([
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
])

optimizer = tf.keras.optimizers.Adam(0.001 * hvd.size())

# обертывание оптимизатора
optimizer = hvd.DistributedOptimizer(optimizer)

model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# загрузка данных
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()

# трансформация данных
train_images = train_images / 255.0
test_images = test_images / 255.0

# Callback для синхронизации начальных состояний
callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]

# обучение модели
model.fit(train_images, train_labels, batch_size=128, epochs=5, callbacks=callbacks, verbose=1 if hvd.rank() == 0 else 0)
Horovod также поддерживает PyTorch:

import torch
import horovod.torch as hvd
from torchvision import datasets, transforms
from torch import nn, optim

# Инициализация Horovod
hvd.init()

# Настройка устройств
torch.cuda.set_device(hvd.local_rank())
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# загрузка данных
train_dataset = datasets.MNIST('data', train=True, download=True, transform=transforms.ToTensor())
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, sampler=train_sampler)

# определение модели
model = nn.Sequential(
nn.Flatten(),
nn.Linear(28 * 28, 128),
nn.ReLU(),
nn.Linear(128, 10)
).to(device)

# определение оптимизатора
optimizer = optim.Adam(model.parameters(), lr=0.001 * hvd.size())

# обертывание оптимизатора
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

# синхронизация начальных состояний
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

# обучение модели
model.train()
for epoch in range(5):
train_sampler.set_epoch(epoch)
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = nn.CrossEntropyLoss()(output, target)
loss.backward()
optimizer.step()
if batch_idx % 100 == 0 and hvd.rank() == 0:
print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}] Loss: {loss.item():.6f}')
Horovod можно использовать с Keras:

import tensorflow as tf
import horovod.tensorflow.keras as hvd

# инициализация Horovod
hvd.init()

# установка видимых устройств GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)

if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# построение модели
model = tf.keras.models.Sequential([
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
])

optimizer = tf.keras.optimizers.Adam(0.001 * hvd.size())

# обертывание оптимизатора
optimizer = hvd.DistributedOptimizer(optimizer)

model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# загрузка данных
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()

# трансформация данных
train_images = train_images / 255.0
test_images = test_images / 255.0

# Callback для синхронизации начальных состояний
callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]

# обучение модели
model.fit(train_images, train_labels, batch_size=128, epochs=5, callbacks=callbacks, verbose=1 if hvd.rank() == 0 else 0)
Horovod также можно использовать с Apache MXNet:

import mxnet as mx
import horovod.mxnet as hvd

# инициализация Horovod
hvd.init()

# настройка устройств
context = mx.gpu(hvd.local_rank()) if mx.context.num_gpus() > 0 else mx.cpu()

# загрузка данных
train_data = mx.io.MNISTIter(
image="train-images-idx3-ubyte",
label="train-labels-idx1-ubyte",
input_shape=(784,),
batch_size=64,
shuffle=True,
flat=True
)

# определение модели
data = mx.sym.var('data')
fc1 = mx.sym.FullyConnected(data=data, num_hidden=128)
act1 = mx.sym.Activation(data=fc1, act_type="relu")
fc2 = mx.sym.FullyConnected(data=act1, num_hidden=10)
softmax = mx.sym.SoftmaxOutput(data=fc2, name='softmax')

# обертывание оптимизатора
optimizer_params = {'learning_rate': 0.01 * hvd.size()}
optimizer = mx.optimizer.create('adam', **optimizer_params)
optimizer = hvd.DistributedOptimizer(optimizer)

# настройка модели
model = mx.mod.Module(symbol=softmax, context=context)
model.fit(train_data, optimizer=optimizer, num_epoch=5)


 
Сверху