Laboratório - Treinamento de Modelos Paralelos
Nesse laboratório vamos explorar formas de treinamento paralelo de modelos. Existem diferentes abordagens para realizar treinamento paralelo como: busca de hiperparâmetros, speed-up de treinamento de grandes modelos, inferência paralela de batches, entre outras. Vamos explorar algumas dessas técnicas utilizando tanto a distribuição do treinamento nos dispositivos locais dos clientes quanto em múltiplos dispositivos conectados em um cluster utilizando Ray. Sendo assim, nesse laboratório vamos utilizar as seguintes bibliotecas:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
import ray
import time
import numpy as np
O que é Ray?

O Ray é uma biblioteca de código aberto projetada para a construção de aplicativos distribuídos e paralelos de maneira fácil e escalável. Ele oferece uma API simples para executar tarefas paralelamente e é amplamente utilizado para aprendizado de máquina, processamento de dados em grande escala, treinamento de modelos e inferências.
Com o Ray, você pode facilmente transformar funções Python em tarefas distribuídas e classes Python em "actors" (atores), que são objetos distribuídos e paralelos. Ele pode ser executado em um único computador ou em um cluster de máquinas, oferecendo flexibilidade e escalabilidade.
Como funciona o Ray?
-
Tasks (Tarefas): Qualquer função Python pode ser transformada em uma tarefa Ray para ser executada em paralelo. Essas tarefas podem ser distribuídas em diferentes núcleos de CPU ou máquinas.
-
Actors (Atores): Objetos que podem manter estado ao longo de várias tarefas e também podem ser distribuídos. Um "actor" permite que você tenha múltiplas instâncias paralelas e estados independentes.
-
Objects (Objetos): No Ray, tarefas e atores operam em objetos. Esses objetos são conhecidos como objetos remotos porque podem ser armazenados em qualquer lugar dentro de um cluster Ray. Usamos referências de objeto (object refs) para nos referir a esses objetos remotos. O armazenamento de objetos de memória compartilhada distribuída do Ray armazena em cache esses objetos remotos, e cada nó no cluster tem seu próprio armazenamento de objetos. Em um cluster, um objeto remoto pode existir em um ou vários nós, independentemente de qual nó contém a(s) referência(s) de objeto.
O Ray é amplamente utilizado para resolver problemas de alto desempenho como busca de hiperparâmetros, treinamento de modelos em grande escala e inferência paralela.
Ray Cluster
Um cluster Ray consiste em um grupo de nós de trabalho que são conectados a um nó de cabeça Ray central. Esses clusters podem ser configurados com um tamanho fixo ou podem ser dimensionados automaticamente de forma dinâmica com base nos requisitos de recursos dos aplicativos em execução no cluster
-
Cluster: Um cluster Ray compreende uma coleção de nós de trabalho vinculados a um nó principal Ray central. Esses clusters podem ter um tamanho predefinido ou aumentar ou diminuir dinamicamente com base nos requisitos de recursos dos aplicativos que operam no cluster.
-
Head node: Em cada cluster Ray, há um nó principal designado responsável pelas tarefas de gerenciamento do cluster (i.e., head node), como executar os processos do autoscaler e do driver Ray. Embora o nó principal funcione como um nó de trabalho regular, ele também pode receber tarefas e atores, o que não é ideal para clusters de grande escala.
-
Worker node: Os workers em um cluster Ray são os únicos responsáveis por executar o código do usuário dentro das tarefas e atores Ray. Eles não estão envolvidos na execução de nenhum processo de gerenciamento de nó principal. Esses nós de trabalho desempenham um papel crucial no agendamento distribuído e são responsáveis por armazenar e distribuir objetos Ray por toda a memória do cluster.
-
Autoscaling: O autoscaler Ray, em execução no nó principal, ajusta o tamanho do cluster com base nos requisitos de recursos da carga de trabalho Ray. Quando a carga de trabalho ultrapassa a capacidade do cluster, o autoscaler tenta adicionar mais nós de trabalho. Por outro lado, ele remove nós de trabalho ociosos do cluster. É crucial observar que o autoscaler responde exclusivamente a solicitações de recursos de tarefa e ator e não considera métricas de aplicativo ou utilização de recursos físicos.
ray.init()
Treinamento Distribuído Scikit-learn e Ray
Para demostrar a aplicação do Ray para o treinamento de modelos, vamos fazer um exemplo para a busca de hiperparâmetros em modelos de Regressão comparando uma abordagem centralizada com uma abordagem distribuída.
Importando os dados
Para esse exemplo vamos utilizar o conjunto de dados California Housing o qual contém dados sobre casas a venda na Califórnia. Podemos importar o dataset e fazer a separação treino e teste utilizando o código a seguir:
# Baixar o dataset
data = fetch_california_housing()
X_train, X_test, y_train, y_test = train_test_split(data.data, data.target, test_size=0.2, random_state=42)
Note
O dataset utilizado é apenas um exemplo, outros datasets podem ser utilizados
Definindo Modelo a ser Treinado
Agora vamos definir uma função que vai fazer o treinamento de um modelo baseado nos hiperparâmetros recebidos como parâmetros da função. Nesse caso, estamos utilizando um Random Forest que recebe os parâmetros n_estimator e max_depth. Nessa função, também calculamos o MSE para definir o erro do Regressor e também o tempo para o treinamento do modelo com os parâmetros informados
# Função de treinamento com diferentes hiperparâmetros
@ray.remote
def treino_random_forest(n_estimators, max_depth):
# Criar e treinar o modelo
tempo_inicio = time.time()
model = RandomForestRegressor(n_estimators=n_estimators, max_depth=max_depth, random_state=42)
model.fit(X_train, y_train)
# Avaliar o modelo
predictions = model.predict(X_test)
mse = mean_squared_error(y_test, predictions)
tempo = time.time() - tempo_inicio
return (n_estimators, max_depth, mse, tempo)
Info
Note que a única alteração realizada foi a adição do decorator @ray.remote
para permitir que seja executada de forma distribuída em um cluster.
Definindo Hiperparametros
Dessa forma, definimos os seguintes hiperparâmetros que vão ser testados no Regressor
# Lista de hiperparâmetros para testar
hiperparametros = [(100, 10), (200, 15), (300, 20), (400, 25), (500, 30)]
Treinamento Centralizado
Primeiramente, vamos definir uma função para treinamento de modelos centralizados. A função busca_centralizada() realiza uma busca por hiperparâmetros para um modelo de regressão baseado em uma floresta aleatória (RandomForestRegressor). Ela avalia diferentes combinações de dois hiperparâmetros: o número de estimadores (n_estimators) e a profundidade máxima (max_depth), que estão armazenados na variável hiperparametros. Para cada combinação, a função ajusta o modelo aos dados de treino (X_train, y_train), faz previsões no conjunto de teste (X_test), calcula o erro quadrático médio (mean_squared_error), mede o tempo de execução, e armazena os resultados em uma lista. Ao final, a função retorna essa lista de resultados, juntamente com o tempo total gasto na busca.
def busca_centralizada():
resultados = []
tempo_total = time.time()
for n, d in hiperparametros:
tempo_inicio = time.time()
model = RandomForestRegressor(n_estimators=n, max_depth=d, random_state=42)
model.fit(X_train, y_train)
# Avaliar o modelo
predictions = model.predict(X_test)
mse = mean_squared_error(y_test, predictions)
tempo = time.time() - tempo_inicio
resultados.append((n, d, mse, tempo))
return resultados , time.time() - tempo_total
Treinamento Distribuído
Agora, vamos implementar a mesma função de forma distribuída com o Ray. A função busca_distribuida()
executa uma busca paralela por hiperparâmetros utilizando a biblioteca Ray para distribuir o treinamento do modelo de floresta aleatória (RandomForestRegressor). O código começa registrando o tempo total de execução. Em seguida, utiliza a função ray.get() para disparar tarefas paralelas com a função remota treino_random_forest.remote(n, d), que realiza o ajuste do modelo para diferentes combinações de hiperparâmetros (n_estimators e max_depth), os quais estão armazenados na variável hiperparametros. O processamento é distribuído e executado simultaneamente. A função retorna os resultados dessa busca paralela e o tempo total gasto na execução.
def busca_distribuida():
tempo_total = time.time()
# Executar a busca de hiperparâmetros em paralelo
resultados = ray.get([treino_random_forest.remote(n, d) for n, d in hiperparametros])
return resultados, time.time() - tempo_total
Warning
Com o decorator na função treino_random_forest
podemos utilizar o .remote()
para executar a task nos workers disponíveis. Para isso, utilizamos um onliner
para criar uma task para cada configuração.
Executando Treinamentos
resultados_centralizado, tempo_total_c = busca_centralizada()
resultados_distribuido, tempo_total_d = busca_distribuida()
Comparando Abordagens
Após o treinamento dos modelos, vamos gerar uma visualização para comparar o desempenho em relação ao tempo da abordagem centralizada em relação a abordagem distribuída
import matplotlib.pyplot as plt
import seaborn
fig, ax = plt.subplots(1, 2, figsize=(15, 2))
ax[0].plot(np.cumsum(np.array(resultados_centralizado)[:,-1]), label='Centralizado', color='r', marker='o')
ax[0].plot(np.array(resultados_distribuido)[:,-1], label='Distribuído', color='b', marker='o')
ax[0].grid(True, linestyle=':')
ax[0].legend()
ax[0].set_xlabel('Configurações')
ax[0].set_ylabel('Tempo (s)')
ax[0].set_xticks([0, 1, 2, 3, 4], ('(100, 10)', '(200, 15)', '(300, 20)', '(400, 25)', '(500, 30)'))
ax[1].barh(['Cent.', 'Dist.'], [tempo_total_c, tempo_total_d], ec='k', color=['r', 'b'])
ax[1].set_ylabel('Abordagens')
ax[1].set_xlabel('Tempo Total (s)')

