Skip to content

Laboratório - Treinamento de Modelos Paralelos

Nesse laboratório vamos explorar formas de treinamento paralelo de modelos. Existem diferentes abordagens para realizar treinamento paralelo como: busca de hiperparâmetros, speed-up de treinamento de grandes modelos, inferência paralela de batches, entre outras. Vamos explorar algumas dessas técnicas utilizando tanto a distribuição do treinamento nos dispositivos locais dos clientes quanto em múltiplos dispositivos conectados em um cluster utilizando Ray. Sendo assim, nesse laboratório vamos utilizar as seguintes bibliotecas:

from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
import ray
import time
import numpy as np

O que é Ray?

O Ray é uma biblioteca de código aberto projetada para a construção de aplicativos distribuídos e paralelos de maneira fácil e escalável. Ele oferece uma API simples para executar tarefas paralelamente e é amplamente utilizado para aprendizado de máquina, processamento de dados em grande escala, treinamento de modelos e inferências.

Com o Ray, você pode facilmente transformar funções Python em tarefas distribuídas e classes Python em "actors" (atores), que são objetos distribuídos e paralelos. Ele pode ser executado em um único computador ou em um cluster de máquinas, oferecendo flexibilidade e escalabilidade.

Como funciona o Ray?

  • Tasks (Tarefas): Qualquer função Python pode ser transformada em uma tarefa Ray para ser executada em paralelo. Essas tarefas podem ser distribuídas em diferentes núcleos de CPU ou máquinas.

  • Actors (Atores): Objetos que podem manter estado ao longo de várias tarefas e também podem ser distribuídos. Um "actor" permite que você tenha múltiplas instâncias paralelas e estados independentes.

  • Objects (Objetos): No Ray, tarefas e atores operam em objetos. Esses objetos são conhecidos como objetos remotos porque podem ser armazenados em qualquer lugar dentro de um cluster Ray. Usamos referências de objeto (object refs) para nos referir a esses objetos remotos. O armazenamento de objetos de memória compartilhada distribuída do Ray armazena em cache esses objetos remotos, e cada nó no cluster tem seu próprio armazenamento de objetos. Em um cluster, um objeto remoto pode existir em um ou vários nós, independentemente de qual nó contém a(s) referência(s) de objeto.

O Ray é amplamente utilizado para resolver problemas de alto desempenho como busca de hiperparâmetros, treinamento de modelos em grande escala e inferência paralela.

Ray Cluster

Um cluster Ray consiste em um grupo de nós de trabalho que são conectados a um nó de cabeça Ray central. Esses clusters podem ser configurados com um tamanho fixo ou podem ser dimensionados automaticamente de forma dinâmica com base nos requisitos de recursos dos aplicativos em execução no cluster

  • Cluster: Um cluster Ray compreende uma coleção de nós de trabalho vinculados a um nó principal Ray central. Esses clusters podem ter um tamanho predefinido ou aumentar ou diminuir dinamicamente com base nos requisitos de recursos dos aplicativos que operam no cluster.

  • Head node: Em cada cluster Ray, há um nó principal designado responsável pelas tarefas de gerenciamento do cluster (i.e., head node), como executar os processos do autoscaler e do driver Ray. Embora o nó principal funcione como um nó de trabalho regular, ele também pode receber tarefas e atores, o que não é ideal para clusters de grande escala.

  • Worker node: Os workers em um cluster Ray são os únicos responsáveis ​​por executar o código do usuário dentro das tarefas e atores Ray. Eles não estão envolvidos na execução de nenhum processo de gerenciamento de nó principal. Esses nós de trabalho desempenham um papel crucial no agendamento distribuído e são responsáveis ​​por armazenar e distribuir objetos Ray por toda a memória do cluster.

  • Autoscaling: O autoscaler Ray, em execução no nó principal, ajusta o tamanho do cluster com base nos requisitos de recursos da carga de trabalho Ray. Quando a carga de trabalho ultrapassa a capacidade do cluster, o autoscaler tenta adicionar mais nós de trabalho. Por outro lado, ele remove nós de trabalho ociosos do cluster. É crucial observar que o autoscaler responde exclusivamente a solicitações de recursos de tarefa e ator e não considera métricas de aplicativo ou utilização de recursos físicos.

