Skip to content

Laboratório - Split Learning com gRPC

Nesse laboratório vamos explorar o uso de Split-Learning para o treinamento de modelos de forma distribuída. O Split Learning (também conhecido como SplitNN) é uma técnica de aprendizado profundo distribuído e preservar a privacidade, ideal para cenários onde múltiplas entidades possuem dados sensíveis e não deseham compartilhá-los diretamente. Desenvolvido por laboratório do MIT, essa técnica permite treinar redes neurais em múltiplas fontes de dados sem a necessidade de expor os dados brutos.

O Modelo é dividido em várias seções. Cada parte é treinada em um cliente diferente, garantindo que nenhuma entidade precise compartilhar seus dados sensíveis diretamente com outras ou com um servidor central, apenas os pesos de última camada (camada de corte) são transmitidos entre as entidades. A imagem a seguir ilustra alguns tipos de segmentação do modelo no contexto de Split Learning.

  • Formato Tadicional: Cliente possui um modelo base e o servidor possui o modelo head. Nesse formato, o cliente deve enviar os rótulos para o servidor, pois é no modelo head que a loss é calculada e onde o processo de backpropagation é iniciado
  • Formato U-Shaped: O cliente possui um modelo base e o modelo head, enquanto o servidor possui um modelo intermediário. Nesse formato, o cliente não precisa enviar os rótulos para o servidor, porém mais latência é introduzido no processo.
  • Shared Features: Cada cliente possui features diferente dos dados e um modelo base, assim as ativações devem ser concatenadas no servidor antes de continuar o pocesso de treinamento como modelo head.

Note

Em resumo o processo o split-learning funciona da seguinte forma:

  • Divisão do modelo: A rede é dividida em várias partes. Cada parte é treinada em uma entidade (cliente).
  • Camada de corte: O ponto de corte é definido onde as entidades trocam as saídas das camadas.
  • Propagação para o Servidor: As saídas da camada de corte são enviadas para o servidor, que completa a parte restante do treinamento.
  • Backpropagation: O servidor realiza o backpropagation até a camada de corte e envia os gradientes de volta aos clientes para ocmpletar o treinamento local.

Vantagens

  • Preservação da Privacidade: como não há troca de dados brutos, os participantes podem colaborar sem export suas informações confidenciais, como dados médicos ou financeiros.

  • Escalabilidade: Permite que dados de múltiplas entidades sejam usados, aumentando a diversidade e a quantidade de informação, o que é benéfico para a construção de modelos robustos.

  • Treinamento em Dados distribuídos: Entidades podem colaborar para treinar um modelo sem a necessidade de centralizar os dados, evitando questões de regulação ou confiança.

  • Combinação de modelos de Dados: Entidades com diferentes tipos de dados (ex: imagens, resultados laboratoriais) podem colaborar, como no caso de diagnósticos médicos.

Desvantagnes

  • Depêndencia da Rede: O tráfego de dados entre clientes e servido pode gerar latência, especialmente em redes como baixa largura de banda.

  • Complexidade de implementação: O Split learning requer mais coordenação entre as partes envolvidas, sendo mais complexo do que técnicas tradicionais.

  • Necessidade de Confiança no Servidor: O Servidor central ainda lida com partes do modelo e dos gradientes, o que pode levantar questões de segurança.

  • Rrisco de Ataque Inversos: Embora os dados brutos não sejam compartilhados, há riscos de que sob certas circunstâncias, informações sensíveis possam ser inferidas.

Fonte: - Split Learning vs Federated Learning and its Use Cases

Mais informações podem ser encontradas em: Split learning for health: Distributed deep learning without sharing raw patient data

Tutorial de Split Learning com gRPC

Definição do Protocolo .proto

Usamos o gRPC para definir a interface de comunicação entre o cliente e o servidor, e o arquivo .proto define os tipos de mensagens e os métodos de serviço.

Estrutura do Arquivo splitlearning.proto

  1. Definindo a Versão do Protocolo
syntax = "proto3";

package splitlearning;

message ClientToServer {
    repeated float activations = 1;
    repeated int32 labels      = 2;
    int32 batch_size           = 3;
    int32 client_id            = 4;
}