Distribuindo Processos em um Cluster Ray
Primeiramente, devemos criar um cluster ray iniciando o head informando o a porta que disponibilizará o serviço, ip que será compartilhado e outras opções como por exemplo a dashboard de visualização. Assim, podemos iniciar da seguinte forma:
ray start --head --node-ip-address="IP" --port=PORTA --dashboard-host=0.0.0.0 --dashboard-port=PORTA_DASHBOARD
Em seguida, os workers podem se conectar diretamente ao head especificando o ip e porta que desejam se conectar da seguinte forma:
ray start --address=IP:PORTA
Para executar sua task de forma distribuída nesse cluster, é preciso iniciar o ray no head node e executa-lo da mesma forma que o anterior. O próprio ray ficará responsável por distribuir o workload nos workers disponíveis
Tip
O Ray disponibiliza um dashboard para acompanhar a execução das task em um ambiente gráfico
Treinamento Distribuído - TensorFlow
O TensorFlow oferece diversas estratégias para o treinamento distribuído, adaptadas a diferentes arquiteturas e necessidades, permitindo que você utilize várias GPUs (ou até várias máquinas) para treinar modelos de aprendizado profundo. As principais estratégias são:
-
tf.distribute.MirroredStrategy
: Para treinamento em uma única máquina com várias GPUs. Cria réplicas do modelo em cada GPU, onde as atualizações de gradiente são sincronizadas entre todas as GPUs no final de cada etapa de treinamento. Essa estratégia é eficiente quando se quer utilizar várias GPUs localmente. -
tf.distribute.MultiWorkerMirroredStrategy
: Para treinamento em várias máquinas (ou nós), cada uma com uma ou mais GPUs. Semelhante ao MirroredStrategy, mas pode escalar para várias máquinas. Utiliza um algoritmo chamado "All-Reduce" para combinar gradientes entre máquinas e GPUs. -
tf.distribute.TPUStrategy
: Para treinamento em TPUs (Tensor Processing Units), que são hardware especializado do Google para aprendizado profundo. Permite o uso de TPUs, distribuindo o treinamento entre os diferentes núcleos da TPU. TPUs são otimizadas para grandes cargas de trabalho e modelos maiores. -
tf.distribute.ParameterServerStrategy
: Para treinamento distribuído em larga escala. Distribui o treinamento em várias máquinas, onde algumas são designadas como servidores de parâmetros (armazenam e atualizam os pesos do modelo) e outras são "workers" (realizam o cálculo de gradientes). Esta abordagem é útil quando há muitos parâmetros para sincronizar entre as máquinas. -
tf.distribute.CentralStorageStrategy
: Em máquinas com múltiplas GPUs, onde os pesos do modelo são armazenados na CPU e os cálculos são feitos nas GPUs. Os gradientes são agregados na memória principal (CPU) ao invés de diretamente entre as GPUs, o que pode ser útil em casos com GPU com memória limitada.
Neste laboratório, focaremos em como usar o tf.distribute.MirroredStrategy
, que permite treinamento distribuído em uma única máquina com várias GPUs.

