Horovod — это фреймворк для распределенного глубокого обучения, изначально разработанный в Uber. Он позволяет масштабировать обучение моделей на сотни и тысячи GPU, сокращая время тренировки с недель до часов. Horovod поддерживает такие фреймворки, как TensorFlow, Keras, PyTorch и Apache MXNet, и легко интегрируется с существующими кодовыми базами, требуя минимум изменений.
В статье как раз и пойдет речь о том, как масштабировать модельки с помощью Horovod и Kubernetes.
# пример использования асинхронной передачи градиентов
hvd.allreduce_async(tensor, name)
В Horovod также есть механизмы для обнаружения сбоев и повторной инициализации процессов. При сбое одного из процессов Horovod может автоматом перезапустить его, сохранив текущее состояние обучения:
try:
hvd.init()
except Exception as e:
hvd.shutdown()
hvd.init()
import tensorflow as tf
import horovod.tensorflow as hvd
# инициализация Horovod
hvd.init()
# установка видимых устройств GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
# построение модели
model = tf.keras.models.Sequential([
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
])
optimizer = tf.keras.optimizers.Adam(0.001 * hvd.size())
# обертывание оптимизатора
optimizer = hvd.DistributedOptimizer(optimizer)
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
# загрузка данных
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()
# трансформация данных
train_images = train_images / 255.0
test_images = test_images / 255.0
# Callback для синхронизации начальных состояний
callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]
# обучение модели
model.fit(train_images, train_labels, batch_size=128, epochs=5, callbacks=callbacks, verbose=1 if hvd.rank() == 0 else 0)
Horovod также поддерживает PyTorch:
import torch
import horovod.torch as hvd
from torchvision import datasets, transforms
from torch import nn, optim
# Инициализация Horovod
hvd.init()
# Настройка устройств
torch.cuda.set_device(hvd.local_rank())
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# загрузка данных
train_dataset = datasets.MNIST('data', train=True, download=True, transform=transforms.ToTensor())
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, sampler=train_sampler)
# определение модели
model = nn.Sequential(
nn.Flatten(),
nn.Linear(28 * 28, 128),
nn.ReLU(),
nn.Linear(128, 10)
).to(device)
# определение оптимизатора
optimizer = optim.Adam(model.parameters(), lr=0.001 * hvd.size())
# обертывание оптимизатора
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
# синхронизация начальных состояний
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
# обучение модели
model.train()
for epoch in range(5):
train_sampler.set_epoch(epoch)
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = nn.CrossEntropyLoss()(output, target)
loss.backward()
optimizer.step()
if batch_idx % 100 == 0 and hvd.rank() == 0:
print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}] Loss: {loss.item():.6f}')
Horovod можно использовать с Keras:
import tensorflow as tf
import horovod.tensorflow.keras as hvd
# инициализация Horovod
hvd.init()
# установка видимых устройств GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
# построение модели
model = tf.keras.models.Sequential([
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
])
optimizer = tf.keras.optimizers.Adam(0.001 * hvd.size())
# обертывание оптимизатора
optimizer = hvd.DistributedOptimizer(optimizer)
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
# загрузка данных
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()
# трансформация данных
train_images = train_images / 255.0
test_images = test_images / 255.0
# Callback для синхронизации начальных состояний
callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]
# обучение модели
model.fit(train_images, train_labels, batch_size=128, epochs=5, callbacks=callbacks, verbose=1 if hvd.rank() == 0 else 0)
Horovod также можно использовать с Apache MXNet:
import mxnet as mx
import horovod.mxnet as hvd
# инициализация Horovod
hvd.init()
# настройка устройств
context = mx.gpu(hvd.local_rank()) if mx.context.num_gpus() > 0 else mx.cpu()
# загрузка данных
train_data = mx.io.MNISTIter(
image="train-images-idx3-ubyte",
label="train-labels-idx1-ubyte",
input_shape=(784,),
batch_size=64,
shuffle=True,
flat=True
)
# определение модели
data = mx.sym.var('data')
fc1 = mx.sym.FullyConnected(data=data, num_hidden=128)
act1 = mx.sym.Activation(data=fc1, act_type="relu")
fc2 = mx.sym.FullyConnected(data=act1, num_hidden=10)
softmax = mx.sym.SoftmaxOutput(data=fc2, name='softmax')
# обертывание оптимизатора
optimizer_params = {'learning_rate': 0.01 * hvd.size()}
optimizer = mx.optimizer.create('adam', **optimizer_params)
optimizer = hvd.DistributedOptimizer(optimizer)
# настройка модели
model = mx.mod.Module(symbol=softmax, context=context)
model.fit(train_data, optimizer=optimizer, num_epoch=5)
В статье как раз и пойдет речь о том, как масштабировать модельки с помощью Horovod и Kubernetes.
Установим и настроим все необходимое
Docker необходим для контейнеризации приложений и их переносимости.- Установка Docker на Ubuntu:
sudo apt-get update
sudo apt-get install -y apt-transport-https ca-certificates curl software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
sudo apt-get update
sudo apt-get install -y docker-ce
sudo usermod -aG docker ${USER}
- Установка Minikube (локал версия Kubernetes) на Ubuntu:
curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
chmod +x minikube
sudo mv minikube /usr/local/bin/
minikube start --driver=docker - Установка kubectl:
curl -LO "https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x kubectl
sudo mv kubectl /usr/local/bin/
- Установка Horovod с поддержкой GPU:
FROM tensorflow/tensorflow:2.16.1-gpu
RUN apt-get update && apt-get install -y \
libgl1-mesa-glx \
libglib2.0-0
RUN pip install --upgrade pip
RUN pip install horovod
COPY . /app
WORKDIR /app
CMD ["python", "train.py"]
- Создадим Docker-образ:
sudo apt-get update
sudo apt-get install -y libopenmpi-dev openmpi-bin - Построение и пуш Docker-образа:
sudo apt-get install -y build-essential devscripts debhelper fakeroot
sudo apt-get install -y libnccl2 libnccl-dev - Создание манифеста Kubernetes для MPIJob:
export HOROVOD_GPU_ALLREDUCE=NCCL
export HOROVOD_GPU_BROADCAST=NCCL - Запуск MPIJob на Kubernetes:
import horovod.tensorflow as hvd
import tensorflow as tf
hvd.init()
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
# код построения модели и обучения
- Установка MPI:
sudo apt-get update
sudo apt-get install -y libopenmpi-dev openmpi-bin - Установка NCCL:
sudo apt-get install -y build-essential devscripts debhelper fakeroot
sudo apt-get install -y libnccl2 libnccl-dev - Конфигурация NCCL:
Убеждаемся, что среда готова к использованию NCCL:
export HOROVOD_GPU_ALLREDUCE=NCCL
export HOROVOD_GPU_BROADCAST=NCCL
Основные возможности horovod
Horovod использует концепции, основанные на MPI. Основные шаги инициализации:- hvd.init(): вызов инициализирует Horovod и подготавливает его к работе. Внутри hvd.init() Horovod вызывает MPI_Init для инициализации среды MPI.
- hvd.size(): возвращает количество процессов (или воркеров), участвующих в распределенном обучении.
- hvd.rank(): возвращает текущий ранг процесса. Это уникальный идентификатор для каждого воркера.
- hvd.local_rank(): возвращает ранг процесса на текущей машине.
- hvd.broadcast(): синхронизирует начальное состояние переменных между всеми процессами, передавая данные из одного процесса (обычно с рангом 0) всем остальным.
- hvd.allreduce(): выполняет операцию сокращения (например, суммирование или нахождение среднего) и распространяет результат обратно всем процессам. Это ключевая операция для усреднения градиентов при распределенном обучении.
- hvd.allgather(): собирает данные от всех процессов и распространяет объединенный результат каждому процессу. Используется для сбора информации, такой как метрики или веса модели.
- HOROVOD_GPU_ALLREDUCE :переменная окружения, указывающая на использование NCCL для операций allreduce.
- HOROVOD_GPU_BROADCAST: переменная окружения, указывающая на использование NCCL для операций broadcast.
# пример использования асинхронной передачи градиентов
hvd.allreduce_async(tensor, name)
В Horovod также есть механизмы для обнаружения сбоев и повторной инициализации процессов. При сбое одного из процессов Horovod может автоматом перезапустить его, сохранив текущее состояние обучения:
try:
hvd.init()
except Exception as e:
hvd.shutdown()
hvd.init()
Примеры применения
Horovod можно легко интегрировать в код TensorFlow для организации распределенного обучения. Простой пример обучения модельки на нескольких GPU:import tensorflow as tf
import horovod.tensorflow as hvd
# инициализация Horovod
hvd.init()
# установка видимых устройств GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
# построение модели
model = tf.keras.models.Sequential([
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
])
optimizer = tf.keras.optimizers.Adam(0.001 * hvd.size())
# обертывание оптимизатора
optimizer = hvd.DistributedOptimizer(optimizer)
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
# загрузка данных
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()
# трансформация данных
train_images = train_images / 255.0
test_images = test_images / 255.0
# Callback для синхронизации начальных состояний
callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]
# обучение модели
model.fit(train_images, train_labels, batch_size=128, epochs=5, callbacks=callbacks, verbose=1 if hvd.rank() == 0 else 0)
Horovod также поддерживает PyTorch:
import torch
import horovod.torch as hvd
from torchvision import datasets, transforms
from torch import nn, optim
# Инициализация Horovod
hvd.init()
# Настройка устройств
torch.cuda.set_device(hvd.local_rank())
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# загрузка данных
train_dataset = datasets.MNIST('data', train=True, download=True, transform=transforms.ToTensor())
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, sampler=train_sampler)
# определение модели
model = nn.Sequential(
nn.Flatten(),
nn.Linear(28 * 28, 128),
nn.ReLU(),
nn.Linear(128, 10)
).to(device)
# определение оптимизатора
optimizer = optim.Adam(model.parameters(), lr=0.001 * hvd.size())
# обертывание оптимизатора
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
# синхронизация начальных состояний
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
# обучение модели
model.train()
for epoch in range(5):
train_sampler.set_epoch(epoch)
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = nn.CrossEntropyLoss()(output, target)
loss.backward()
optimizer.step()
if batch_idx % 100 == 0 and hvd.rank() == 0:
print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}] Loss: {loss.item():.6f}')
Horovod можно использовать с Keras:
import tensorflow as tf
import horovod.tensorflow.keras as hvd
# инициализация Horovod
hvd.init()
# установка видимых устройств GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
# построение модели
model = tf.keras.models.Sequential([
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
])
optimizer = tf.keras.optimizers.Adam(0.001 * hvd.size())
# обертывание оптимизатора
optimizer = hvd.DistributedOptimizer(optimizer)
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
# загрузка данных
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()
# трансформация данных
train_images = train_images / 255.0
test_images = test_images / 255.0
# Callback для синхронизации начальных состояний
callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]
# обучение модели
model.fit(train_images, train_labels, batch_size=128, epochs=5, callbacks=callbacks, verbose=1 if hvd.rank() == 0 else 0)
Horovod также можно использовать с Apache MXNet:
import mxnet as mx
import horovod.mxnet as hvd
# инициализация Horovod
hvd.init()
# настройка устройств
context = mx.gpu(hvd.local_rank()) if mx.context.num_gpus() > 0 else mx.cpu()
# загрузка данных
train_data = mx.io.MNISTIter(
image="train-images-idx3-ubyte",
label="train-labels-idx1-ubyte",
input_shape=(784,),
batch_size=64,
shuffle=True,
flat=True
)
# определение модели
data = mx.sym.var('data')
fc1 = mx.sym.FullyConnected(data=data, num_hidden=128)
act1 = mx.sym.Activation(data=fc1, act_type="relu")
fc2 = mx.sym.FullyConnected(data=act1, num_hidden=10)
softmax = mx.sym.SoftmaxOutput(data=fc2, name='softmax')
# обертывание оптимизатора
optimizer_params = {'learning_rate': 0.01 * hvd.size()}
optimizer = mx.optimizer.create('adam', **optimizer_params)
optimizer = hvd.DistributedOptimizer(optimizer)
# настройка модели
model = mx.mod.Module(symbol=softmax, context=context)
model.fit(train_data, optimizer=optimizer, num_epoch=5)
Масштабирование глубокого обучения с помощью Horovod и Kubernetes
Привет, Хабр! Horovod — это фреймворк для распределенного глубокого обучения, изначально разработанный в Uber . Он позволяет масштабировать обучение моделей на сотни и тысячи GPU, сокращая время...
habr.com