Esta mensagem representa os dados enviados do cliente para o servidor. Vamos analisar os campos:

  • repeated float activations = 1;: uma lista de valores de ativação. A saída de uma camada intermediária de uma rede neural que o cliente está treinando.
  • repeated int32 labels = 2;: uma lista de rótulos associados aos dados no lado do cliente (ou seja, a verdade de ground-truth dos exemplos).
  • int32 batch_size = 3;: o tamanho do batch (lote) de dados que o cliente está processando.
  • int32 client_id = 4;: o ID exclusivo do cliente que está enviando os dados.
message ServerToClient {
    repeated float gradients = 1;
    float loss               = 2;
    float acc                = 3;
}

Esta mensagem representa a resposta do servidor ao cliente. Vamos descrever seus campos:

  • repeated float gradients = 1;: uma lista de gradientes que o servidor calculou com base nas ativações recebidas do cliente. Esses gradientes serão usados pelo cliente para atualizar os pesos do modelo.
  • float loss = 2;: o valor da perda (loss) calculada no servidor. Isso informa ao cliente o quão bem o modelo está performando em termos de erro.
  • float acc = 3;: o valor da acurácia (acc) calculada no servidor. Isso informa ao cliente o quão bem o modelo está performando em termos de taxa de acerto.

Definindo o Serviço

Agora precisamos definir o serviço de comunicação que será utilizado entre cliente e servidor. Dessa forma, definimos o serviço SplitLearning, o qual envia uma mensagem do tipo ClientToServer e recebe uma mensagem do tipo ServerToClient. O serviço é definido pelo seguinte código:

service SplitLearning {
    rpc SendClientActivations(ClientToServer) returns (ServerToClient) {}
}

Compilando o Arquivo splitlearning.proto

Ao finalizar o arquivo splitlearning.proto, devemo compila-lo para gerar os código que serão responsáveis por realizar a comunicação e entre o cliente e o servidor através dos stubs. Assim, compilamos o arquivo .proto da seguinte forma:

python -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. splitlearning.proto

Resumo

O arquivo splitlearning.proto define as mensagens e os métodos necessários para o aprendizado federado com split learning usando gRPC. As mensagens cobrem as ativações, gradientes, perdas e informações de status do treinamento. O serviço SplitLearning, por sua vez, define as operações principais: envio de ativações e notificação de conclusão do treinamento.

Cliente

Vamos implementar o código do cliente, que terá os dados a serem treinados e a parte inicial do modelo. Dessa forma, o cliente deve passar os dados por esse modelo parcial (i.e., feedforward) e enviar as ativações para o servidor junto com o labels para finalizar o processo de feedforward. Em seguida, o sevidor deve calcular a loss e gradientes e iniciar o backpropagation em sua parte do modelo e enviar os gradiente para o cliente para que ele possa finalizar o backpropagation em seu modelo parcial.

Importações Necessárias

import splitlearning_pb2 as pb2
import splitlearning_pb2_grpc as pb2_grpc
import tensorflow as tf
import keras
from keras.models import Model
import grpc
import time
import numpy as np
import os

Criando o Modelo Parcial

O código define a função create_partial_model, que cria um modelo parcial. A função recebe uma camada de entrada (input_layer) e aplica uma sequência de camadas: a primeira é uma convolução 2D (Conv2D) com 32 filtros, kernel 3x3 e ativação ReLU, seguida por uma camada de max pooling 2D (MaxPooling2D) com janela 2x2 para reduzir a dimensionalidade. Em seguida, é aplicada outra camada de convolução 2D com 64 filtros, kernel 3x3 e ativação ReLU, e, por fim, uma camada densa (Dense) com 128 unidades e ativação ReLU. A função retorna um modelo que utiliza a camada de entrada e tem como saída a última camada densa (layer4), sendo um modelo parcial, podendo ser expandido posteriormente.

def create_partial_model(input_layer):
    layer1 = keras.layers.Conv2D(32, (3, 3), activation='relu')(input_layer)
    layer2 = keras.layers.MaxPooling2D((2, 2))(layer1)
    layer3 = keras.layers.Conv2D(64, (3, 3), activation='relu')(layer2)
    layer4 = keras.layers.Dense(128, activation='relu')(layer3)
    return Model(inputs=input_layer, outputs=layer4)