ray.init()

Treinamento Distribuído Scikit-learn e Ray

Para demostrar a aplicação do Ray para o treinamento de modelos, vamos fazer um exemplo para a busca de hiperparâmetros em modelos de Regressão comparando uma abordagem centralizada com uma abordagem distribuída.

Importando os dados

Para esse exemplo vamos utilizar o conjunto de dados California Housing o qual contém dados sobre casas a venda na Califórnia. Podemos importar o dataset e fazer a separação treino e teste utilizando o código a seguir:

# Baixar o dataset
data = fetch_california_housing()
X_train, X_test, y_train, y_test = train_test_split(data.data, data.target, test_size=0.2, random_state=42)

Note

O dataset utilizado é apenas um exemplo, outros datasets podem ser utilizados

Definindo Modelo a ser Treinado

Agora vamos definir uma função que vai fazer o treinamento de um modelo baseado nos hiperparâmetros recebidos como parâmetros da função. Nesse caso, estamos utilizando um Random Forest que recebe os parâmetros n_estimator e max_depth. Nessa função, também calculamos o MSE para definir o erro do Regressor e também o tempo para o treinamento do modelo com os parâmetros informados

# Função de treinamento com diferentes hiperparâmetros
@ray.remote
def treino_random_forest(n_estimators, max_depth):
    # Criar e treinar o modelo
    tempo_inicio = time.time()
    model = RandomForestRegressor(n_estimators=n_estimators, max_depth=max_depth, random_state=42)
    model.fit(X_train, y_train)

    # Avaliar o modelo
    predictions = model.predict(X_test)
    mse         = mean_squared_error(y_test, predictions)
    tempo       = time.time() - tempo_inicio
    return (n_estimators, max_depth, mse, tempo)

Info

Note que a única alteração realizada foi a adição do decorator @ray.remote para permitir que seja executada de forma distribuída em um cluster.

Definindo Hiperparametros

Dessa forma, definimos os seguintes hiperparâmetros que vão ser testados no Regressor

# Lista de hiperparâmetros para testar
hiperparametros = [(100, 10), (200, 15), (300, 20), (400, 25), (500, 30)]

Treinamento Centralizado

Primeiramente, vamos definir uma função para treinamento de modelos centralizados. A função busca_centralizada() realiza uma busca por hiperparâmetros para um modelo de regressão baseado em uma floresta aleatória (RandomForestRegressor). Ela avalia diferentes combinações de dois hiperparâmetros: o número de estimadores (n_estimators) e a profundidade máxima (max_depth), que estão armazenados na variável hiperparametros. Para cada combinação, a função ajusta o modelo aos dados de treino (X_train, y_train), faz previsões no conjunto de teste (X_test), calcula o erro quadrático médio (mean_squared_error), mede o tempo de execução, e armazena os resultados em uma lista. Ao final, a função retorna essa lista de resultados, juntamente com o tempo total gasto na busca.

def busca_centralizada():
    resultados   = []
    tempo_total  = time.time()
    for n, d in hiperparametros:
        tempo_inicio = time.time()
        model = RandomForestRegressor(n_estimators=n, max_depth=d, random_state=42)
        model.fit(X_train, y_train)

        # Avaliar o modelo
        predictions = model.predict(X_test)
        mse         = mean_squared_error(y_test, predictions)
        tempo       = time.time() - tempo_inicio
        resultados.append((n, d, mse, tempo))

    return resultados , time.time() - tempo_total

Treinamento Distribuído

