Neuroévolution pour l’apprentissage automatique
EvoX fournit des solutions pour les tâches d’apprentissage supervisé basées sur la neuroévolution, avec des modules clés incluant SupervisedLearningProblem et ParamsAndVector. En prenant la tâche de classification MNIST comme exemple, cette section illustre le processus de neuroévolution pour l’apprentissage supervisé en adoptant les modules d’EvoX.
Configuration de base
L’importation des composants de base et la configuration du périphérique (device) constituent les étapes de départ essentielles du processus de neuroévolution.
Ici, pour assurer la reproductibilité des résultats, une graine aléatoire (random seed) peut être définie optionnellement.
import torch
import torch.nn as nn
from evox.utils import ParamsAndVector
from evox.core import Algorithm, Mutable, Parameter, jit_class
from evox.problems.neuroevolution.supervised_learning import SupervisedLearningProblem
from evox.algorithms import PSO
from evox.workflows import EvalMonitor, StdWorkflow
# Set device
device = "cuda:0" if torch.cuda.is_available() else "cpu"
# Set random seed
seed = 0
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
À cette étape, un modèle d’exemple de réseau de neurones convolutifs (CNN) est directement défini sur le framework PyTorch, puis chargé sur le périphérique.
class SampleCNN(nn.Module):
def __init__(self):
super(SampleCNN, self).__init__()
self.features = nn.Sequential(
nn.Conv2d(1, 3, kernel_size=3, padding=1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Conv2d(3, 3, kernel_size=3),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Conv2d(3, 3, kernel_size=3),
nn.ReLU(),
nn.Conv2d(3, 3, kernel_size=3),
nn.ReLU(),
)
self.classifier = nn.Sequential(nn.Flatten(), nn.Linear(12, 10))
def forward(self, x):
x = self.features(x)
x = self.classifier(x)
return x
model = SampleCNN().to(device)
total_params = sum(p.numel() for p in model.parameters())
print(f"Total number of model parameters: {total_params}")
La configuration du jeu de données implique la sélection de la tâche. Le chargeur de données (data loader) doit maintenant être initialisé en se basant sur le support intégré de PyTorch.
Ici, le paquet torchvision doit être installé à l’avance selon votre version de PyTorch, s’il n’est pas déjà disponible.
Dans le cas où le jeu de données MNIST n’est pas déjà présent dans le répertoire data_root, le drapeau download=True est défini pour garantir que le jeu de données sera téléchargé automatiquement. Par conséquent, la configuration peut prendre un certain temps lors de la première exécution.
import os
import torchvision
data_root = "./data" # Choose a path to save dataset
os.makedirs(data_root, exist_ok=True)
train_dataset = torchvision.datasets.MNIST(
root=data_root,
train=True,
download=True,
transform=torchvision.transforms.ToTensor(),
)
test_dataset = torchvision.datasets.MNIST(
root=data_root,
train=False,
download=True,
transform=torchvision.transforms.ToTensor(),
)
BATCH_SIZE = 100
train_loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=BATCH_SIZE,
shuffle=True,
collate_fn=None,
)
test_loader = torch.utils.data.DataLoader(
test_dataset,
batch_size=BATCH_SIZE,
shuffle=False,
collate_fn=None,
)
Pour accélérer les processus ultérieurs, toutes les données MNIST sont préchargées pour une exécution plus rapide. Ci-dessous, trois jeux de données sont préchargés pour différentes étapes – l’entraînement par descente de gradient, le réglage fin (fine-tuning) par neuroévolution et le test du modèle.
Il convient de noter qu’il s’agit d’une opération optionnelle qui échange de l’espace mémoire contre du temps. Son adoption dépend de la capacité de votre GPU, et la préparation prendra toujours un certain temps.
# Used for gradient descent training process
pre_gd_train_loader = tuple([(inputs.to(device), labels.to(device)) for inputs, labels in train_loader])
# Used for neuroevolution fine-tuning process
pre_ne_train_loader = tuple(
[
(
inputs.to(device),
labels.type(torch.float).unsqueeze(1).repeat(1, 10).to(device),
)
for inputs, labels in train_loader
]
)
# Used for model testing process
pre_test_loader = tuple([(inputs.to(device), labels.to(device)) for inputs, labels in test_loader])
Ici, une fonction model_test est prédéfinie pour simplifier l’évaluation de la précision de prédiction du modèle sur le jeu de données de test au cours des étapes ultérieures.
def model_test(model: nn.Module, data_loader: torch.utils.data.DataLoader, device: torch.device) -> float:
model.eval()
with torch.no_grad():
total = 0
correct = 0
for inputs, labels in data_loader:
inputs: torch.Tensor = inputs.to(device=device, non_blocking=True)
labels: torch.Tensor = labels.to(device=device, non_blocking=True)
logits = model(inputs)
_, predicted = torch.max(logits.data, dim=1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
acc = 100 * correct / total
return acc
Entraînement par descente de gradient (Optionnel)
L’entraînement du modèle basé sur la descente de gradient est effectué en premier. Dans cet exemple, cet entraînement est adopté pour initialiser le modèle, le préparant ainsi aux processus de neuroévolution ultérieurs.
Le processus d’entraînement du modèle dans PyTorch est compatible avec la neuroévolution dans EvoX, ce qui rend pratique la réutilisation de la même implémentation de modèle pour les étapes suivantes.
def model_train(
model: nn.Module,
data_loader: torch.utils.data.DataLoader,
criterion: nn.Module,
optimizer: torch.optim.Optimizer,
max_epoch: int,
device: torch.device,
print_frequent: int = -1,
) -> nn.Module:
model.train()
for epoch in range(max_epoch):
running_loss = 0.0
for step, (inputs, labels) in enumerate(data_loader, start=1):
inputs: torch.Tensor = inputs.to(device=device, non_blocking=True)
labels: torch.Tensor = labels.to(device=device, non_blocking=True)
optimizer.zero_grad()
logits = model(inputs)
loss = criterion(logits, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
if print_frequent > 0 and step % print_frequent == 0:
print(f"[Epoch {epoch:2d}, step {step:4d}] running loss: {running_loss:.4f} ")
running_loss = 0.0
return model
model_train(
model,
data_loader=pre_gd_train_loader,
criterion=nn.CrossEntropyLoss(),
optimizer=torch.optim.Adam(model.parameters(), lr=1e-2),
max_epoch=3,
device=device,
print_frequent=500,
)
gd_acc = model_test(model, pre_test_loader, device)
print(f"Accuracy after gradient descent training: {gd_acc:.4f} %.")
Réglage fin par neuroévolution
Sur la base du modèle pré-entraîné issu du processus précédent de descente de gradient, la neuroévolution est appliquée progressivement pour affiner le modèle.
Tout d’abord, le composant ParamsAndVector est utilisé pour aplatir les poids du modèle pré-entraîné en un vecteur, qui sert d’individu central initial pour le processus de neuroévolution ultérieur.
adapter = ParamsAndVector(dummy_model=model)
model_params = dict(model.named_parameters())
pop_center = adapter.to_vector(model_params)
lower_bound = pop_center - 0.01
upper_bound = pop_center + 0.01
Dans le cas d’algorithmes spécifiquement conçus pour la neuroévolution, qui peuvent accepter directement un dictionnaire de paramètres par lots en entrée, l’utilisation de
ParamsAndVectorpeut être inutile.
De plus, un critère d’exemple est défini. Ici, la perte (loss) et la précision (accuracy) du modèle individuel sont toutes deux sélectionnées et pondérées pour servir de fonction de fitness dans le processus de neuroévolution. Cette étape est personnalisable pour s’adapter à la direction de l’optimisation.
class AccuracyCriterion(nn.Module):
def __init__(self, data_loader):
super().__init__()
data_loader = data_loader
def forward(self, logits, labels):
_, predicted = torch.max(logits, dim=1)
correct = (predicted == labels[:, 0]).sum()
fitness = -correct
return fitness
acc_criterion = AccuracyCriterion(pre_ne_train_loader)
loss_criterion = nn.MSELoss()
class WeightedCriterion(nn.Module):
def __init__(self, loss_weight, loss_criterion, acc_weight, acc_criterion):
super().__init__()
self.loss_weight = loss_weight
self.loss_criterion = loss_criterion
self.acc_weight = acc_weight
self.acc_criterion = acc_criterion
def forward(self, logits, labels):
weighted_loss = self.loss_weight * loss_criterion(logits, labels)
weighted_acc = self.acc_weight * acc_criterion(logits, labels)
return weighted_loss + weighted_acc
weighted_criterion = WeightedCriterion(
loss_weight=0.5,
loss_criterion=loss_criterion,
acc_weight=0.5,
acc_criterion=acc_criterion,
)
En même temps, de manière similaire aux processus d’entraînement par descente de gradient et de test du modèle, le processus de réglage fin par neuroévolution est également encapsulé dans une fonction pour une utilisation pratique dans les étapes ultérieures.
import time
def neuroevolution_process(
workflow: StdWorkflow,
adapter: ParamsAndVector,
model: nn.Module,
test_loader: torch.utils.data.DataLoader,
device: torch.device,
best_acc: float,
max_generation: int = 2,
) -> None:
for index in range(max_generation):
print(f"In generation {index}:")
t = time.time()
workflow.step()
print(f"\tTime elapsed: {time.time() - t: .4f}(s).")
monitor = workflow.get_submodule("monitor")
print(f"\tTop fitness: {monitor.topk_fitness}")
best_params = adapter.to_params(monitor.topk_solutions[0])
model.load_state_dict(best_params)
acc = model_test(model, test_loader, device)
if acc > best_acc:
best_acc = acc
print(f"\tBest accuracy: {best_acc:.4f} %.")
Test de neuroévolution basé sur une population
Dans cet exemple, l’algorithme basé sur une population pour la neuroévolution est testé en premier, en utilisant l’optimisation par essaim particulaire (PSO) comme représentation. La configuration pour la neuroévolution est similaire à celle d’autres tâches d’optimisation – nous devons définir le problème, l’algorithme, le moniteur et le flux de travail (workflow), ainsi que leurs fonctions setup() respectives pour compléter l’initialisation.
Un point clé à noter ici est que la taille de la population (POP_SIZE dans ce cas) doit être initialisée à la fois dans le problème et dans l’algorithme pour éviter des erreurs potentielles.
POP_SIZE = 100
vmapped_problem = SupervisedLearningProblem(
model=model,
data_loader=pre_ne_train_loader,
criterion=weighted_criterion,
pop_size=POP_SIZE,
device=device,
)
vmapped_problem.setup()
pop_algorithm = PSO(
pop_size=POP_SIZE,
lb=lower_bound,
ub=upper_bound,
device=device,
)
pop_algorithm.setup()
monitor = EvalMonitor(
topk=3,
device=device,
)
monitor.setup()
pop_workflow = StdWorkflow()
pop_workflow.setup(
algorithm=pop_algorithm,
problem=vmapped_problem,
solution_transform=adapter,
monitor=monitor,
device=device,
)
print("Upon gradient descent, the population-based neuroevolution process start. ")
neuroevolution_process(
workflow=pop_workflow,
adapter=adapter,
model=model,
test_loader=pre_test_loader,
device=device,
best_acc=gd_acc,
max_generation=10,
)
pop_workflow.get_submodule("monitor").plot()
Test de neuroévolution à individu unique
Ensuite, la neuroévolution basée sur un algorithme à individu unique est testée. Comme pour le cas basé sur une population, nous devons définir le problème, l’algorithme, le moniteur et le flux de travail, et appeler leurs fonctions setup() respectives lors de l’initialisation. Dans ce cas, une stratégie de recherche aléatoire est sélectionnée comme algorithme.
Un point clé à noter ici est que SupervisedLearningProblem doit être configuré avec pop_size=None, et EvalMonitor doit avoir topk=1, car un seul individu est recherché. Une configuration minutieuse des hyper-paramètres permet d’éviter des problèmes inutiles.
single_problem = SupervisedLearningProblem(
model=model,
data_loader=pre_ne_train_loader,
criterion=weighted_criterion,
pop_size=None,
device=device,
)
single_problem.setup()
@jit_class
class RandAlgorithm(Algorithm):
def __init__(self, lb, ub):
super().__init__()
assert lb.ndim == 1 and ub.ndim == 1, f"Lower and upper bounds shall have ndim of 1, got {lb.ndim} and {ub.ndim}. "
assert lb.shape == ub.shape, f"Lower and upper bounds shall have same shape, got {lb.ndim} and {ub.ndim}. "
self.hp = Parameter([1.0, 2.0])
self.lb = lb
self.ub = ub
self.dim = lb.shape[0]
self.pop = Mutable(torch.empty(1, lb.shape[0], dtype=lb.dtype, device=lb.device))
self.fit = Mutable(torch.empty(1, dtype=lb.dtype, device=lb.device))
def step(self):
pop = torch.rand(
self.dim,
dtype=self.lb.dtype,
device=self.lb.device,
)
pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
pop = pop * self.hp[0]
self.pop.copy_(pop)
self.fit.copy_(self.evaluate(pop))
single_algorithm = RandAlgorithm(lb=lower_bound, ub=upper_bound)
single_monitor = EvalMonitor(
topk=1,
device=device,
)
single_monitor.setup()
single_workflow = StdWorkflow()
single_workflow.setup(
algorithm=single_algorithm,
problem=single_problem,
solution_transform=adapter,
monitor=single_monitor,
device=device,
)
print("Upon gradient descent, the single-individual neuroevolution process start. ")
neuroevolution_process(
workflow=single_workflow,
adapter=adapter,
model=model,
test_loader=pre_test_loader,
device=device,
best_acc=gd_acc,
max_generation=12,
)
single_workflow.get_submodule("monitor").plot()