SplitFed com clusterização dinâmica em cenário veiculares
Este projeto implementa o esboço de uma abordagem de aprendizado utilizando a técnica de SplitFed Learning com clustering dinâmico. O objetivo é dividir a carga de processamento entre as estações bases, que utilizam RSUs (Unidades de Comunicação de Estrada) para coleta dos dados, que agem como clientes e um servidor central, utilizando uma comunicação eficiente e distribuída. na qual os clientes são organizados em clusters com base na métrica de total_flown. A cada rodada, cada RSU (cliente) é atribuída a um cluster (alto, medio, ou baixo) e ajusta seu tamanho de lote de acordo com o cluster selecionado. A seguir, detalhamos o funcionamento do código e a lógica por trás do processamento de ativações e gradientes.
Descrição Geral
Estrutura do Código
- Clientes (RSUs): Cada RSU treina uma parte do modelo (
client_model) localmente e envia as ativações para o servidor. - Servidor: O servidor recebe as ativações, realiza seu próprio processamento, calcula as previsões e a perda, e então calcula e envia gradientes de volta ao cliente para atualização do modelo.
- Agregação Federada por Cluster: Ao final de cada rodada, os modelos de cada cluster são agregados, e então uma média entre os clusters é feita para atualizar o modelo global.
Visão Geral do Processo:
1. Configuração Inicial: Os dados de várias RSUs são carregados. Cada cliente ajusta o número de camadas do modelo e o batch size com base no valor da métrica total_flown. A lógica é:
- Alto fluxo de tráfego: batch_size = 128.
- Médio fluxo de tráfego: batch_size = 64.
- Baixo fluxo de tráfego: batch_size = 32.
2. Criação dos Modelos Cliente e Servidor: Cada cliente cria um modelo (client_model) com o número de camadas especificado. O servidor possui seu próprio modelo (server_model), que processa as ativações dos clientes e retorna os gradientes correspondentes.
3. Treinamento em SplitFed: - Em cada época de treinamento, cada cliente processa uma parte dos dados (batch) e gera ativações. - O servidor recebe essas ativações, realiza o processamento adicional para fazer previsões e calcular a perda. - O servidor calcula os gradientes de sua própria parte do modelo e aplica esses gradientes para atualizar seus pesos. - O servidor então calcula os gradientes das ativações para retornar aos clientes, que os utilizam para atualizar os pesos de seus respectivos modelos.
4. Agregação Federada por Cluster:
No final de cada rodada, os modelos dos clientes em cada cluster são agregados. Os pesos são combinados, resultando em um conjunto de pesos representativos para cada cluster. Uma média final é calculada entre os clusters para atualizar o modelo global.
5. Atualização do Modelo Global: O modelo global é atualizado com os pesos agregados e salvo ao final de todas as rodadas.
Lógica das Ativações e Gradientes:
A divisão do treinamento ocorre da seguinte forma:
-
Processamento no Cliente:
- O cliente utiliza tf.GradientTape para capturar as ativações geradas por seu modelo (client_model).
- A saída do cliente (vehicle_output_train) é enviada ao servidor como uma ativação.
-
Processamento no Servidor:
- O servidor utiliza tf.GradientTape (com persistent=True para reutilizar o mesmo tape) para calcular a previsão final (y_pred) e a perda (loss) com base nas ativações recebidas do cliente.
- Os gradientes do erro em relação aos pesos do modelo do servidor são calculados e aplicados diretamente ao servidor.
-
Retorno dos Gradientes para o Cliente:
- O servidor calcula os gradientes das ativações recebidas com relação à perda. Esses gradientes são chamados de client_act_grads.
- O cliente recebe os client_act_grads e calcula os gradientes dos pesos de seu modelo (client_grads), aplicando-os para atualizar os pesos do cliente.
Esse fluxo de ativação-gradiente permite que apenas uma parte do modelo seja treinada em cada lado (cliente e servidor), reduzindo a sobrecarga de comunicação e preservando a privacidade dos dados.
Fluxo de Comunicação
- Recepção de Forward: O servidor recebe as ativações e rótulos dos clientes, calcula os gradientes e os envia de volta para o cliente.
- Agregação de Pesos: O servidor recebe os pesos dos clientes e os combina no modelo global (fazendo uma média simples ).
- Envio de Pesos Atualizados: O servidor envia os pesos atualizados de volta para os clientes para que eles possam continuar o treinamento local.
Vamos codar isso tudo então!
Lógica do Código principal do Cliente:
A função train_client é responsável pelo treinamento local do modelo em cada cliente (RSU) durante o processo de Split Federated Learning. Ela realiza o treinamento em múltiplas épocas, processa as ativações, comunica-se com o servidor para forward e backward propagation, aplica gradientes e atualiza os pesos globais do modelo. - client_model: O modelo neural que será treinado localmente no cliente (RSU). - X_train: Dados de entrada de treinamento para o cliente. - y_train: Labels correspondentes aos dados de entrada. - rsu_id: Identificador da RSU (cliente). - batch_size: Tamanho do lote de dados para treinamento. - stub: Objeto de comunicação gRPC que permite enviar e receber mensagens do servidor.
def train_client(client_model, X_train, y_train,rsu_id, batch_size, stub):
client_optimizer = tf.keras.optimizers.Adam(learning_rate=0.01) # Novo otimizador para cada cliente
for epoch in range(10): # Loop de épocas
print(f"\tÉpoca {epoch + 1} para RSU {rsu_id}")
epoch_loss = []
for i in range(0, len(X_train), batch_size):
X_batch = X_train[i:i + batch_size]
y_batch = y_train[i:i + batch_size]
# Processamento no cliente # 1
with tf.GradientTape() as client_tape:
vehicle_output_train = client_model(X_batch)
vehicle_output_train = create_activations(vehicle_output_train)
forward_msg = create_forward_message(vehicle_output_train,y_batch,rsu_id,epoch,i) #2
# Enviar a mensagem para o servidor
response = stub.Forward(forward_msg)
# Lendo as mensagens do servidor para o Backpropagation
client_act_grads = read_backward_message(response)
client_grads = client_tape.gradient(vehicle_output_train, #3
client_model.trainable_variables,
output_gradients=client_act_grads['gradients'])
client_optimizer.apply_gradients(zip(client_grads, client_model.trainable_variables)) #4
# Enviando os pesos do modelo para o servidor
client_weights = client_model.get_weights()
response = stub.Aggregate(create_aggregate_message(client_weights,batch_size))
global_weights = read_model(response)
client_model.set_weights(global_weights)
return client_model
A função train_client implementa o treinamento local de um cliente (RSU) em um cenário de Split Federated Learning, onde o processamento é dividido entre cliente e servidor. O cliente realiza a forward propagation localmente, envia as ativações para o servidor, recebe gradientes para backpropagation e atualiza seus pesos antes de enviar os novos pesos para agregação no servidor. Esse processo permite que o treinamento ocorra de forma distribuída e colaborativa, preservando a privacidade dos dados.
Logica do codigo do servidor
O método Forward faz parte da classe que define o servidor no contexto de Split Federated Learning (SplitFed Learning). Esse método recebe as ativações dos clientes (RSUs), realiza a predição e o cálculo da perda no servidor, aplica os gradientes nos pesos do modelo global e calcula os gradientes para o backpropagation que serão enviados de volta aos clientes.
def Forward(self, request, context):
response = read_forward_message(request)
activations = response['activations']
labels = response['labels']
with tf.GradientTape() as tape:
tape.watch(activations)
predictions = self.global_model(activations, training=True)
loss = tf.reduce_mean(tf.square(predictions - labels)) # Calcula o MSE
# Gradientes e aplicação no servidor
server_grads = tape.gradient(loss, self.global_model.trainable_variables)
self.global_optimizer.apply_gradients(zip(server_grads, self.global_model.trainable_variables))
# Gradientes e aplicação no cliente
client_act_grads = tape.gradient(loss, activations)
client_act_grads = create_backward_message(client_act_grads,loss)
# Retorna os gradientes ao cliente
return client_act_grads
- Recebe as ativações e rótulos do cliente.
- Realiza a predição usando o modelo global do servidor.
- Calcula a perda utilizando o erro quadrático médio (MSE).
- Calcula os gradientes e aplica a atualização dos pesos no modelo global do servidor.
- Calcula os gradientes em relação às ativações do cliente e envia esses gradientes de volta para o cliente, permitindo o ajuste local dos pesos.
Problemas
O erro que estou enfrentando é um problema de tipagem durante a comunicação entre o cliente e o servidor . Ele ocorre ao tentar enviar dados da forward propagation (ativações) para o servidor usando gRPC. O traceback indica que o tipo de dados enviado é um ndarray do NumPy, enquanto o servidor espera um objeto do tipo tf.Tensor, tf.Variable, ou ExtensionType.
Conclusão
A proposta desse processo é realizar o treinamento federado com clusters, onde cada RSU treina um modelo local e, ao final de cada rodada, os modelos são agregados para formar um modelo global. O processo é repetido para múltiplas rodadas, e o modelo global resultante é salvo e pode ser utilizado para fazer previsões em novos dados.
O uso de clusters permite que modelos com características semelhantes sejam agrupados, o que pode melhorar a eficiência da agregação e, potencialmente, a performance do modelo global.
A integração de gRPC oferece uma solução robusta e escalável para comunicação e orquestração no contexto de aprendizado federado distribuído, permitindo a execução de tarefas em múltiplos nós de maneira eficiente e paralela.
O código completo estás disponível no Github para reprodução, assim como a base de dados