Agora, vamos implementar a mesma função de forma distribuída com o Ray. A função busca_distribuida() executa uma busca paralela por hiperparâmetros utilizando a biblioteca Ray para distribuir o treinamento do modelo de floresta aleatória (RandomForestRegressor). O código começa registrando o tempo total de execução. Em seguida, utiliza a função ray.get() para disparar tarefas paralelas com a função remota treino_random_forest.remote(n, d), que realiza o ajuste do modelo para diferentes combinações de hiperparâmetros (n_estimators e max_depth), os quais estão armazenados na variável hiperparametros. O processamento é distribuído e executado simultaneamente. A função retorna os resultados dessa busca paralela e o tempo total gasto na execução.

def busca_distribuida():
    tempo_total  = time.time()
    # Executar a busca de hiperparâmetros em paralelo
    resultados = ray.get([treino_random_forest.remote(n, d) for n, d in hiperparametros])
    return resultados, time.time() - tempo_total

Warning

Com o decorator na função treino_random_forest podemos utilizar o .remote() para executar a task nos workers disponíveis. Para isso, utilizamos um onliner para criar uma task para cada configuração.

Executando Treinamentos

resultados_centralizado, tempo_total_c = busca_centralizada()
resultados_distribuido, tempo_total_d  = busca_distribuida()

Comparando Abordagens

Após o treinamento dos modelos, vamos gerar uma visualização para comparar o desempenho em relação ao tempo da abordagem centralizada em relação a abordagem distribuída

import matplotlib.pyplot as plt
import seaborn

fig, ax = plt.subplots(1, 2, figsize=(15, 2))

ax[0].plot(np.cumsum(np.array(resultados_centralizado)[:,-1]), label='Centralizado', color='r', marker='o')
ax[0].plot(np.array(resultados_distribuido)[:,-1], label='Distribuído', color='b', marker='o')
ax[0].grid(True, linestyle=':')
ax[0].legend()
ax[0].set_xlabel('Configurações')
ax[0].set_ylabel('Tempo (s)')
ax[0].set_xticks([0, 1, 2, 3, 4], ('(100, 10)', '(200, 15)', '(300, 20)', '(400, 25)', '(500, 30)'))

ax[1].barh(['Cent.', 'Dist.'], [tempo_total_c, tempo_total_d], ec='k', color=['r', 'b'])
ax[1].set_ylabel('Abordagens')
ax[1].set_xlabel('Tempo Total (s)')

Distribuindo Processos em um Cluster Ray

Primeiramente, devemos criar um cluster ray iniciando o head informando o a porta que disponibilizará o serviço, ip que será compartilhado e outras opções como por exemplo a dashboard de visualização. Assim, podemos iniciar da seguinte forma:

ray start --head --node-ip-address="IP" --port=PORTA --dashboard-host=0.0.0.0 --dashboard-port=PORTA_DASHBOARD

Em seguida, os workers podem se conectar diretamente ao head especificando o ip e porta que desejam se conectar da seguinte forma:

ray start --address=IP:PORTA

Para executar sua task de forma distribuída nesse cluster, é preciso iniciar o ray no head node e executa-lo da mesma forma que o anterior. O próprio ray ficará responsável por distribuir o workload nos workers disponíveis

Tip

O Ray disponibiliza um dashboard para acompanhar a execução das task em um ambiente gráfico

Treinamento Distribuído - TensorFlow

