Skip to content

Aprendizado Federado em Dispositivos Móveis

Sumário

  1. Introdução
  2. Objetivo
  3. Metodologia
  4. Definição do Aprendizado Federado
  5. Estrutura de Modelos e Algoritmos
  6. Avaliação Empírica
  7. Requisitos de Comunicação
  8. Resultados Esperados
  9. Conclusão

Introdução

Dispositivos móveis modernos têm acesso a uma grande quantidade de dados que podem ser utilizados para o treinamento de modelos de aprendizado de máquina, melhorando a experiência do usuário. Por exemplo, modelos de linguagem podem aprimorar o reconhecimento de fala e a entrada de texto, enquanto modelos de imagem podem auxiliar na seleção de fotos de qualidade. No entanto, esses dados geralmente são sensíveis à privacidade, possuem grande volume ou ambos, o que dificulta o envio para centros de dados para treinamento.

Objetivo

Explorar o conceito de Aprendizado Federado, uma abordagem que mantém os dados de treinamento distribuídos nos dispositivos móveis e usa atualizações locais agregadas para construir um modelo compartilhado. Este laboratório visa implementar um método prático de aprendizado federado em redes neurais profundas e avaliar sua eficiência.

Metodologia

Definição do Aprendizado Federado

Apresentar o conceito e a necessidade de um aprendizado distribuído e centrado na privacidade, permitindo que os dados permaneçam nos dispositivos.

Estrutura de Modelos e Algoritmos

Implementar um método baseado na média iterativa de modelos (Iterative Model Averaging) para agregar atualizações locais e formar um modelo global.

Avaliação Empírica

Realizar experimentos com cinco arquiteturas de modelo diferentes em quatro datasets, destacando a robustez frente a dados não balanceados e não-IID (não independentemente e identicamente distribuídos).

Requisitos de Comunicação

Os custos de comunicação entre dispositivos são o principal desafio nesta abordagem. Neste laboratório, os estudantes devem identificar estratégias para reduzir as rodadas de comunicação necessárias em até 10–100× em comparação com o método de gradiente estocástico sincronizado (SGD).

Resultados Esperados

Os experimentos devem demonstrar a eficácia do Aprendizado Federado em cenários com dados desbalanceados e não-IID, além de avaliar a redução de comunicação entre dispositivos.

Conclusão

Avaliar a eficiência e os benefícios do Aprendizado Federado em cenários práticos, focando em reduzir os custos de comunicação e mantendo a privacidade dos dados dos usuários.

Explicação do Código em Python para Treinamento com PyTorch

Este código realiza a configuração inicial para um pipeline de treinamento em PyTorch, incluindo importação de pacotes, verificação de GPU, configuração de semente para reprodutibilidade e ajustes específicos para treinar em GPU.

Importação de Pacotes

%load_ext tensorboard

import copy
from functools import reduce
import json
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import pickle
import random
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import time
import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, Sampler
from torch.utils.tensorboard import SummaryWriter
from torchsummary import summary
from torchsummaryX import summary as summaryx
from torchvision import transforms, utils, datasets
from tqdm.notebook import tqdm
from unidecode import unidecode

Este conjunto de importações inclui bibliotecas para manipulação de dados, visualização, machine learning, manipulação de redes neurais e outros utilitários.

  • TensorBoard e tqdm são usados para monitoramento do treinamento e exibição de progresso.
  • torchsummary e torchsummaryX ajudam a visualizar a estrutura da rede neural, o que é útil para garantir que o modelo está configurado corretamente.

Habilita a visualização direta de gráficos no notebook, facilitando a análise visual de métricas e resultados:

%matplotlib inline

  • Este bloco de código verifica se há uma GPU disponível e lista suas especificações.
  • gpu_info executa o comando nvidia-smi para obter informações sobre a GPU.
  • Se a GPU não estiver ativa, o código exibe uma mensagem instruindo o usuário a ativar a GPU no ambiente de execução (ex.: Google Colab).
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Select the Runtime > "Change runtime type" menu to enable a GPU accelerator, ')
  print('and then re-execute this cell.')
else:
  print(gpu_info)

Loading Dataset

!rm -Rf data !mkdir -p data scripts
GENERATE_DATASET = False
DATA_DIR = 'data/'
SAMPLES_FRACTION = 1.0
TRAIN_FRACTION = 0.8
MIN_SAMPLES = 0
!wget --adjust-extension http://www.gutenberg.org/files/100/100-0.txt -O data/shakespeare.txt
if  not GENERATE_DATASET:
    !rm -Rf data/train data/test !gdown --id 1n46Mftp3_ahRi1Z6jYhEriyLtdRDS1tD
    !unzip shakespeare.zip
    !mv -f shakespeare_paper/train data/
    !mv -f shakespeare_paper/test data/
    !rm -R shakespeare_paper/ shakespeare.zip
corpus = []
with  open('data/shakespeare.txt', 'r') as f:
    data = list(unidecode(f.read()))
    corpus = list(set(list(data)))
print('Corpus Length:', len(corpus))

Dataset Preprocessing script

