Skip to content

SplitFed com clusterização dinâmica em cenário veiculares

Este projeto implementa o esboço de uma abordagem de aprendizado utilizando a técnica de SplitFed Learning com clustering dinâmico. O objetivo é dividir a carga de processamento entre as estações bases, que utilizam RSUs (Unidades de Comunicação de Estrada) para coleta dos dados, que agem como clientes e um servidor central, utilizando uma comunicação eficiente e distribuída. na qual os clientes são organizados em clusters com base na métrica de total_flown. A cada rodada, cada RSU (cliente) é atribuída a um cluster (alto, medio, ou baixo) e ajusta seu tamanho de lote de acordo com o cluster selecionado. A seguir, detalhamos o funcionamento do código e a lógica por trás do processamento de ativações e gradientes.

Clusterização

Descrição Geral

Estrutura do Código

  • Clientes (RSUs): Cada RSU treina uma parte do modelo (client_model) localmente e envia as ativações para o servidor.
  • Servidor: O servidor recebe as ativações, realiza seu próprio processamento, calcula as previsões e a perda, e então calcula e envia gradientes de volta ao cliente para atualização do modelo.
  • Agregação Federada por Cluster: Ao final de cada rodada, os modelos de cada cluster são agregados, e então uma média entre os clusters é feita para atualizar o modelo global.

Visão Geral do Processo:

1. Configuração Inicial: Os dados de várias RSUs são carregados. Cada cliente ajusta o número de camadas do modelo e o batch size com base no valor da métrica total_flown. A lógica é:

- Alto fluxo de tráfego: batch_size = 128.
- Médio fluxo de tráfego: batch_size = 64.
- Baixo fluxo de tráfego: batch_size = 32.

2. Criação dos Modelos Cliente e Servidor: Cada cliente cria um modelo (client_model) com o número de camadas especificado. O servidor possui seu próprio modelo (server_model), que processa as ativações dos clientes e retorna os gradientes correspondentes.

3. Treinamento em SplitFed: - Em cada época de treinamento, cada cliente processa uma parte dos dados (batch) e gera ativações. - O servidor recebe essas ativações, realiza o processamento adicional para fazer previsões e calcular a perda. - O servidor calcula os gradientes de sua própria parte do modelo e aplica esses gradientes para atualizar seus pesos. - O servidor então calcula os gradientes das ativações para retornar aos clientes, que os utilizam para atualizar os pesos de seus respectivos modelos.

Descrição comparativa entre os tipos de aprendizado

4. Agregação Federada por Cluster:

No final de cada rodada, os modelos dos clientes em cada cluster são agregados. Os pesos são combinados, resultando em um conjunto de pesos representativos para cada cluster. Uma média final é calculada entre os clusters para atualizar o modelo global.

5. Atualização do Modelo Global: O modelo global é atualizado com os pesos agregados e salvo ao final de todas as rodadas.

Lógica das Ativações e Gradientes:

A divisão do treinamento ocorre da seguinte forma:

  1. Processamento no Cliente:

    • O cliente utiliza tf.GradientTape para capturar as ativações geradas por seu modelo (client_model).
    • A saída do cliente (vehicle_output_train) é enviada ao servidor como uma ativação.
  2. Processamento no Servidor:

    • O servidor utiliza tf.GradientTape (com persistent=True para reutilizar o mesmo tape) para calcular a previsão final (y_pred) e a perda (loss) com base nas ativações recebidas do cliente.
    • Os gradientes do erro em relação aos pesos do modelo do servidor são calculados e aplicados diretamente ao servidor.
  3. Retorno dos Gradientes para o Cliente:

    • O servidor calcula os gradientes das ativações recebidas com relação à perda. Esses gradientes são chamados de client_act_grads.
    • O cliente recebe os client_act_grads e calcula os gradientes dos pesos de seu modelo (client_grads), aplicando-os para atualizar os pesos do cliente.

Esse fluxo de ativação-gradiente permite que apenas uma parte do modelo seja treinada em cada lado (cliente e servidor), reduzindo a sobrecarga de comunicação e preservando a privacidade dos dados.

Fluxo de Comunicação

  1. Recepção de Forward: O servidor recebe as ativações e rótulos dos clientes, calcula os gradientes e os envia de volta para o cliente.
  2. Agregação de Pesos: O servidor recebe os pesos dos clientes e os combina no modelo global (fazendo uma média simples ).
  3. Envio de Pesos Atualizados: O servidor envia os pesos atualizados de volta para os clientes para que eles possam continuar o treinamento local.

Vamos codar isso tudo então!

Lógica do Código principal do Cliente:

A função train_client é responsável pelo treinamento local do modelo em cada cliente (RSU) durante o processo de Split Federated Learning. Ela realiza o treinamento em múltiplas épocas, processa as ativações, comunica-se com o servidor para forward e backward propagation, aplica gradientes e atualiza os pesos globais do modelo. - client_model: O modelo neural que será treinado localmente no cliente (RSU). - X_train: Dados de entrada de treinamento para o cliente. - y_train: Labels correspondentes aos dados de entrada. - rsu_id: Identificador da RSU (cliente). - batch_size: Tamanho do lote de dados para treinamento. - stub: Objeto de comunicação gRPC que permite enviar e receber mensagens do servidor.