O TensorFlow oferece diversas estratégias para o treinamento distribuído, adaptadas a diferentes arquiteturas e necessidades, permitindo que você utilize várias GPUs (ou até várias máquinas) para treinar modelos de aprendizado profundo. As principais estratégias são:

  • tf.distribute.MirroredStrategy: Para treinamento em uma única máquina com várias GPUs. Cria réplicas do modelo em cada GPU, onde as atualizações de gradiente são sincronizadas entre todas as GPUs no final de cada etapa de treinamento. Essa estratégia é eficiente quando se quer utilizar várias GPUs localmente.

  • tf.distribute.MultiWorkerMirroredStrategy: Para treinamento em várias máquinas (ou nós), cada uma com uma ou mais GPUs. Semelhante ao MirroredStrategy, mas pode escalar para várias máquinas. Utiliza um algoritmo chamado "All-Reduce" para combinar gradientes entre máquinas e GPUs.

  • tf.distribute.TPUStrategy: Para treinamento em TPUs (Tensor Processing Units), que são hardware especializado do Google para aprendizado profundo. Permite o uso de TPUs, distribuindo o treinamento entre os diferentes núcleos da TPU. TPUs são otimizadas para grandes cargas de trabalho e modelos maiores.

  • tf.distribute.ParameterServerStrategy: Para treinamento distribuído em larga escala. Distribui o treinamento em várias máquinas, onde algumas são designadas como servidores de parâmetros (armazenam e atualizam os pesos do modelo) e outras são "workers" (realizam o cálculo de gradientes). Esta abordagem é útil quando há muitos parâmetros para sincronizar entre as máquinas.

  • tf.distribute.CentralStorageStrategy: Em máquinas com múltiplas GPUs, onde os pesos do modelo são armazenados na CPU e os cálculos são feitos nas GPUs. Os gradientes são agregados na memória principal (CPU) ao invés de diretamente entre as GPUs, o que pode ser útil em casos com GPU com memória limitada.

Neste laboratório, focaremos em como usar o tf.distribute.MirroredStrategy, que permite treinamento distribuído em uma única máquina com várias GPUs.

Importando Bibliotecas

# Import TensorFlow and TensorFlow Datasets

import tensorflow_datasets as tfds
import tensorflow as tf
# tfds.disable_progress_bar()

import time
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # Isso suprime avisos
import os

Importando Dados

Para esse exemplo vamos utiliza o Cifar10, portanto vamos utilizar o seguinte código para fazer o download e separar em treino e teste

# carrega o dataset que será utilizado nesse laboratório
datasets, info          = tfds.load(name='cifar100', with_info=True, as_supervised=True, data_dir='./data')
cifar_train, cifar_test = datasets['train'], datasets['test']

Definindo Estratégia

Após o download do dataset vamos definir a estratégia distribuída que será utilizada para o treinamento. Nesse laboratório vamos utilizar a estratégia tf.distribute.MirroredStrategy() para treinar o modelo de forma paralela em múltiplas GPUs porém utilizando batches diferentes

# Define a estratégia utilizada e mostra o números de GPUs disponíveis
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
Number of devices: 2

Normalização e Batches

Em seguida, vamos fazer o processamento dos dados normalizando e separando em Batches. A função scale faz a normalização dos pixels da imagem

# função para normalizar as imagens
def scale(image, label):
    image  = tf.cast(image, tf.float32)
    image /= 255

    return image, label

A função get_data(distributed=True) configura e retorna os conjuntos de dados de treino e teste para um modelo de aprendizado de máquina, considerando se a execução será distribuída ou não. Primeiro, a função obtém o número de exemplos nos conjuntos de treino e teste a partir de uma variável info. Define um buffer de 10.000 para embaralhamento e um tamanho de lote (batch size) base de 64 por réplica. Se o processamento for distribuído (distributed=True), o tamanho do lote total é ajustado multiplicando o tamanho base pelo número de réplicas sincronizadas da estratégia distribuída (strategy.num_replicas_in_sync). Caso contrário, o tamanho do lote permanece 64. Os conjuntos de treino e teste são preparados com map(), aplicando uma função de escalonamento (scale), e são configurados com cache, embaralhamento e divisão em lotes. Finalmente, a função retorna esses conjuntos de dados processados (train_dataset e eval_dataset) para uso no treinamento e avaliação do modelo.