Função para Obter Ativações do Modelo

O código define a função get_activations, que obtém as ativações de um modelo usando TensorFlow. A função recebe o modelo (model) e um conjunto de dados de entrada (X). Utilizando o tf.GradientTape com a opção de persistência habilitada, as ativações do modelo são calculadas passando os dados de entrada (X) para o modelo. A função retorna as ativações resultantes e o objeto tape, que é utilizado posteriormente para calcular gradientes.

def get_activations(model, X):
    with tf.GradientTape(persistent=True) as tape:
        activations = model(X)
    return activations, tape

Enviando Ativações ao Servidor

O código define a função send_activations_to_server, que envia as ativações de um modelo, junto com os rótulos, para um servidor utilizando gRPC. A função recebe como parâmetros o stub para comunicação com o servidor (stub), as ativações do modelo (activations), os rótulos (labels), o tamanho do lote (batch_size) e o identificador do cliente (client_id). As ativações são convertidas para um array NumPy e achatadas. Em seguida, é criado um objeto de mensagem ClientToServer da classe pb2.ClientToServer, onde as ativações e os rótulos são adicionados à mensagem, junto com o tamanho do lote e o identificador do cliente. A mensagem é enviada ao servidor através do método SendClientActivations do stub, e a função retorna a resposta recebida do servidor.

def send_activations_to_server(stub, activations, labels, batch_size, client_id):
    activations_list = activations.numpy().flatten()

    client_to_server_msg = pb2.ClientToServer()
    client_to_server_msg.activations.extend(activations_list)
    client_to_server_msg.labels.extend(labels.flatten())
    client_to_server_msg.batch_size = batch_size
    client_to_server_msg.client_id = client_id

    server_response = stub.SendClientActivations(client_to_server_msg)
    return server_response

Carregando o Dataset CIFAR-10

Agora vamos definir os dados que serão utilizados para realizar o treinamento do modelo utilizando o Split-Learning. Para esse exemplo, vamos utilizar o CIFAR10 com o seguinte código:

# CIFAR-10 dataset
cifar10                              = keras.datasets.cifar10
(X_train, y_train), (X_test, y_test) = cifar10.load_data()
X_train, X_test                      = X_train / 255.0, X_test / 255.0 

Criando o Modelo Parcial e Configurando o Otimizador

Após carregar os dados, vamos definir o modelo parcial que será utilizado pelo cliente e também o otimizador que será responsável por atualizar os pessos do modelo de acordo com os gradientes recebidos do servidor.

partial_model    = create_partial_model(keras.layers.Input(shape=(32, 32, 3)))
client_optimizer = tf.keras.optimizers.Adam()

Configuração da Conexão gRPC

Precisamos definir alguns parâmetros para o gRPC para que seja possível enviar os dados para o servidor (para lidar com grandes mensagens, como gradientes), assim, alteramos os valores máximos das mensagens enviadas e recebidas pelo canal de comunicaão estabelecido com o servidor.

MAX_MESSAGE_LENGTH = 20 * 1024 * 1024 * 10
channel = grpc.insecure_channel('localhost:50051', options=[
    ('grpc.max_send_message_length', MAX_MESSAGE_LENGTH),
    ('grpc.max_receive_message_length', MAX_MESSAGE_LENGTH),
])
stub = pb2_grpc.SplitLearningStub(channel)

Função train_step

Esse código define a função train_step, que realiza uma etapa de treinamento para um modelo de aprendizado de máquina utilizando um processo de comunicação cliente-servidor. Aqui está uma descrição resumida do que cada parte faz:

  • Coleta de ativações: A função get_activations é chamada para calcular as ativações do modelo usando o lote de entrada (x_batch), juntamente com o tape, que será usado para calcular os gradientes posteriormente. As ativações são achatadas para facilitar o envio ao servidor.

  • Envio e resposta do servidor: As ativações achatadas são enviadas ao servidor, juntamente com os rótulos do lote (y_batch), e o tempo de latência do envio é medido. O servidor responde com os gradientes das ativações, perda e acurácia do modelo, que são então convertidos e reformatados para tensores compatíveis.

  • Cálculo do gradiente no cliente: Utilizando o tape, os gradientes com relação às variáveis treináveis do modelo são calculados a partir dos gradientes das ativações recebidos do servidor.

  • Atualização do modelo: O otimizador aplica os gradientes calculados para atualizar os pesos do modelo.