Importando Bibliotecas
# Import TensorFlow and TensorFlow Datasets
import tensorflow_datasets as tfds
import tensorflow as tf
# tfds.disable_progress_bar()
import time
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # Isso suprime avisos
import os
Importando Dados
Para esse exemplo vamos utiliza o Cifar10
, portanto vamos utilizar o seguinte código para fazer o download e separar em treino e teste
# carrega o dataset que será utilizado nesse laboratório
datasets, info = tfds.load(name='cifar100', with_info=True, as_supervised=True, data_dir='./data')
cifar_train, cifar_test = datasets['train'], datasets['test']
Definindo Estratégia
Após o download do dataset vamos definir a estratégia distribuída que será utilizada para o treinamento. Nesse laboratório vamos utilizar a estratégia tf.distribute.MirroredStrategy()
para treinar o modelo de forma paralela em múltiplas GPUs porém utilizando batches diferentes
# Define a estratégia utilizada e mostra o números de GPUs disponíveis
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
Number of devices: 2
Normalização e Batches
Em seguida, vamos fazer o processamento dos dados normalizando e separando em Batches. A função scale
faz a normalização dos pixels da imagem
# função para normalizar as imagens
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label
A função get_data(distributed=True) configura e retorna os conjuntos de dados de treino e teste para um modelo de aprendizado de máquina, considerando se a execução será distribuída ou não. Primeiro, a função obtém o número de exemplos nos conjuntos de treino e teste a partir de uma variável info. Define um buffer de 10.000 para embaralhamento e um tamanho de lote (batch size) base de 64 por réplica. Se o processamento for distribuído (distributed=True), o tamanho do lote total é ajustado multiplicando o tamanho base pelo número de réplicas sincronizadas da estratégia distribuída (strategy.num_replicas_in_sync). Caso contrário, o tamanho do lote permanece 64. Os conjuntos de treino e teste são preparados com map(), aplicando uma função de escalonamento (scale), e são configurados com cache, embaralhamento e divisão em lotes. Finalmente, a função retorna esses conjuntos de dados processados (train_dataset e eval_dataset) para uso no treinamento e avaliação do modelo.
def get_data(distributed=True):
# retorna o número de exemplos nos conjuntos de treino e teste
num_train_examples = info.splits['train'].num_examples
num_test_examples = info.splits['test'].num_examples
BUFFER_SIZE = 10_000
BATCH_SIZE_PER_REPLICA = 64
# usado pela estratégia distribuída
if distributed:
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
else:
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * 1
# configura os conjuntos de dados para treino e teste
train_dataset = cifar_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset = cifar_test.map(scale).batch(BATCH_SIZE)
return train_dataset, eval_dataset
Warning
Note que quando utilizamos a abordagem distribuída precisamos alterar o tamanho dos batches * a quantidade de GPUs disponíveis.
Definindo Modelo
A função create_model() define e retorna um modelo de rede neural convolucional (CNN) usando a API Sequential do TensorFlow/Keras, projetado para tarefas de classificação de imagens, como o dataset CIFAR-10. O modelo consiste em três camadas convolucionais (Conv2D) com 32 e 64 filtros, todas ativadas por ReLU, seguidas por camadas de pooling (MaxPooling2D) para redução dimensional. Após a extração de características, os dados são achatados (Flatten) e passados por uma camada totalmente conectada (Dense) com 64 neurônios. A última camada possui 100 neurônios com ativação softmax, para classificação de 100 categorias. O modelo é compilado com a função de perda SparseCategoricalCrossentropy (adequada para classificação com rótulos inteiros), o otimizador Adam, e a métrica de precisão. O modelo resultante está pronto para ser treinado em um problema de classificação.
def create_model():
model = tf.keras.models.Sequential([
tf.keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(32, 32, 3)),
tf.keras.layers.MaxPooling2D((2, 2)),
tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
tf.keras.layers.MaxPooling2D((2, 2)),
tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(100, activation='softmax') # 10 classes no CIFAR-10
])
model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.Adam(),
metrics=['accuracy'])
return model
Treinamento Centralizado
Com o a função para criação do modelo definida, vamos iniciar o treinamento utilizando uma abordagem centralizada.
#define modelo único
model = create_model()
#inicia treinamento centralizado
train_dataset, eval_dataset = get_data(distributed=False)
hist_cent = model.fit(train_dataset, epochs=20)
Para apresentar o comportamento da abordagem centralizada, é apresentado a saída do comando nvidia-smi
. Nele podemos ver que apenas uma GPU está sendo utilizada para realizar o treinamento do modelo em questão, ocupando 36% da GPU NVIDIA GeForce GTC 1080 Ti

Treinamento Distribuído
Dessa forma, para realizar o treinamento utilizando uma abordagem distribuída precisamos definir o modelo dentro do escopo da estratégia. Isso já é suficiente para informar o framework que queremos distribuir o treinamento entre as GPUs disponíveis.
with strategy.scope():
# define o escopo que será aplicado a estratégia
model = create_model()
#inicia treinamento paralelo
train_dataset, eval_dataset = get_data(distributed=True)
hist_dist = model.fit(train_dataset, epochs=20)
Do mesmo modo, para apresentar o comportamento da abordagem distribuída, é apresentado a saída do comando nvidia-smi
. Nele podemos ver ambas as GPUs estão sendo utilizada para realizar o treinamento do modelo em questão, ocupando 29% e 32% das GPUs NVIDIA GeForce GTC 1080 Ti
disponíveis.

Warning
Para permitir o treinamento distribuído utilizando a estratégia definida, precisamos instanciar o modelo e também o .compile() dentro do escopo da estratégia
Atividade - Ray + MultiWorkerMirrorStrategy()
Agora é sua vez, com os exemplos apresentados nesse laboratório. Como podemos criar o treinamento de um modelo de deep learning em múltiplos nodes utilizando uma estratégia paralela?