def get_data(distributed=True):
    # retorna o número de exemplos nos conjuntos de treino e teste 
    num_train_examples = info.splits['train'].num_examples
    num_test_examples  = info.splits['test'].num_examples

    BUFFER_SIZE = 10_000

    BATCH_SIZE_PER_REPLICA = 64
    # usado pela estratégia distribuída
    if distributed:
        BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
    else:
        BATCH_SIZE = BATCH_SIZE_PER_REPLICA * 1

    # configura os conjuntos de dados para treino e teste
    train_dataset = cifar_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
    eval_dataset  = cifar_test.map(scale).batch(BATCH_SIZE)

    return train_dataset, eval_dataset

Warning

Note que quando utilizamos a abordagem distribuída precisamos alterar o tamanho dos batches * a quantidade de GPUs disponíveis.

Definindo Modelo

A função create_model() define e retorna um modelo de rede neural convolucional (CNN) usando a API Sequential do TensorFlow/Keras, projetado para tarefas de classificação de imagens, como o dataset CIFAR-10. O modelo consiste em três camadas convolucionais (Conv2D) com 32 e 64 filtros, todas ativadas por ReLU, seguidas por camadas de pooling (MaxPooling2D) para redução dimensional. Após a extração de características, os dados são achatados (Flatten) e passados por uma camada totalmente conectada (Dense) com 64 neurônios. A última camada possui 100 neurônios com ativação softmax, para classificação de 100 categorias. O modelo é compilado com a função de perda SparseCategoricalCrossentropy (adequada para classificação com rótulos inteiros), o otimizador Adam, e a métrica de precisão. O modelo resultante está pronto para ser treinado em um problema de classificação.

def create_model():
    model = tf.keras.models.Sequential([
    tf.keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(32, 32, 3)),
    tf.keras.layers.MaxPooling2D((2, 2)),
    tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
    tf.keras.layers.MaxPooling2D((2, 2)),
    tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(100, activation='softmax')  # 10 classes no CIFAR-10
])
    model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                optimizer=tf.keras.optimizers.Adam(),
                metrics=['accuracy'])

    return model

Treinamento Centralizado

Com o a função para criação do modelo definida, vamos iniciar o treinamento utilizando uma abordagem centralizada.

#define modelo único
model = create_model()

#inicia treinamento centralizado
train_dataset, eval_dataset = get_data(distributed=False)
hist_cent                   = model.fit(train_dataset, epochs=20)

Para apresentar o comportamento da abordagem centralizada, é apresentado a saída do comando nvidia-smi. Nele podemos ver que apenas uma GPU está sendo utilizada para realizar o treinamento do modelo em questão, ocupando 36% da GPU NVIDIA GeForce GTC 1080 Ti

Treinamento Distribuído

Dessa forma, para realizar o treinamento utilizando uma abordagem distribuída precisamos definir o modelo dentro do escopo da estratégia. Isso já é suficiente para informar o framework que queremos distribuir o treinamento entre as GPUs disponíveis.

with strategy.scope():
  # define o escopo que será aplicado a estratégia
  model = create_model()

#inicia treinamento paralelo
train_dataset, eval_dataset = get_data(distributed=True)
hist_dist                   =  model.fit(train_dataset, epochs=20)

Do mesmo modo, para apresentar o comportamento da abordagem distribuída, é apresentado a saída do comando nvidia-smi. Nele podemos ver ambas as GPUs estão sendo utilizada para realizar o treinamento do modelo em questão, ocupando 29% e 32% das GPUs NVIDIA GeForce GTC 1080 Ti disponíveis.

Warning

Para permitir o treinamento distribuído utilizando a estratégia definida, precisamos instanciar o modelo e também o .compile() dentro do escopo da estratégia

Atividade - Ray + MultiWorkerMirrorStrategy()

Agora é sua vez, com os exemplos apresentados nesse laboratório. Como podemos criar o treinamento de um modelo de deep learning em múltiplos nodes utilizando uma estratégia paralela?