def train_step(model, x_batch, y_batch, batch, optimizer, epoch, stub):

    activations, tape     = get_activations(model, x_batch)
    flattened_activations = tf.reshape(activations, (activations.shape[0], -1))

    latencia_start  = time.time()
    server_response = send_activations_to_server(stub, flattened_activations, y_batch, len(x_batch), 1)
    latencia_end    = time.time()

    print("Received response from server")
    activations_grad = tf.convert_to_tensor(server_response.gradients, dtype=tf.float32)
    activations_grad = tf.reshape(activations_grad, activations.shape)

    client_gradient = tape.gradient(
        activations,
        model.trainable_variables,
        output_gradients=activations_grad
    )

    bytes_tx  = flattened_activations.numpy().nbytes
    bytes_rx  = activations_grad.numpy().nbytes
    latencia  = latencia_end - latencia_start
    loss      = server_response.loss
    acc       = server_response.acc

    print(f"Latencia: {latencia} segundos")
    print(f"Data Tx: {bytes_tx / 2**20} MB")
    print(f"Data Rx: {bytes_rx / 2**20} MB")

    optimizer.apply_gradients(zip(client_gradient, model.trainable_variables))

    with open('results.csv', 'a') as f:
        f.write(f"{epoch}, {batch}, {loss}, {acc}, {latencia}, {bytes_tx / 2**20}, {bytes_rx / 2**20}\n")

Note

O código apresentado também coleta métricas para avaliação, no código são medidos e exibidos a latência, a quantidade de dados enviados e recebidos. Esses valores, junto com a loss e a acurácia, são registrados em um arquivo CSV (results.csv) para análise posterior.

Realizando o treinamento

Este código realiza um treinamento em várias épocas e lotes (batches) para um modelo de aprendizado de máquina. Ele executa 10 épocas de treinamento através do laço for epoch in range(10). Em cada época, define o tamanho do batch como 64 e calcula o número total de batches por época com base no tamanho do conjunto de dados de treinamento (X_train) e o tamanho do lote. Para cada lote, o código imprime o progresso atual (Epoch {epoch} - Batch {batch}/{n_batches}) e, em seguida, extrai um lote de dados (X_batch) e rótulos correspondentes (y_batch) de X_train e y_train, com base no índice do lote atual. Em cada iteração, a função train_step é chamada, passando o modelo parcial (partial_model), o lote atual de dados e rótulos, o índice do lote, o otimizador do cliente (client_optimizer), a época atual e o stub (para comunicação com o servidor). Em resumo, o código realiza o treinamento por lotes em múltiplas épocas, passando cada lote para uma função de treinamento específica que atualiza o modelo.

for epoch in range(10):
    batch_size = 64
    n_batches  = X_train.shape[0]//batch_size

    for batch in range(n_batches):
        print(f"Epoch {epoch} - Batch {batch}/{n_batches}")

        X_batch  = X_train[batch_size * batch : batch_size * (batch+1)]
        y_batch  = y_train[batch_size * batch : batch_size * (batch+1)]

        train_step(partial_model, X_batch, y_batch, batch, client_optimizer, epoch, stub)

Resumo

Implementamos um cliente para split learning usando TensorFlow e gRPC. O cliente treina um modelo parcial e envia as ativações ao servidor, que calcula os gradientes e os retorna. O cliente então aplica esses gradientes para continuar o treinamento. Essa abordagem permite que o cliente mantenha parte dos dados localmente, melhorando a privacidade e a segurança no treinamento distribuído.

Servidor

Agora vamos definir o código do servidor, que será responsável por receber as ativações do cliente, realizar o restante do processo de Feed Forward, calcular a perda (loss), os gradientes e a métrica de avalização do modelo acurácia. Em seguida, o servidor vai iniciar o processo de Backpropagation e enviar os gradiente para o cliente para finalizar o processo.

Importações Necessárias

import splitlearning_pb2 as pb2
import splitlearning_pb2_grpc as pb2_grpc
import grpc
from concurrent import futures
import tensorflow as tf
import keras
from keras.models import Model
import time
import pandas as pd
import os