if GENERATE_DATASET: 
!wget https://raw.githubusercontent.com/mllab/FedProx/master/data/shakespeare/preprocess/preprocess_shakespeare.py -O scripts/preprocess_shakespeare.py
!wget https://raw.githubusercontent.com/mllab/FedProx/master/data/shakespeare/preprocess/shake_utils.py -O scripts/shake_utils.py
!wget https://raw.githubusercontent.com/mllab/FedProx/master/data/shakespeare/preprocess/gen_all_data.py -O scripts/gen_all_data.py 
!wget https://raw.githubusercontent.com/ml-lab/FedProx/master/utils/sample.py -O scripts/sample.py !wget https://raw.githubusercontent.com/ml-lab/FedProx/master/utils/remove_users.py -O scripts/remove_users.py
if GENERATE_DATASET: !mkdir -p data/raw_data data/all_data data/train data/test !python scripts/preprocess_shakespeare.py data/shakespeare.txt data/raw_data !python scripts/gen_all_data.py

Dataset class

É implementa a classe ShakespeareDataset, que estende a classe Dataset do PyTorch para preparar o conjunto de dados do texto de Shakespeare em um formato adequado para treinamento em modelos de aprendizado de máquina, especialmente em um cenário federado.

Parâmetros: - x: Lista de sequências de entrada. - y: Lista de caracteres de saída correspondentes a cada sequência de entrada em x. - corpus: Conjunto de caracteres únicos usados no dataset. - seq_length: Comprimento da sequência de entrada (não é usado diretamente aqui, mas pode ser útil em outras partes do código).

A classe ShakespeareDataset organiza e prepara os dados de texto de Shakespeare para serem usados em um modelo de aprendizado de máquina. Cada amostra é codificada como uma sequência de índices.

class  ShakespeareDataset(Dataset):
    def  __init__(self, x, y, corpus, seq_length):
        self.x = x
        self.y = y 
        self.corpus = corpus
        self.corpus_size = len(self.corpus)
        super(ShakespeareDataset, self).__init__()
    def  __len__(self):
        return  len(self.x)
    def  __repr__(self):
        return  f'{self.__class__} - (length: {self.__len__()})'
    def  __getitem__(self, i):
        input_seq = self.x[i]
        next_char = self.y[i]
        input_value = self.text2charindxs(input_seq)
        target_value = self.get_label_from_char(next_char)
        return input_value, target_value
    def  text2charindxs(self, text):
        tensor = torch.zeros(len(text), dtype=torch.int32)
        for i, c in  enumerate(text):
            tensor[i] = self.get_label_from_char(c)
        return tensor
    def  get_label_from_char(self, c):
        return self.corpus.index(c)
    def  get_char_from_label(self, l):
        return self.corpus[l]

Federated Dataset

Extende a classe anterior , permitindo o uso do dataset em ambeientes federados. Isso é útil para dividir o treinamento em partes menores e realizar o treinamento em paralelo, essencial em cenários federados.

class ShakespeareFedDataset(ShakespeareDataset):
    def __init__(self, x, y, corpus, seq_length):
        super(ShakespeareFedDataset, self).__init__(x, y, corpus, seq_length)

    def dataloader(self, batch_size, shuffle=True):
        return DataLoader(self,
                          batch_size=batch_size,
                          shuffle=shuffle,
                          num_workers=0)

Partitioning & Data Loaders

Non-IID

É feita uma partição de dados de maneira Non-IID (não independente e identicamente distribuída), onde os dados são distribuídos para cada cliente com base no autor dos textos, preservando características específicas de cada conjunto. Isso é útil em um cenário de Federated Learning, pois os dados de cada cliente refletem as características do autor associado, ao invés de serem aleatoriamente distribuídos entre eles.

noniid_partition distribui os dados de forma não IID, agrupando amostras de cada autor específico. Ela retorna:

  • data_dict: um dicionário onde cada autor é uma chave, e cada valor é um conjunto de dados contendo train_ds (e val_ds, se val_split=True) específico daquele autor.
  • test_ds: o dataset de teste para ser utilizado globalmente no modelo.
    def noniid_partition(corpus, seq_length=80, val_split=False):
        # Carregamento de dados
        train_file = [os.path.join(DATA_DIR, 'train', f) for f in os.listdir(f'{DATA_DIR}/train') if f.endswith('.json')][0]
        test_file = [os.path.join(DATA_DIR, 'test', f) for f in os.listdir(f'{DATA_DIR}/test') if f.endswith('.json')][0]
    
        with open(train_file, 'r') as file:
            data_train = json.loads(unidecode(file.read()))
    
        with open(test_file, 'r') as file:
            data_test = json.loads(unidecode(file.read()))
    
        # Configuração de particionamento
        total_samples_train = sum(data_train['num_samples'])
    
        data_dict = {}
    
        x_test, y_test = [], []
    
        users = list(zip(data_train['users'], data_train['num_samples']))
        # random.shuffle(users)
    
        total_samples = int(sum(data_train['num_samples']) * SAMPLES_FRACTION)
        print('Objective', total_samples, '/', sum(data_train['num_samples']))
        sample_count = 0
    
        #Iteração sobre os usuários
        for i, (author_id, samples) in enumerate(users):
    
            if sample_count >= total_samples:
                print('Max samples reached', sample_count, '/', total_samples)
                break
    
            if samples < MIN_SAMPLES: # or data_train['num_samples'][i] > 10000:
                print('SKIP', author_id, samples)
                continue
            else:
                udata_train = data_train['user_data'][author_id]
                max_samples = samples if (sample_count + samples) <= total_samples else (sample_count + samples - total_samples) 
    
                sample_count += max_samples
                # print('sample_count', sample_count)
    
                x_train = data_train['user_data'][author_id]['x'][:max_samples]
                y_train = data_train['user_data'][author_id]['y'][:max_samples]
    
                train_ds = ShakespeareFedDataset(x_train, y_train, corpus, seq_length)
    
                x_val, y_val = None, None
                val_ds = None
                author_data = data_test['user_data'][author_id]
                test_size = int(len(author_data['x']) * SAMPLES_FRACTION)
                if val_split:
                    x_test += author_data['x'][:int(test_size / 2)]
                    y_test += author_data['y'][:int(test_size / 2)]
                    x_val = author_data['x'][int(test_size / 2):]
                    y_val = author_data['y'][int(test_size / 2):int(test_size)]
    
                    val_ds = ShakespeareFedDataset(x_val, y_val, corpus, seq_length)
    
                else:
                    x_test += author_data['x'][:int(test_size)]
                    y_test += author_data['y'][:int(test_size)]
    
                data_dict[author_id] = {
                    'train_ds': train_ds,
                    'val_ds': val_ds
                }
    
        test_ds = ShakespeareFedDataset(x_test, y_test, corpus, seq_length)
    

