!pip install -q phe flwr[simulation] flwr_datasets "tenseal==0.3.14"
Criptografia FHE CKKS com Tenseal
A criptografia homomórfica é uma técnica avançada de criptografia que permite realizar operações matemáticas sobre dados criptografados sem a necessidade de descriptografá-los. Isso significa que, em vez de expor dados confidenciais, é possível realizar cálculos diretamente sobre eles enquanto permanecem protegidos.
Método CKKS
O método CKKS é um dos primeiros esquemas a permitir cálculos aproximados sobre números reais, o que é uma grande vantagem para operações de ponto flutuante. Isso é possível graças à utilização de uma técnica chamada codificação de precisão fixa, que representa números decimais como inteiros multiplicados por um fator de escala.
Parâmetros
- Grau do Polinômio: define o grau do polinômio usado para representar os dados criptografados. Ele determina o número de slots disponíveis para empacotar dados em um único ciphertext.
- Tamanhos dos Bits dos Coeficientes Modulares: é uma lista que especifica o tamanho dos bits de cada camada de coeficientes de modulação (ou “modulus chain”).
- Escala Global: define o fator de escala usado para representar números de ponto flutuante dentro do esquema CKKS.
O que são as Galois keys
As Galois Keys (ou chaves de Galois) são um conjunto especial de chaves auxiliares utilizadas em criptografia homomórfica, especificamente em esquemas baseados em álgebra sobre anéis, como o CKKS (Cheon-Kim-Kim-Song) e o BFV (Brakerski-Fan-Vercauteren). Essas chaves permitem operações de rotações e mudanças de permutações em vetores criptografados. Elas são chamadas "Galois" porque estão relacionadas ao grupo de Galois, que em matemática lida com simetrias de estruturas algébricas.
import tenseal as ts
# Configuração do contexto CKKS para operações em TenSEAL
def setup_context():
# Define parâmetros para CKKS
poly_modulus_degree = 8192
coeff_mod_bit_sizes = [60, 40, 40, 60] # Precisão em bits para cada camada
# Cria o contexto CKKS
context = ts.context(
ts.SCHEME_TYPE.CKKS,
poly_modulus_degree=poly_modulus_degree,
coeff_mod_bit_sizes=coeff_mod_bit_sizes
)
# Define fator de escala (para precisão de ponto flutuante)
context.global_scale = 2**40
context.generate_galois_keys()
print("Contexto CKKS configurado.")
return context
# Função para criptografar um vetor de inteiros com CKKS
def encrypt_vector(context, vector):
return ts.ckks_vector(context, vector)
# Função para descriptografar um vetor criptografado
def decrypt_vector(ckks_vector):
return ckks_vector.decrypt()
# Exemplo de operações de adição e multiplicação com escalares e entre vetores
# Configura o contexto
context = setup_context()
# Vetores de inteiros para criptografar
vector1 = [1, 2, 3]
vector2 = [4, 5, 6]
scalar = 2 # Escalar para multiplicação e adição
# Criptografa os vetores
encrypted_vector1 = encrypt_vector(context, vector1)
encrypted_vector2 = encrypt_vector(context, vector2)
# Operação 1: Adição entre vetores criptografados
encrypted_sum = encrypted_vector1 + encrypted_vector2
decrypted_sum = decrypt_vector(encrypted_sum)
print("Resultado da adição entre vetores criptografados:")
print(decrypted_sum)
# Operação 2: Multiplicação de um vetor criptografado por um escalar
encrypted_mult_scalar = encrypted_vector1 * scalar
decrypted_mult_scalar = decrypt_vector(encrypted_mult_scalar)
print("Resultado da multiplicação de vetor criptografado por um escalar:")
print(decrypted_mult_scalar)
# Operação 3: Adição de um vetor criptografado com um escalar
encrypted_add_scalar = encrypted_vector1 + scalar
decrypted_add_scalar = decrypt_vector(encrypted_add_scalar)
print("Resultado da adição de vetor criptografado com um escalar:")
print(decrypted_add_scalar)
# Operação 4: Multiplicação elemento a elemento entre vetores criptografados
encrypted_elementwise_mult = encrypted_vector1 * encrypted_vector2
decrypted_elementwise_mult = decrypt_vector(encrypted_elementwise_mult)
print("Resultado da multiplicação elemento a elemento entre vetores criptografados:")
print(decrypted_elementwise_mult)
Contexto CKKS configurado.
Resultado da adição entre vetores criptografados:
[5.000000000589503, 6.999999992908364, 9.0000000012348]
Resultado da multiplicação de vetor criptografado por um escalar:
[2.000000268298771, 4.000000528297258, 6.000000807632192]
Resultado da adição de vetor criptografado com um escalar:
[3.0000000000451212, 3.9999999956364802, 5.000000002136719]
Resultado da multiplicação elemento a elemento entre vetores criptografados:
[4.0000005364932285, 10.000001312292932, 18.00000242336267]
Criando chaves
Como será trabalhado a relação cliente/servidor, na qual o servidor é um agente não confiável, será gerado duas chaves, público e privada. A chave pública será usada pelo servidor para averiguar os dados criptografados e realizar as operações aritméticas. Já a chave privada será utilizada pelo servidor para encriptar e descriptografar os dados.
import pickle
import os
def read_query(file_path):
"""
This function is used to read a pickle file.
:param file_path: the path of the file to read
:return: the query and the context
"""
if os.path.exists(file_path):
with open(file_path, 'rb') as file:
"""
# pickle.load(f) # load to read file object
file_str = f.read()
client_query1 = pickle.loads(file_str) # loads to read str class
"""
query_str = pickle.load(file)
contexte = query_str["context"] # ts.context_from(query["contexte"])
del query_str["context"]
return query_str, contexte
else:
print("The file doesn't exist")
def write_query(file_path, client_query):
"""
This function is used to write a pickle file.
:param file_path: the path of the file to write
:param client_query: the query to write
"""
with open(file_path, 'wb') as file: # 'ab' to add existing file
encode_str = pickle.dumps(client_query)
file.write(encode_str)
def combo_keys(client_path="secret.pkl", server_path="server_key.pkl"):
"""
To create the public/private keys combination
args:
client_path: path to save the secret key (str)
server_path: path to save the server public key (str)
"""
context_client = setup_context()
if not os.path.exists("context"):
os.makedirs("context")
write_query(f'context/{client_path}', {"context": context_client.serialize(save_secret_key=True)})
write_query(f'context/{server_path}', {"context": context_client.serialize()})
_, context_client = read_query(f'context/{client_path}')
_, context_server = read_query(f'context/{server_path}')
context_client = ts.context_from(context_client)
context_server = ts.context_from(context_server)
print("Is the client context private?", ("Yes" if context_client.is_private() else "No"))
print("Is the server context private?", ("Yes" if context_server.is_private() else "No"))
combo_keys()
Contexto CKKS configurado.
Is the client context private? Yes
Is the server context private? No
Flower
import flwr as fl
import numpy as np
from flwr_datasets import FederatedDataset
from flwr_datasets.partitioner import DirichletPartitioner, IidPartitioner
import tensorflow as tf
import os
import random
import pickle
import numpy as np
from flwr.common import FitIns, EvaluateIns, parameters_to_ndarrays, ndarrays_to_parameters
from flwr.server.client_manager import ClientManager
from flwr.server.strategy.aggregate import aggregate ,weighted_loss_avg
from functools import reduce
import sys
import random
import time
from pathlib import Path
Cliente
O cliente irá se comportar da mesma forma que um cliente flower convencional, porém ele irá encriptar o vetor gradiente antes de enviá-los para o servidor de agregação. Esse passo é executado pela função ts.ckks_tensor, que recebe a chave gerada anteriormente, ou o contexto, com as informações sobre o polinômio utilizado para encriptar os dados, e o vetor de parâmetros para ser encriptado. Essa função irá criar um objeto da biblioteca Tenseal, com as configurações da criptografia CKKS. Assim, para encriptar o vetor, basta utilizar o método he_parameters.serialize(). Nessa solução, foi escolhido realizar uma transformação da matriz de gradiente em um vetor unidimensional. Além disso, foi escolhido encriptar, e enviar, apenas um número restrito de camadas devido ao overhead causado pela solução.
Do mesmo jeito que foi enviado, o cliente irá receber o vetor agregado pelo servidor ainda encriptado. Para descriptografá-lo é necessário converter para o tipo da biblioteca com a função ts.ckks_tensor_from, que recebe como parâmetro o contexto e o vetor encriptado. Assim que o objeto é criado, pode-se decifrá-lo com o método he_parameters.decrypt().raw, que retorna o vetor de forma bruta. Assim, ele é unido com as camadas não encriptadas, e não enviadas, e é utilizado a função reshape_parameters para voltar para a forma da matriz do gradiente e aplicá-los ao modelo local.
class HEClient(fl.client.NumPyClient):
def __init__(self, cid, niid, dataset, num_clients, dirichlet_alpha):
self.NOT_ENCRYPTED_LAYERS = 2
self.log_folder = "logs"
self.cid = int(cid)
self.dataset = dataset
self.niid = niid
self.num_clients = num_clients
self.dirichlet_alpha = dirichlet_alpha
self.last_parameters = None
self.x_train, self.y_train, self.x_test, self.y_test = self.load_data()
self.model = self.create_model(self.x_train.shape)
self.he_context = self.get_client_context()
# print(f"\n\n\t\tTIPO DO CONTEXTO{type(self.he_context)}")
def get_client_context(self):
with open(f'context/secret.pkl', 'rb') as file:
secret = pickle.load(file)
context = ts.context_from(secret["context"])
return context
def get_parameters(self, config):
parameters = self.model.get_weights()
return parameters
def create_model(self, input_shape):
model = tf.keras.models.Sequential([
tf.keras.layers.Input(shape=(28, 28, 1)),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(32, activation='relu'),
tf.keras.layers.Dense(16, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax'),
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
return model
def load_data(self):
if self.niid:
partitioner_train = DirichletPartitioner(num_partitions=self.num_clients, partition_by="label",
alpha=self.dirichlet_alpha, min_partition_size=100,
self_balancing=False)
else:
partitioner_train = IidPartitioner(num_partitions=self.num_clients)
fds = FederatedDataset(dataset=self.dataset, partitioners={"train": partitioner_train})
train = fds.load_partition(self.cid).with_format("numpy")
partitioner_test = IidPartitioner(num_partitions=self.num_clients)
fds_eval = FederatedDataset(dataset=self.dataset, partitioners={"test": partitioner_test})
test = fds_eval.load_partition(self.cid).with_format("numpy")
return train['image']/255.0, train['label'], test['image']/255.0, test['label']
def flat_parameters(self, parameters):
flat_params = []
for param in parameters:
flat_params.extend(param.flatten())
return flat_params
def reshape_parameters(self, decrypted_parameters):
reshaped_parameters = []
for layer in self.model.get_weights():
reshaped_parameters.append(np.reshape(decrypted_parameters[:layer.size], layer.shape))
decrypted_parameters = decrypted_parameters[layer.size:]
return reshaped_parameters
def fit(self, parameters, config):
# print(f"Client {self.cid} - {config}")
tempo_cifragem_total = 0
tempo_decrypt_total = 0
if len(config['he']) > 0:
he_parameters = ts.ckks_tensor_from(self.he_context, config['he'])
local_parameters = self.model.get_weights()
temp_flat = self.flat_parameters(local_parameters[:self.NOT_ENCRYPTED_LAYERS])
tempo_decrypt_inicio = time.time()
decrypted_parameters = he_parameters.decrypt().raw
tempo_decrypt_total = time.time() - tempo_decrypt_inicio
temp_flat.extend(decrypted_parameters)
reshaped_parameters = self.reshape_parameters(temp_flat)
self.model.set_weights(reshaped_parameters)
#parameters_decoded = self.encoder.decrypt_decode_double(parameters)
# self.model.set_weights(self.last_parameters)
time_training = time.time()
history = self.model.fit(self.x_train, self.y_train, epochs=1)
time_training_total = time.time() - time_training
acc = np.mean(history.history['accuracy'])
loss = np.mean(history.history['loss'])
trained_parameters = self.model.get_weights()
flat_parameters = self.flat_parameters(trained_parameters[self.NOT_ENCRYPTED_LAYERS:])
# he_parameters = ts.ckks_tensor(client_context, trained_parameters[-1])
tamanho_flat_parameter = sys.getsizeof(flat_parameters) + sys.getsizeof(trained_parameters[:self.NOT_ENCRYPTED_LAYERS])
tempo_cifragem_inicio = time.time()
he_parameters = ts.ckks_tensor(self.he_context, flat_parameters)
tempo_cifragem_total = time.time() - tempo_cifragem_inicio
serialized = he_parameters.serialize()
tamanho_cifrado = sys.getsizeof(serialized)
fit_msg = {
'cid' : self.cid,
'accuracy': acc,
'loss' : loss,
'he' : serialized
}
print(f'{len(trained_parameters[self.NOT_ENCRYPTED_LAYERS:])}')
# with open(f'{self.log_folder}/client_{self.cid}_train.csv', 'a') as f:
# f.write(f"{acc},{loss},{len(flat_parameters)},{len(serialized)} \n")
with open(f'{self.log_folder}/client_{self.cid}_train.csv', 'a') as f:
f.write(f"{acc},{loss},{len(flat_parameters)},{len(serialized)},{tamanho_flat_parameter},{tamanho_cifrado},{tempo_cifragem_total},{tempo_decrypt_total},{time_training_total} \n")
return self.flat_parameters(trained_parameters[:self.NOT_ENCRYPTED_LAYERS]), len(self.x_train), fit_msg
def evaluate(self, parameters, config):
client_context = self.get_client_context()
if len(config['he']) > 0:
he_parameters = ts.ckks_tensor_from(client_context, config['he'])
local_parameters = self.model.get_weights()
temp_flat = self.flat_parameters(local_parameters[:self.NOT_ENCRYPTED_LAYERS])
decrypted_parameters = he_parameters.decrypt().raw
temp_flat.extend(decrypted_parameters)
reshaped_parameters = self.reshape_parameters(temp_flat)
self.model.set_weights(reshaped_parameters)
loss, acc = self.model.evaluate(self.x_test, self.y_test)
eval_msg = {
'cid' : self.cid,
'accuracy': acc,
'loss' : loss
}
with open(f'{self.log_folder}/client_{self.cid}_eval.csv', 'a') as f:
f.write(f"{acc},{loss}\n")
return loss, len(self.x_test), eval_msg
Servidor
class HEServer(fl.server.strategy.FedAvg):
def __init__(self, num_clients, dataset, fraction_fit=1.0):
self.num_clients = num_clients
# self.dirichlet_alpha = dirichlet_alpha
self.dataset = dataset
self.context = self.get_server_context()
self.agg_parameters = []
self.log_folder = "logs"
super().__init__(fraction_fit=fraction_fit, min_available_clients=num_clients,
min_fit_clients=num_clients, min_evaluate_clients=num_clients)
def get_server_context(self):
with open(f'context/server_key.pkl', 'rb') as file:
secret = pickle.load(file)
context = ts.context_from(secret["context"])
return context
def configure_fit(self, server_round, parameters, client_manager):
"""Configure the next round of training."""
data2send = ''
if len(self.agg_parameters) > 0:
data2send = self.agg_parameters
config = {
'he': data2send,
}
fit_ins = FitIns(parameters, config)
# Sample clients
sample_size, min_num_clients = self.num_fit_clients(
client_manager.num_available()
)
clients = client_manager.sample(
num_clients=sample_size, min_num_clients=min_num_clients
)
# Return client/config pairs
print(clients)
return [(client, fit_ins) for client in clients]
def aggregate_fit(self, server_round, results, failures):
weights_results = []
agg_parameters = 0
parameters_list = []
total_examples = 0
for _, fit_res in results:
client_id = str(fit_res.metrics['cid'])
parameters = ts.ckks_tensor_from(self.context, fit_res.metrics['he'])
print(f"Parametros chegado pelo servidor {parameters}")
parameters_list.append((parameters, int(fit_res.num_examples)))
total_examples += int(fit_res.num_examples)
for parameters, num_examples in parameters_list:
weights = num_examples / total_examples
agg_parameters = agg_parameters + (parameters * weights)
self.agg_parameters = agg_parameters.serialize()
return [], {}
# def aggregate(self, results):
# """Compute weighted average."""
# # Calculate the total number of examples used during training
# num_examples_total = sum([num_examples for _, num_examples in results])
# # Precompute the multiplicative inverse of num_examples_total
# inverse_num_examples_total = 1.0 / num_examples_total
# # Create a list of weights, each multiplied by the related number of examples
# weighted_weights = [
# [layer * num_examples for layer in weights] for weights, num_examples in results
# ]
# # Compute average weights of each layer using multiplication instead of division
# weights_prime = [
# reduce(np.add, layer_updates) * inverse_num_examples_total
# for layer_updates in zip(*weighted_weights)
# ]
# return weights_prime
def aggregate_evaluate(self, server_round, results, failures):
"""Aggregate evaluation losses using weighted average."""
if not results:
return None, {}
# Do not aggregate if there are failures and failures are not accepted
if not self.accept_failures and failures:
return None, {}
accuracies = []
for _, response in results:
print(response)
acc = response.metrics['accuracy']
accuracies.append(acc)
loss_aggregated = weighted_loss_avg(
[
(evaluate_res.num_examples, evaluate_res.loss)
for _, evaluate_res in results
]
)
print(f"Round {server_round} aggregated loss: {loss_aggregated} aggregated accuracy: {sum(accuracies)/len(accuracies)}")
with open(f'{self.log_folder}/server_evaluate.csv', 'a') as f:
f.write(f"{sum(accuracies)/len(accuracies)},{loss_aggregated}\n")
return loss_aggregated, {}
def configure_evaluate(self, server_round, parameters, client_manager):
"""Configure the next round of evaluation."""
if self.fraction_evaluate == 0.0:
return []
# Parameters and config
config = {
'he': self.agg_parameters,
} # {"server_round": server_round, "local_epochs": 1}
evaluate_ins = EvaluateIns(parameters, config)
# Sample clients
sample_size, min_num_clients = self.num_evaluation_clients(
client_manager.num_available()
)
clients = client_manager.sample(
num_clients=sample_size, min_num_clients=min_num_clients
)
# Return client/config pairs
# Each pair of (ClientProxy, FitRes) constitutes a successful update from one of the previously selected clients
return [(client, evaluate_ins) for client in clients]
Configurando Simulação
NIID = False
dataset = "mnist"
num_client= 2
direchlet_alpha = 0.7
direchlet_alpha_server = 0.2
fraction_fit = 1
NROUNDS = 10
def create_client(cid):
client = HEClient(
cid = cid,
niid = NIID,
dataset = dataset,
num_clients = num_client,
dirichlet_alpha = direchlet_alpha
)
return client.to_client()
!rm -rf logs
/usr/local/lib/python3.10/dist-packages/ipykernel/ipkernel.py:283: DeprecationWarning: `should_run_async` will not call `transform_cell` automatically in the future. Please pass the result to `transformed_cell` argument and any exception that happen during thetransform in `preprocessing_exc_tuple` in IPython 7.17 and above.
and should_run_async(code)
#logs folder
if not os.path.exists("logs"):
os.makedirs("logs")
class Simulation():
def __init__(self):
self.server = HEServer(num_clients = num_client,
dataset = dataset,
fraction_fit = fraction_fit
)
def run_simulation(self):
fl.simulation.start_simulation(
client_fn = create_client,
num_clients = num_client,
config = fl.server.ServerConfig(num_rounds=NROUNDS),
strategy = self.server)
Simulation().run_simulation()
import pandas as pd
import matplotlib.pyplot as plt
LOG_DIR="logs"
path_img="img"
TOTAL_USERS=2
def generate_client_train(cid,path):
df = pd.read_csv(f"{path}/client_{cid}_train.csv")
fig, ((ax1,ax2),(ax3,ax4)) = plt.subplots(nrows=2, ncols=2,figsize=(10, 6))
ax1.plot(df.iloc[:,0], label='Acurácia', marker='o', color='b')
ax1.set_title('Acurácia ao longo das Iterações')
ax1.set_xlabel('Iterações')
ax1.set_ylabel('Acurácia')
ax1.grid(True)
ax1.legend()
# Subplot para Loss
ax2.plot(df.iloc[:,1], label='Loss', marker='x', color='r')
ax2.set_title('Loss ao longo das Iterações')
ax2.set_xlabel('Iterações')
ax2.set_ylabel('Loss')
ax2.grid(True)
ax2.legend()
ax3.plot(df.iloc[:,2], label='Acurácia', marker='o', color='b')
ax3.set_title('Acurácia ao longo das Iterações')
ax3.set_xlabel('Iterações')
ax3.set_ylabel('Tamanho vetor não criptografado')
ax3.grid(True)
ax3.legend()
# Subplot para Loss
ax4.plot(df.iloc[:,3], label='Loss', marker='x', color='r')
ax4.set_title('Loss ao longo das Iterações')
ax4.set_xlabel('Iterações')
ax4.set_ylabel('Tamanho vetor criptografado')
ax4.grid(True)
ax4.legend()
plt.savefig(f"{path_img}/client_{cid}_train.png")
plt.close()
def generate_client_eval(cid,path):
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 12))
df = pd.read_csv(f"{path}/client_{cid}_eval.csv")
# Subplot para Acurácia
ax1.plot(df.iloc[:,0], label='Acurácia', marker='o', color='b')
ax1.set_title('Acurácia ao longo das Iterações')
ax1.set_xlabel('Iterações')
ax1.set_ylabel('Acurácia')
ax1.grid(True)
ax1.legend()
# Subplot para Loss
ax2.plot(df.iloc[:,1], label='Loss', marker='x', color='r')
ax2.set_title('Loss ao longo das Iterações')
ax2.set_xlabel('Iterações')
ax2.set_ylabel('Loss')
ax2.grid(True)
ax2.legend()
# Ajustar o layout para que os gráficos não se sobreponham
plt.tight_layout()
# Mostrar o gráfico
plt.savefig(f"{path_img}/client_{cid}_eval.png")
plt.close()
#
def generate_server_eval(path):
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 12))
df = pd.read_csv(f"{path}/server_evaluate.csv")
# Subplot para Acurácia
ax1.plot(df.iloc[:,0], label='Acurácia', marker='o', color='b')
ax1.set_title('Acurácia ao longo das Iterações')
ax1.set_xlabel('Iterações')
ax1.set_ylabel('Acurácia')
ax1.grid(True)
ax1.legend()
# Subplot para Loss
ax2.plot(df.iloc[:,1], label='Loss', marker='x', color='r')
ax2.set_title('Loss ao longo das Iterações')
ax2.set_xlabel('Iterações')
ax2.set_ylabel('Loss')
ax2.grid(True)
ax2.legend()
# Ajustar o layout para que os gráficos não se sobreponham
plt.tight_layout()
# Mostrar o gráfico
plt.savefig(f"{path_img}/server_eval.png")
plt.close()
if not os.path.exists('img'):
os.makedirs('img')
if not os.path.exists('img'):
os.makedirs("img")
generate_server_eval(LOG_DIR)
for c in range(TOTAL_USERS):
generate_client_eval(c,LOG_DIR)
generate_client_train(c,LOG_DIR)