Definição do Modelo no Servidor

O código define a classe ServerModel, define o restante do modelo no servidor. No método de inicialização (__init__), duas camadas densas são criadas: a primeira, dense1, é uma camada intermediária com 64 unidades e ativação ReLU, enquanto a segunda, dense2, é a camada de saída com 10 unidades e ativação softmax, apropriada para um problema de classificação com 10 classes (como CIFAR-10). O método call é responsável por definir o fluxo dos dados pelo modelo, onde os dados de entrada são primeiro processados pela camada dense1 e depois pela camada dense2, retornando a previsão final do modelo.

class ServerModel(tf.keras.models.Model):
    def __init__(self):
        super(ServerModel, self).__init__()
        self.dense1 = keras.layers.Dense(64, activation='relu')     # Camada intermediária
        self.dense2 = keras.layers.Dense(10, activation='softmax')  # Camada de saída 

    def call(self, input):
        x = self.dense1(input)
        x = self.dense2(x)
        return x

Inicializando o Serviço

Podemos então implementar o serviço que irá receber as mensagens do cliente. Para isso precisamos inicia-lo e também definir o modelo, otimizador e métricas que serâo utilizados pelo servidor.

class SplitLearningService(pb2_grpc.SplitLearningServicer):

    def __init__(self):
        self.server_model = create_server_model((128,))
        self.optimizer    = tf.keras.optimizers.Adam()
        self.metrics      = tf.keras.metrics.SparseCategoricalAccuracy()

Recebendo Ativações do Cliente

O método recebe uma solicitação do cliente, onde as ativações são convertidas em tensores e reformuladas de acordo com o tamanho do batch. As etiquetas (rótulos) também são convertidas para tensores. Utilizando um tf.GradientTape persistente, o servidor realiza uma passagem direta pelo modelo, calcula as previsões e a perda usando a entropia cruzada categórica esparsa, e mede a acurácia. Em seguida, os gradientes da perda com relação às variáveis treináveis do modelo do servidor são calculados e aplicados pelo otimizador Adam. O gradiente das ativações também é calculado para ser enviado de volta ao cliente. A resposta para o cliente contém esses gradientes, além do valor da perda e da acurácia. A cada chamada, o número de épocas (epoch) é incrementado, e as métricas de desempenho (perda e acurácia) são exibidas no console antes da resposta ser enviada.

def SendClientActivations(self, request, context):
    activations = tf.convert_to_tensor(request.activations, dtype=tf.float32)
    # -1 faz com que a dimensao seja calculada automaticamente
    activations = tf.reshape(activations, (request.batch_size, -1)) 
    labels      = tf.convert_to_tensor(request.labels, dtype=tf.float32)
    print(activations.shape)

    global epoch

    with tf.GradientTape(persistent=True) as tape:
        tape.watch(activations)
        predictions = self.server_model(activations, training=True)
        loss        = tf.keras.losses.sparse_categorical_crossentropy(labels, predictions)
        loss        = tf.reduce_mean(loss)
        acc         = self.metrics(labels, predictions)

    print("BACKWARD")
    server_gradients = tape.gradient(loss, self.server_model.trainable_variables)
    self.optimizer.apply_gradients(zip(server_gradients, self.server_model.trainable_variables))

    activations_gradients = tape.gradient(loss, activations)
    response              = pb2.ServerToClient()

    response.gradients.extend(activations_gradients.numpy().flatten())
    response.loss = loss.numpy()
    response.acc  = acc.numpy()
    epoch        += 1

    print(f"Epoch {epoch} - Loss: {loss.numpy()} - Acc: {acc.numpy()}")
    return response

Resumo

Este método é responsável por:

  • Receber as ativações e rótulos do cliente via a mensagem ClientToServer.
  • Usar GradientTape para calcular os gradientes em relação às ativações e às variáveis treináveis do modelo.
  • Aplicar os gradientes ao modelo do servidor.
  • Enviar de volta os gradientes das ativações para o cliente, juntamente com o valor da perda calculada.

Iniciando o Servidor gRPC