Models

Shakespeare LSTM

A classe ShakespeareLSTM define uma rede LSTM para modelagem de linguagem, com embeddings para representar caracteres, uma camada LSTM de múltiplas camadas para capturar dependências sequenciais, e uma camada final para prever o próximo caractere. O método forward processa a entrada por meio dessas camadas, e init_hidden inicializa os estados ocultos. Ela é projetada para prever o próximo caractere em sequências de texto.

class ShakespeareLSTM(nn.Module):
    """
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim, classes, lstm_layers=2, dropout=0.1, batch_first=True):
        super(ShakespeareLSTM, self).__init__()
        self.input_dim = input_dim
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.classes = classes
        self.no_layers = lstm_layers

        self.embedding = nn.Embedding(num_embeddings=self.classes,
                                      embedding_dim=self.embedding_dim)
        self.lstm = nn.LSTM(input_size=self.embedding_dim, 
                            hidden_size=self.hidden_dim,
                            num_layers=self.no_layers,
                            batch_first=batch_first, 
                            dropout=dropout if self.no_layers > 1 else 0.)
        self.fc = nn.Linear(hidden_dim, self.classes)

    def forward(self, x, hc=None):
        batch_size = x.size(0)
        x_emb = self.embedding(x)
        out, (ht, ct) = self.lstm(x_emb.view(batch_size, -1, self.embedding_dim), hc)
        dense = self.fc(ht[-1])
        return dense

    def init_hidden(self, batch_size):
        return (Variable(torch.zeros(self.no_layers, batch_size, self.hidden_dim)),
                Variable(torch.zeros(self.no_layers, batch_size, self.hidden_dim)))

Model Summary

Inicializa um modelo ShakespeareLSTM com uma sequência de entrada (seq_length) de 80, dimensão de embedding de 8, dimensão oculta de 256, duas camadas LSTM, e dropout de 0.1. Ele verifica a disponibilidade de GPU para executar o modelo no CUDA, se disponível.

O init_hidden é chamado para inicializar o estado oculto do LSTM. Em seguida, é criada uma amostra de entrada x_sample, um tensor de zeros para representar um lote de dados de entrada (batch_size=10), configurado para execução na GPU ou CPU. A amostra é usada para exibir o resumo do modelo com summaryx.

Essencialmente, este código configura e fornece um resumo da arquitetura do modelo ShakespeareLSTM com parâmetros e dimensões definidos para modelagem de sequência em texto.

batch_size = 10
seq_length = 80 # mcmahan17a, fedprox, qFFL

shakespeare_lstm = ShakespeareLSTM(input_dim=seq_length,  
                                   embedding_dim=8,  # mcmahan17a, fedprox, qFFL
                                   hidden_dim=256,  # mcmahan17a, fedprox impl
                                #    hidden_dim=100,  # fedprox paper
                                   classes=len(corpus),
                                   lstm_layers=2,
                                   dropout=0.1,  # TODO:
                                   batch_first=True
                                   )

if torch.cuda.is_available():
  shakespeare_lstm.cuda()



hc = shakespeare_lstm.init_hidden(batch_size)

x_sample = torch.zeros((batch_size, seq_length),
                       dtype=torch.long,
                       device=(torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')))

x_sample[0][0] = 1
x_sample

print("\nShakespeare LSTM SUMMARY")
print(summaryx(shakespeare_lstm, x_sample))

FedAvg Algorithm

Plot Utils

Automatiza o processo de geração de gráficos das métricas e perdas, facilitando a análise visual do progresso e desempenho do modelo ao longo das rodadas de treinamento no FedAvg. Os gráficos são salvos no diretório designado por BASE_DIR com nomes baseados no exp_id e suffix.

from sklearn.metrics import f1_score

from sklearn.metrics import f1_scoredef plot_scores(history, exp_id, title, suffix):
    accuracies = [x['accuracy'] for x in history]
    f1_macro = [x['f1_macro'] for x in history]
    f1_weighted = [x['f1_weighted'] for x in history]

    fig, ax = plt.subplots()
    ax.plot(accuracies, 'tab:orange')
    ax.set(xlabel='Rounds', ylabel='Test Accuracy', title=title)
    ax.grid()
    fig.savefig(f'{BASE_DIR}/{exp_id}/Test_Accuracy_{suffix}.jpg', format='jpg', dpi=300)
    plt.show()

    fig, ax = plt.subplots()
    ax.plot(f1_macro, 'tab:orange')
    ax.set(xlabel='Rounds', ylabel='Test F1 (macro)', title=title)
    ax.grid()
    fig.savefig(f'{BASE_DIR}/{exp_id}/Test_F1_Macro_{suffix}.jpg', format='jpg')
    plt.show()

    fig, ax = plt.subplots()
    ax.plot(f1_weighted, 'tab:orange')
    ax.set(xlabel='Rounds', ylabel='Test F1 (weighted)', title=title)
    ax.grid()
    fig.savefig(f'{BASE_DIR}/{exp_id}/Test_F1_Weighted_{suffix}.jpg', format='jpg')
    plt.show()


def plot_losses(history, exp_id, title, suffix):
    val_losses = [x['loss'] for x in history]
    train_losses = [x['train_loss'] for x in history]

    fig, ax = plt.subplots()
    ax.plot(train_losses, 'tab:orange')
    ax.set(xlabel='Rounds', ylabel='Train Loss', title=title)
    ax.grid()
    fig.savefig(f'{BASE_DIR}/{exp_id}/Train_Loss_{suffix}.jpg', format='jpg')
    plt.show()

    fig, ax = plt.subplots()
    ax.plot(val_losses, 'tab:orange')
    ax.set(xlabel='Rounds', ylabel='Test Loss', title=title)
    ax.grid()
    fig.savefig(f'{BASE_DIR}/{exp_id}/Test_Loss_{suffix}.jpg', format='jpg')
    plt.show()

Systems Heterogeneity Simulations

A função GenerateLocalEpochs simula a heterogeneidade dos sistemas ao gerar uma lista de épocas (número de iterações de atualização) para clientes em um sistema de aprendizado federado, onde diferentes clientes podem treinar por diferentes números de épocas.

Resumo dos parâmetros:

  • percentage: porcentagem de clientes que terão menos do que o número máximo de épocas (max_epochs).
  • size: número total de clientes.
  • max_epochs: número máximo de épocas que um cliente pode executar.

Funcionamento:

  1. Todos com max_epochs: Se percentage for 0, todos os clientes treinam pelo máximo de épocas (max_epochs).
  2. Clientes com épocas variadas: Caso contrário, calcula quantos clientes terão menos de max_epochs, gerando épocas aleatórias para eles entre 1 e max_epochs.
  3. Combinação e embaralhamento: Junta as épocas aleatórias com o restante dos clientes (que usam max_epochs), embaralha e retorna a lista.

Retorno:

Uma lista de épocas para cada cliente, simulando a variação de capacidade e tempo de treino em ambientes heterogêneos.

def GenerateLocalEpochs(percentage, size, max_epochs):
  ''' Method generates list of epochs for selected clients
  to replicate system heteroggeneity

  Params:
    percentage: percentage of clients to have fewer than E epochs
    size:       total size of the list
    max_epochs: maximum value for local epochs

  Returns:
    List of size epochs for each Client Update

  '''
  # if percentage is 0 then each client runs for E epochs
  if percentage == 0:
      return np.array([max_epochs]*size)
  else:
    # get the number of clients to have fewer than E epochs
    heterogenous_size = int((percentage/100) * size)

    # generate random uniform epochs of heterogenous size between 1 and E
    epoch_list = np.random.randint(1, max_epochs, heterogenous_size)

    # the rest of the clients will have E epochs
    remaining_size = size - heterogenous_size
    rem_list = [max_epochs]*remaining_size

    epoch_list = np.append(epoch_list, rem_list, axis=0)

    # shuffle the list and return
    np.random.shuffle(epoch_list)

    return epoch_list

Local Training (Client Update)

A classe CustomDataset é uma implementação personalizada do dataset para o PyTorch, que serve para selecionar e fornecer amostras de dados com base nos índices fornecidos (idxs).

Atributos:

  • dataset: O conjunto de dados original.
  • idxs: Lista de índices de amostras que serão usadas pelo cliente. Isso pode ser uma subamostra do conjunto de dados global, baseada nos dados atribuídos a esse cliente.

Métodos:

  • __len__(): Retorna o número de amostras que o cliente usará, baseado nos índices fornecidos.
  • __getitem__(item): Retorna uma amostra de dados e o rótulo correspondente, dado o índice item. A amostra é acessada através do índice item da lista self.idxs no conjunto de dados self.dataset.
    class CustomDataset(Dataset):
      def __init__(self, dataset, idxs):
          self.dataset = dataset
          self.idxs = list(idxs)
    
      def __len__(self):
          return len(self.idxs)
    
      def __getitem__(self, item):
          data, label = self.dataset[self.idxs[item]]
          return data, label
    
    A classe ClientUpdate simula o treinamento de um cliente em um sistema de aprendizado federado, realizando a atualização do modelo local e aplicando técnicas como FedProx.

Atributos:

  • train_loader: O DataLoader que fornece os dados de treino. Ele pode ser gerado diretamente pelo conjunto de dados se ele já tiver um método dataloader(), ou caso contrário, usa a classe CustomDataset para criar um carregador de dados com base nos índices fornecidos.
  • algorithm: O algoritmo a ser utilizado para o treinamento. A classe pode usar a técnica fedprox ou uma abordagem simples.
  • learning_rate: Taxa de aprendizado para o otimizador.
  • epochs: Número de épocas para o treinamento local.
  • mu: Parâmetro que controla a regularização no FedProx.

Método train(model):

Este é o método que realiza o treinamento local no cliente.

  1. Preparação:

    • Define a função de perda (criterion) como CrossEntropyLoss e a perda proximal como MSELoss.
    • Inicializa o otimizador como SGD (Descida de Gradiente Estocástico).
    • Faz uma cópia do modelo global para o cálculo do termo proximal no caso do algoritmo FedProx.
    • Cálculo do Loss: Durante cada época, o modelo realiza o treinamento iterando sobre as mini-batches de dados fornecidas pelo train_loader. Para cada batch:

    • Agradiente é zerado (optimizer.zero_grad()).

    • Realiza uma passagem para frente (forward pass) para calcular a saída do modelo.
    • A perda é calculada com a CrossEntropyLoss, e no caso do FedProx, é somado o termo proximal, que é uma penalização baseada na diferença entre os pesos locais e globais.
    • A perda é propagada para trás (backward pass) e o otimizador realiza a atualização dos parâmetros.
    • Cálculo da Perda Média: O valor da perda é acumulado e, no final de todas as épocas, a perda média do treinamento é calculada.
  2. Retorno: A função retorna:

    • Os pesos atualizados do modelo (model.state_dict()).
    • A perda média total durante o treinamento.
    • O tempo total gasto no treinamento.
      class ClientUpdate(object):
        def __init__(self, dataset, batchSize, learning_rate, epochs, idxs, mu, algorithm):
          # self.train_loader = DataLoader(CustomDataset(dataset, idxs), batch_size=batchSize, shuffle=True)
          if hasattr(dataset, 'dataloader'):
              self.train_loader = dataset.dataloader(batch_size=batch_size, shuffle=True)
          else:
              self.train_loader = DataLoader(CustomDataset(dataset, idxs), batch_size=batch_size, shuffle=True)
      
          self.algorithm = algorithm
          self.learning_rate = learning_rate
          self.epochs = epochs
          self.mu = mu
      
        def train(self, model):
          # print("Client training for {} epochs.".format(self.epochs))
          criterion = nn.CrossEntropyLoss()
          proximal_criterion = nn.MSELoss(reduction='mean')
          optimizer = torch.optim.SGD(model.parameters(), lr=self.learning_rate, momentum=0.5)
      
          # use the weights of global model for proximal term calculation
          global_model = copy.deepcopy(model)
      
          # calculate local training time
          start_time = time.time()
      
      
          e_loss = []
          for epoch in range(1, self.epochs+1):
      
            train_loss = 0.0
      
            model.train()
            for data, labels in self.train_loader:
      
              if torch.cuda.is_available():
                data, labels = data.cuda(), labels.cuda()
      
              # clear the gradients
              optimizer.zero_grad()
              # make a forward pass
              output = model(data)
      
              # calculate the loss + the proximal term
              _, pred = torch.max(output, 1)
      
              if self.algorithm == 'fedprox':
                proximal_term = 0.0
      
                # iterate through the current and global model parameters
                for w, w_t in zip(model.parameters(), global_model.parameters()) :
                  # update the proximal term 
                  #proximal_term += torch.sum(torch.abs((w-w_t)**2))
                  proximal_term += (w-w_t).norm(2)
      
                loss = criterion(output, labels) + (self.mu/2)*proximal_term
              else:
                loss = criterion(output, labels)
      
              # do a backwards pass
              loss.backward()
              # perform a single optimization step
              optimizer.step()
              # update training loss
              train_loss += loss.item()*data.size(0)
      
            # average losses
            train_loss = train_loss/len(self.train_loader.dataset)
            e_loss.append(train_loss)
      
          total_loss = sum(e_loss)/len(e_loss)
      
          return model.state_dict(), total_loss, (time.time() - start_time)
      
      Este código implementa o processo de treinamento local para um cliente em um cenário de aprendizado federado, permitindo que o cliente atualize seus pesos e, no caso do algoritmo FedProx, incorpore uma regularização proximal que penaliza mudanças nos parâmetros em relação ao modelo global. Esse processo simula a execução de um cliente em um sistema de aprendizado federado, onde cada cliente tem acesso apenas aos seus dados locais e realiza treinamento local.

Server Side Training

Parâmetros:

  • model: O modelo PyTorch a ser treinado no servidor.
  • rounds: Número de rodadas de comunicação (o número de interações entre clientes e o servidor).
  • batch_size: Tamanho do lote para o treinamento local.
  • lr: Taxa de aprendizado usada no treinamento local.
  • ds: Dataset usado para treinamento.
  • data_dict: Dicionário que contém os dados particionados para cada cliente (por exemplo, IID ou não-IID).
  • test_ds: Conjunto de dados de teste para avaliar o modelo.
  • C: Fração de clientes escolhidos aleatoriamente para computação em cada rodada.
  • K: Total de clientes.
  • E: Número de passagens (épocas) locais que cada cliente realiza por rodada.
  • mu: Constante para o termo proximal (usado para federated learning com FedProx).
  • percentage: Percentual de clientes com menos de E épocas.
  • plt_title, plt_color: Títulos e cores para gráficos de perda de treinamento e precisão de teste.
  • target_test_accuracy: A precisão alvo do teste para encerrar o treinamento antecipadamente.
  • classes: Classes de dados.
  • algorithm: Algoritmo de treinamento ('fedprox' ou 'fedavg').
  • history: Histórico de métricas de teste para cada rodada.
  • eval_every: Frequência de avaliação do modelo.
  • tb_logger: Logger para TensorBoard, se usado.
    def training(model, rounds, batch_size, lr, ds, data_dict, test_ds, C, K, E, mu, percentage, plt_title, plt_color, target_test_accuracy,
                 classes, algorithm="fedprox", history=[], eval_every=1, tb_logger=None):
      """
      Function implements the Federated Averaging Algorithm from the FedAvg paper.
      Specifically, this function is used for the server side training and weight update
    
      Params:
        - model:           PyTorch model to train
        - rounds:          Number of communication rounds for the client update
        - batch_size:      Batch size for client update training
        - lr:              Learning rate used for client update training
        - ds:              Dataset used for training
        - data_dict:       Type of data partition used for training (IID or non-IID)
        - test_data_dict:  Data used for testing the model
        - C:               Fraction of clients randomly chosen to perform computation on each round
        - K:               Total number of clients
        - E:               Number of training passes each client makes over its local dataset per round
        - mu:              proximal term constant
        - percentage:      percentage of selected client to have fewer than E epochs
      Returns:
        - model:           Trained model on the server
      """
    
      start = time.time()
    
      # global model weights
      global_weights = model.state_dict()
    
      # training loss
      train_loss = []
    
      # test accuracy
      test_acc = []
    
      # store last loss for convergence
      last_loss = 0.0
    
      # total time taken 
      total_time = 0
    
      print(f"System heterogeneity set to {percentage}% stragglers.\n")
      print(f"Picking {max(int(C*K),1 )} random clients per round.\n")
    
      users_id = list(data_dict.keys())
    
      for curr_round in range(1, rounds+1):
        w, local_loss, lst_local_train_time = [], [], []
    
        m = max(int(C*K), 1)
    
        heterogenous_epoch_list = GenerateLocalEpochs(percentage, size=m, max_epochs=E)
        heterogenous_epoch_list = np.array(heterogenous_epoch_list)
        # print('heterogenous_epoch_list', len(heterogenous_epoch_list))
    
        S_t = np.random.choice(range(K), m, replace=False)
        S_t = np.array(S_t)
        print('Clients: {}/{} -> {}'.format(len(S_t), K, S_t))
    
        # For Federated Averaging, drop all the clients that are stragglers
        if algorithm == 'fedavg':
          stragglers_indices = np.argwhere(heterogenous_epoch_list < E)
          heterogenous_epoch_list = np.delete(heterogenous_epoch_list, stragglers_indices)
          S_t = np.delete(S_t, stragglers_indices)
    
        # for _, (k, epoch) in tqdm(enumerate(zip(S_t, heterogenous_epoch_list))):
        for i in tqdm(range(len(S_t))):
        #   print('k', k)
          k = S_t[i]
          epoch = heterogenous_epoch_list[i]
          key = users_id[k]
          ds_ = ds if ds else data_dict[key]['train_ds']
          idxs = data_dict[key] if ds else None
        #   print(f'Client {k}: {len(idxs) if idxs else len(ds_)} samples')
          local_update = ClientUpdate(dataset=ds_, batchSize=batch_size, learning_rate=lr, epochs=epoch, idxs=idxs, mu=mu, algorithm=algorithm)
          weights, loss, local_train_time = local_update.train(model=copy.deepcopy(model))
        #   print(f'Local train time for {k} on {len(idxs) if idxs else len(ds_)} samples: {local_train_time}')
        #   print(f'Local train time: {local_train_time}')
    
          w.append(copy.deepcopy(weights))
          local_loss.append(copy.deepcopy(loss))
          lst_local_train_time.append(local_train_time)
    
        # calculate time to update the global weights
        global_start_time = time.time()
    
        # updating the global weights
        weights_avg = copy.deepcopy(w[0])
        for k in weights_avg.keys():
          for i in range(1, len(w)):
            weights_avg[k] += w[i][k]
    
          weights_avg[k] = torch.div(weights_avg[k], len(w))
    
        global_weights = weights_avg
    
        global_end_time = time.time()
    
        # calculate total time 
        total_time += (global_end_time - global_start_time) + sum(lst_local_train_time)/len(lst_local_train_time)
    
        # move the updated weights to our model state dict
        model.load_state_dict(global_weights)
    
        # loss
        loss_avg = sum(local_loss) / len(local_loss)
        print('Round: {}... \tAverage Loss: {}'.format(curr_round, round(loss_avg, 3)))
        train_loss.append(loss_avg)
        if tb_logger:
            tb_logger.add_scalar(f'Train/Loss', loss_avg, curr_round)
    
        # testing
        # if curr_round % eval_every == 0:
        test_scores = testing(model, test_ds, batch_size * 2, nn.CrossEntropyLoss(), len(classes), classes)
        test_scores['train_loss'] = loss_avg
        test_loss, test_accuracy = test_scores['loss'], test_scores['accuracy']
        history.append(test_scores)
    
        # print('Round: {}... \tAverage Loss: {} \tTest Loss: {} \tTest Acc: {}'.format(curr_round, round(loss_avg, 3), round(test_loss, 3), round(test_accuracy, 3)))
    
        if tb_logger:
            tb_logger.add_scalar(f'Test/Loss', test_scores['loss'], curr_round)
            tb_logger.add_scalars(f'Test/Scores', {
                'accuracy': test_scores['accuracy'], 'f1_macro': test_scores['f1_macro'], 'f1_weighted': test_scores['f1_weighted']
            }, curr_round)
    
        test_acc.append(test_accuracy)
        # break if we achieve the target test accuracy
        if test_accuracy >= target_test_accuracy:
          rounds = curr_round
          break
    
        # break if we achieve convergence, i.e., loss between two consecutive rounds is <0.0001
        if algorithm == 'fedprox' and abs(loss_avg - last_loss) < 1e-5:
          rounds = curr_round
          break
    
        # update the last loss
        last_loss = loss_avg
    
      end = time.time()
    
      # plot train loss
      fig, ax = plt.subplots()
      x_axis = np.arange(1, rounds+1)
      y_axis = np.array(train_loss)
      ax.plot(x_axis, y_axis)
    
      ax.set(xlabel='Number of Rounds', ylabel='Train Loss', title=plt_title)
      ax.grid()
      # fig.savefig(plt_title+'.jpg', format='jpg')
    
      # plot test accuracy
      fig1, ax1 = plt.subplots()
      x_axis1 = np.arange(1, rounds+1)
      y_axis1 = np.array(test_acc)
      ax1.plot(x_axis1, y_axis1)
    
      ax1.set(xlabel='Number of Rounds', ylabel='Test Accuracy', title=plt_title)
      ax1.grid()
      # fig1.savefig(plt_title+'-test.jpg', format='jpg')
    
      print("Training Done! Total time taken to Train: {}".format(end-start))
    
      return model, history
    

Testing Loop

Execução da avaliação do modelo treinado em um conjunto de dados de teste. Ela calcula o desempenho do modelo em termos de perda (loss), precisão (accuracy) e pontuações F1 para diferentes métricas

A função retorna um dicionário contendo:

  • 'loss': A perda média no conjunto de teste.
  • 'accuracy': A acurácia global no conjunto de teste.
  • 'f1_macro': A pontuação F1 macro.
  • 'f1_weighted': A pontuação F1 ponderada.
    def testing(model, dataset, bs, criterion, num_classes, classes, print_all=False):
      #test loss 
      test_loss = 0.0
      correct_class = list(0. for i in range(num_classes))
      total_class = list(0. for i in range(num_classes))
    
      test_loader = DataLoader(dataset, batch_size=bs)
      l = len(test_loader)
      model.eval()
      print('running validation...')
      for i, (data, labels) in enumerate(tqdm(test_loader)):
    
        if torch.cuda.is_available():
          data, labels = data.cuda(), labels.cuda()
    
        output = model(data)
        loss = criterion(output, labels)
        test_loss += loss.item()*data.size(0)
    
        _, pred = torch.max(output, 1)
    
        # For F1Score
        y_true = np.append(y_true, labels.data.view_as(pred).cpu().numpy()) if i != 0 else labels.data.view_as(pred).cpu().numpy()
        y_hat = np.append(y_hat, pred.cpu().numpy()) if i != 0 else pred.cpu().numpy()
    
        correct_tensor = pred.eq(labels.data.view_as(pred))
        correct = np.squeeze(correct_tensor.numpy()) if not torch.cuda.is_available() else np.squeeze(correct_tensor.cpu().numpy())
    
        #test accuracy for each object class
        # for i in range(num_classes):
        #   label = labels.data[i]
        #   correct_class[label] += correct[i].item()
        #   total_class[label] += 1
    
        for i, lbl in enumerate(labels.data):
        #   print('lbl', i, lbl)
          correct_class[lbl] += correct.data[i]
          total_class[lbl] += 1
    
      # avg test loss
      test_loss = test_loss/len(test_loader.dataset)
      print("Test Loss: {:.6f}\n".format(test_loss))
    
      # Avg F1 Score
      f1_macro = f1_score(y_true, y_hat, average='macro')
      # F1-Score -> weigthed to consider class imbalance
      f1_weighted =  f1_score(y_true, y_hat, average='weighted')
      print("F1 Score: {:.6f} (macro) {:.6f} (weighted) %\n".format(f1_macro, f1_weighted))
    
      # print test accuracy
      if print_all:
        for i in range(num_classes):
            if total_class[i]>0:
                print('Test Accuracy of %5s: %2d%% (%2d/%2d)' % 
                        (classes[i], 100 * correct_class[i] / total_class[i],
                        np.sum(correct_class[i]), np.sum(total_class[i])))
            else:
                print('Test Accuracy of %5s: N/A (no training examples)' % (classes[i]))
    
      overall_accuracy = np.sum(correct_class) / np.sum(total_class)
    
      print('\nFinal Test  Accuracy: {:.3f} ({}/{})'.format(overall_accuracy, np.sum(correct_class), np.sum(total_class)))
    
      return {'loss': test_loss, 'accuracy': overall_accuracy, 'f1_macro': f1_macro, 'f1_weighted': f1_weighted}
    

Série de parâmetros necessários para treinar um modelo em um cenário de aprendizado federado, incluindo o tamanho do lote, a taxa de aprendizado, o número de clientes, o número de rodadas e o comportamento específico do modelo (como a taxa de dropout e o número de camadas LSTM). Ele também define as condições de treinamento, como o número de épocas por cliente e a precisão alvo. A classe Hyperparameters oferece uma maneira estruturada de gerenciar essas configurações.

seq_length = 80  # mcmahan17a, fedprox, qFFL
embedding_dim = 8  # mcmahan17a, fedprox, qFFL
# hidden_dim = 100  # fedprox paper
hidden_dim = 256  # mcmahan17a, fedprox impl
num_classes = len(corpus)
classes = list(range(num_classes))
lstm_layers = 2  # mcmahan17a, fedprox, qFFL
dropout = 0.1  # TODO

class Hyperparameters():

    def __init__(self, total_clients):
        # number of training rounds
        self.rounds = 50
        # client fraction
        self.C = 0.5
        # number of clients
        self.K = total_clients
        # number of training passes on local dataset for each roung
        # self.E = 20
        self.E = 1
        # batch size
        self.batch_size = 10
        # learning Rate
        self.lr = 0.8
        # proximal term constant
        # self.mu = 0.0
        self.mu = 0.001
        # percentage of clients to have fewer than E epochs
        self.percentage = 0
        # self.percentage = 50
        # self.percentage = 90
        # target test accuracy
        self.target_test_accuracy= 99.0
        # self.target_test_accuracy=96.0
exp_log = dict()

NON-IID

exp_log = dict()
A função noniid_partition divide os dados do corpus em várias partes não-IID, ou seja, ela cria dados de treinamento de maneira que os dados em cada cliente não tenham a mesma distribuição. Além disso, um conjunto de validação (val_split=True) é criado. Isso é importante para simular o ambiente do aprendizado federado.
data_dict, test_ds = noniid_partition(corpus, seq_length=seq_length, val_split=True)

total_clients = len(data_dict.keys())
'Total users:', total_clients
A variável total_clients armazena o número de clientes, ou seja, o número de partes para as quais os dados foram divididos. O código imprime o número de clientes.
hparams = Hyperparameters(total_clients)
hparams.__dict__
# Sweeping parameter
PARAM_NAME = 'clients_fraction'
PARAM_VALUE = hparams.C
exp_id = f'{PARAM_NAME}/{PARAM_VALUE}'
exp_id

EXP_DIR = f'{BASE_DIR}/{exp_id}'
os.makedirs(EXP_DIR, exist_ok=True)

# tb_logger = SummaryWriter(log_dir)
# print(f'TBoard logger created at: {log_dir}')

title = 'LSTM FedProx on Non-IID'

Dentro da função run_experiment, o modelo LSTM é instanciado com os parâmetros definidos anteriormente, como o número de camadas LSTM, a dimensionalidade dos embeddings, e a taxa de dropout.

A função training é chamada para treinar o modelo com o método de aprendizado federado (fedavg), usando os dados particionados de maneira não-IID e os hiperparâmetros definidos anteriormente. O modelo é treinado por várias rodadas, com um número específico de clientes participando em cada rodada.

def run_experiment(run_id):

    shakespeare_lstm = ShakespeareLSTM(input_dim=seq_length,
                                       embedding_dim=embedding_dim,
                                       hidden_dim=hidden_dim,
                                       classes=num_classes,
                                       lstm_layers=lstm_layers,
                                       dropout=dropout,
                                       batch_first=True
                                       )

    if torch.cuda.is_available():
        shakespeare_lstm.cuda()

    test_history = []

    lstm_non_iid_trained, test_history = training(shakespeare_lstm,
                                                  hparams.rounds, hparams.batch_size, hparams.lr,
                                                  None, #  ds empty as it is included in data_dict
                                                  data_dict,
                                                  test_ds,
                                                  hparams.C, hparams.K, hparams.E, hparams.mu, hparams.percentage,
                                                  title, "green",
                                                  hparams.target_test_accuracy,
                                                  corpus, # classes
                                                  history=test_history,
                                                  algorithm='fedavg',
                                                  # tb_logger=tb_writer
                                                   )



    final_scores = testing(lstm_non_iid_trained, test_ds, batch_size * 2, nn.CrossEntropyLoss(), len(corpus), corpus)
    print(f'\n\n========================================================\n\n')
    print(f'Final scores for Exp {run_id} \n {final_scores}')

    log = {
        'history': test_history,
        'hyperparams': hparams.__dict__
    }

    with open(f'{EXP_DIR}/results_niid_{run_id}.pkl', 'wb') as file:
        pickle.dump(log, file)

    return test_history

exp_history = list()
for run_id in range(2):  # TOTAL RUNS
    print(f'============== RUNNING EXPERIMENT #{run_id} ==============')
    exp_history.append(run_experiment(run_id))
    print(f'\n\n========================================================\n\n')
exp_log[title] = {
    'history': exp_history,
    'hyperparams': hparams.__dict__
}
df = None
for i, e in enumerate(exp_history):
    if i == 0:
        df = pd.json_normalize(e)
        continue
    df = df + pd.json_normalize(e)

df_avg = df / len(exp_history)
avg_history = df_avg.to_dict(orient='records')
plot_scores(history=avg_history, exp_id=exp_id, title=title, suffix='nonIID')
plot_losses(history=avg_history, exp_id=exp_id, title=title, suffix='nonIID')