def train_client(client_model, X_train, y_train,rsu_id, batch_size, stub):
    client_optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)  # Novo otimizador para cada cliente

    for epoch in range(10):  # Loop de épocas  
        print(f"\tÉpoca {epoch + 1} para RSU {rsu_id}")

        epoch_loss = []

        for i in range(0, len(X_train), batch_size):
            X_batch = X_train[i:i + batch_size]
            y_batch = y_train[i:i + batch_size]

            # Processamento no cliente # 1
            with tf.GradientTape() as client_tape: 
                vehicle_output_train = client_model(X_batch)

            vehicle_output_train = create_activations(vehicle_output_train)

            forward_msg = create_forward_message(vehicle_output_train,y_batch,rsu_id,epoch,i) #2
            # Enviar a mensagem para o servidor
            response = stub.Forward(forward_msg)

            # Lendo as mensagens do servidor para o Backpropagation
            client_act_grads = read_backward_message(response)

            client_grads = client_tape.gradient(vehicle_output_train, #3
                                                client_model.trainable_variables, 
                                                output_gradients=client_act_grads['gradients'])

            client_optimizer.apply_gradients(zip(client_grads, client_model.trainable_variables)) #4

            # Enviando os pesos do modelo para o servidor
            client_weights = client_model.get_weights()
            response = stub.Aggregate(create_aggregate_message(client_weights,batch_size))

            global_weights = read_model(response)

            client_model.set_weights(global_weights)

    return client_model
1. Cálculo das Ativações: Usando GradientTape, o cliente realiza a propagação para frente (forward propagation), gerando as ativações vehicle_output_train para o lote de entrada atual. 2. Criação das Ativações: create_activations processa as ativações, aplicando possíveis transformações para preparar os dados a serem enviados ao servidor. - Mensagem de Forward: create_forward_message cria uma mensagem contendo as ativações, labels, ID da RSU, época e índice do lote. - Envio ao Servidor: A mensagem é enviada ao servidor para continuidade da propagação para frente. 3. Gradientes Locais: Usando o GradientTape, o cliente calcula os gradientes dos pesos locais do modelo, com base nos gradientes das ativações recebidos do servidor. 4. Atualização dos Pesos: Os gradientes calculados são aplicados aos pesos do modelo local usando o otimizador Adam. Isso ajusta o modelo local para minimizar o erro de predição.

A função train_client implementa o treinamento local de um cliente (RSU) em um cenário de Split Federated Learning, onde o processamento é dividido entre cliente e servidor. O cliente realiza a forward propagation localmente, envia as ativações para o servidor, recebe gradientes para backpropagation e atualiza seus pesos antes de enviar os novos pesos para agregação no servidor. Esse processo permite que o treinamento ocorra de forma distribuída e colaborativa, preservando a privacidade dos dados.

Logica do codigo do servidor

O método Forward faz parte da classe que define o servidor no contexto de Split Federated Learning (SplitFed Learning). Esse método recebe as ativações dos clientes (RSUs), realiza a predição e o cálculo da perda no servidor, aplica os gradientes nos pesos do modelo global e calcula os gradientes para o backpropagation que serão enviados de volta aos clientes.

   def Forward(self, request, context):
        response = read_forward_message(request)
        activations = response['activations']
        labels = response['labels']

        with tf.GradientTape() as tape:
            tape.watch(activations)
            predictions = self.global_model(activations, training=True)
            loss = tf.reduce_mean(tf.square(predictions - labels))  # Calcula o MSE

        # Gradientes e aplicação no servidor
        server_grads = tape.gradient(loss, self.global_model.trainable_variables)
        self.global_optimizer.apply_gradients(zip(server_grads, self.global_model.trainable_variables))

        # Gradientes e aplicação no cliente
        client_act_grads = tape.gradient(loss, activations)
        client_act_grads = create_backward_message(client_act_grads,loss)

        # Retorna os gradientes ao cliente
        return client_act_grads
- read_forward_message(request): Converte a mensagem de ativação recebida do cliente para um dicionário(activations,shapes,labels,client_id,rnd,batch) - Gradientes: Calcula os gradientes do modelo global com base na perda (MSE). - Retorno para o Cliente: Os gradientes das ativações são enviados de volta para o cliente, permitindo a atualização do modelo local. O método Forward no servidor realiza as seguintes etapas:

  1. Recebe as ativações e rótulos do cliente.
  2. Realiza a predição usando o modelo global do servidor.
  3. Calcula a perda utilizando o erro quadrático médio (MSE).
  4. Calcula os gradientes e aplica a atualização dos pesos no modelo global do servidor.
  5. Calcula os gradientes em relação às ativações do cliente e envia esses gradientes de volta para o cliente, permitindo o ajuste local dos pesos.

Problemas

O erro que estou enfrentando é um problema de tipagem durante a comunicação entre o cliente e o servidor . Ele ocorre ao tentar enviar dados da forward propagation (ativações) para o servidor usando gRPC. O traceback indica que o tipo de dados enviado é um ndarray do NumPy, enquanto o servidor espera um objeto do tipo tf.Tensor, tf.Variable, ou ExtensionType.

Conclusão

A proposta desse processo é realizar o treinamento federado com clusters, onde cada RSU treina um modelo local e, ao final de cada rodada, os modelos são agregados para formar um modelo global. O processo é repetido para múltiplas rodadas, e o modelo global resultante é salvo e pode ser utilizado para fazer previsões em novos dados.

O uso de clusters permite que modelos com características semelhantes sejam agrupados, o que pode melhorar a eficiência da agregação e, potencialmente, a performance do modelo global.

A integração de gRPC oferece uma solução robusta e escalável para comunicação e orquestração no contexto de aprendizado federado distribuído, permitindo a execução de tarefas em múltiplos nós de maneira eficiente e paralela.

O código completo estás disponível no Github para reprodução, assim como a base de dados