O código define a inicialização de um servidor gRPC global (server), com um limite máximo de mensagem configurado para 200 MB tanto para o envio quanto para o recebimento de dados (MAX_MESSAGE_LENGTH). O servidor é criado usando um ThreadPoolExecutor com 10 threads. Em seguida, o serviço SplitLearningService é adicionado ao servidor por meio da função pb2_grpc.add_SplitLearningServicer_to_server(). O servidor é configurado para ouvir em uma porta insegura (50051), o que significa que não utiliza SSL/TLS para a comunicação. Após a inicialização, uma mensagem é exibida no console informando que o servidor foi iniciado na porta 50051, e ele entra em modo de espera contínuo com a função server.wait_for_termination(), aguardando requisições de clientes.

def serve():
    global server
    MAX_MESSAGE_LENGTH = 20 * 1024 * 1024 * 10
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
        options=[
            ('grpc.max_send_message_length', MAX_MESSAGE_LENGTH),
            ('grpc.max_receive_message_length', MAX_MESSAGE_LENGTH),
        ])
    pb2_grpc.add_SplitLearningServicer_to_server(SplitLearningService(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print("Server started on port 50051")
    server.wait_for_termination()

Warning

Para realizar o treinamento do modelo, devemos primeiro iniciar o código do servidor python server.py e em seguinda o código do cliente com python client.py

Visualizando os Resulados

Com as métricas coletadas pelo cliente e salvas no arquivo results.csv, podemos gerar visualizações para analisar o comportamento do modelo e também quantidade bytes enviados e latência por batch. O código a seguir apresenta um exemplo de visualizações que podem ser geradas:

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

df = pd.read_csv('results.csv', names=['epoch', 'batch', 'loss', 'accuracy', 'latencia', 'tx', 'rx'])

fig, ax = plt.subplots(2, 2, figsize=(15, 7.5))
ax      = ax.flatten()

sns.lineplot(x=df.index, y=df.loss, color='b', ax=ax[0], linewidth=2)
sns.lineplot(x=df.index, y=df.accuracy, color='k', ax=ax[1], linewidth=2)
sns.lineplot(x=df.index, y=df.latencia, color='r', ax=ax[2], linewidth=2)
sns.lineplot(x=df.index, y=df.tx.cumsum(), color='orange', ax=ax[3], linewidth=2)

for _ in range(4):
    ax[_].set_xlabel('Epochs (#)', size=13)
    ax[_].grid(True, linestyle=':')
    ax[_].set_xticks((range(0, len(df), 781)), ('1', '2', '3', '4', '5', ' 6'))

ax[0].set_ylabel('Loss', size=13)
ax[1].set_ylabel('Accurácia (%)', size=13)
ax[2].set_ylabel('Latência (s)', size=13)
ax[3].set_ylabel('Bytes Transmited (MB)', size=13)
ax[3].set_yscale('log') 

Exercício - (Entregável)

De acordo os o laboratório apresentado, altere o código para que funcione com múltiplos clientes e um único servidor. você pode utilizar os conhecimentos apresentado no Laboratório 7 sobre o Ray para instânciar os clintes. Dessa forma, faça as seguintes tarefas:

  • Permita a execução de múltiplos cleintes
  • Faça a separação de dados entre os clientes, de modo que cada cliente receba porções diferentes de dados
  • Implemente um treinamento de Split-Learning com formato U-Shaped

Note

No formato U-Shaped os cliente possuem os dados e os rótulos, e o servidor possui apenas camadas intermediárias do modelo. Assim, considere três modelos para esse treinamento (M1, M2 e M3), que deve ser feito da seguinte forma:

  • Feed Forward: cliente envia as ativações do modelo M1 para o servidor, o servidor continua o processo de Feed Forward e envia as ativações resultantes do modelo M2 para o cliente, cliente recebe as ativações calcula a loss e inicia backpropagation com o modelo M3
  • Backpropagation: cliente inicia o backpropagation, atualiza os pesos da última camada do modelo M3 baseado na loss e envia os gradientes para o servidor, em seguida o servidor recebe os gradientes e atualiza os pesos do modelo M2, calcula novos gradientes e envia os gradientes resultantes para o cliente. Por fim, o cliente recebe os gradientes do servidor e finaliza o processo de backpropagation atualizando os pesos do modelo M1.