Neuroévolution pour l’apprentissage automatique
EvoX fournit des solutions pour les tâches d’apprentissage supervisé basées sur la neuroévolution, avec des modules clés incluant SupervisedLearningProblem et ParamsAndVector. En prenant la tâche de classification MNIST comme exemple, cette section illustre le processus de neuroévolution pour l’apprentissage supervisé en adoptant les modules d’EvoX.
Configuration de base
Les importations de composants de base et la configuration de l’appareil servent d’étapes de démarrage essentielles pour le processus de neuroévolution.
Ici, pour assurer la reproductibilité des résultats, une graine aléatoire peut être optionnellement définie.
import torch
import torch.nn as nn
from evox.utils import ParamsAndVector
from evox.core import Algorithm, Mutable, Parameter, jit_class
from evox.problems.neuroevolution.supervised_learning import SupervisedLearningProblem
from evox.algorithms import PSO
from evox.workflows import EvalMonitor, StdWorkflow
# Set device
device = "cuda:0" if torch.cuda.is_available() else "cpu"
# Set random seed
seed = 0
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
Dans cette étape, un exemple de modèle de réseau de neurones convolutif (CNN) est directement défini sur le framework PyTorch puis chargé sur l’appareil.
class SampleCNN(nn.Module):
def __init__(self):
super(SampleCNN, self).__init__()
self.features = nn.Sequential(
nn.Conv2d(1, 3, kernel_size=3, padding=1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Conv2d(3, 3, kernel_size=3),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Conv2d(3, 3, kernel_size=3),
nn.ReLU(),
nn.Conv2d(3, 3, kernel_size=3),
nn.ReLU(),
)
self.classifier = nn.Sequential(nn.Flatten(), nn.Linear(12, 10))
def forward(self, x):
x = self.features(x)
x = self.classifier(x)
return x
model = SampleCNN().to(device)
total_params = sum(p.numel() for p in model.parameters())
print(f"Total number of model parameters: {total_params}")
La configuration du jeu de données implique la sélection de la tâche. Le chargeur de données doit maintenant être initialisé en se basant sur le support intégré de PyTorch.
Ici, le paquet torchvision doit être installé au préalable en fonction de votre version de PyTorch, s’il n’est pas déjà disponible.
Si le jeu de données MNIST n’est pas déjà présent dans le répertoire data_root, le drapeau download=True est défini pour s’assurer que le jeu de données sera automatiquement téléchargé. Par conséquent, la configuration peut prendre un certain temps lors de la première exécution.
import os
import torchvision
data_root = "./data" # Choose a path to save dataset
os.makedirs(data_root, exist_ok=True)
train_dataset = torchvision.datasets.MNIST(
root=data_root,
train=True,
download=True,
transform=torchvision.transforms.ToTensor(),
)
test_dataset = torchvision.datasets.MNIST(
root=data_root,
train=False,
download=True,
transform=torchvision.transforms.ToTensor(),
)
BATCH_SIZE = 100
train_loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=BATCH_SIZE,
shuffle=True,
collate_fn=None,
)
test_loader = torch.utils.data.DataLoader(
test_dataset,
batch_size=BATCH_SIZE,
shuffle=False,
collate_fn=None,
)
Pour accélérer les processus suivants, toutes les données MNIST sont pré-chargées pour une exécution plus rapide. Ci-dessous, trois jeux de données sont pré-chargés pour différentes étapes — entraînement par descente de gradient, affinage par neuroévolution et test du modèle.
Il convient de noter qu’il s’agit d’une opération optionnelle qui échange de l’espace contre du temps. Son adoption dépend de la capacité de votre GPU, et elle prendra toujours un certain temps de préparation.
# Used for gradient descent training process
pre_gd_train_loader = tuple([(inputs.to(device), labels.to(device)) for inputs, labels in train_loader])
# Used for neuroevolution fine-tuning process
pre_ne_train_loader = tuple(
[
(
inputs.to(device),
labels.type(torch.float).unsqueeze(1).repeat(1, 10).to(device),
)
for inputs, labels in train_loader
]
)
# Used for model testing process
pre_test_loader = tuple([(inputs.to(device), labels.to(device)) for inputs, labels in test_loader])
Ici, une fonction model_test est pré-définie pour simplifier l’évaluation de la précision de prédiction du modèle sur le jeu de données de test lors des étapes suivantes.
def model_test(model: nn.Module, data_loader: torch.utils.data.DataLoader, device: torch.device) -> float:
model.eval()
with torch.no_grad():
total = 0
correct = 0
for inputs, labels in data_loader:
inputs: torch.Tensor = inputs.to(device=device, non_blocking=True)
labels: torch.Tensor = labels.to(device=device, non_blocking=True)
logits = model(inputs)
_, predicted = torch.max(logits.data, dim=1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
acc = 100 * correct / total
return acc
Entraînement par descente de gradient (optionnel)
L’entraînement du modèle basé sur la descente de gradient est effectué en premier. Dans cet exemple, cet entraînement est adopté pour initialiser le modèle, le préparant pour les processus de neuroévolution suivants.
Le processus d’entraînement du modèle dans PyTorch est compatible avec la neuroévolution dans EvoX, ce qui facilite la réutilisation de la même implémentation de modèle pour les étapes suivantes.
def model_train(
model: nn.Module,
data_loader: torch.utils.data.DataLoader,
criterion: nn.Module,
optimizer: torch.optim.Optimizer,
max_epoch: int,
device: torch.device,
print_frequent: int = -1,
) -> nn.Module:
model.train()
for epoch in range(max_epoch):
running_loss = 0.0
for step, (inputs, labels) in enumerate(data_loader, start=1):
inputs: torch.Tensor = inputs.to(device=device, non_blocking=True)
labels: torch.Tensor = labels.to(device=device, non_blocking=True)
optimizer.zero_grad()
logits = model(inputs)
loss = criterion(logits, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
if print_frequent > 0 and step % print_frequent == 0:
print(f"[Epoch {epoch:2d}, step {step:4d}] running loss: {running_loss:.4f} ")
running_loss = 0.0
return model
model_train(
model,
data_loader=pre_gd_train_loader,
criterion=nn.CrossEntropyLoss(),
optimizer=torch.optim.Adam(model.parameters(), lr=1e-2),
max_epoch=3,
device=device,
print_frequent=500,
)
gd_acc = model_test(model, pre_test_loader, device)
print(f"Accuracy after gradient descent training: {gd_acc:.4f} %.")
Affinage par neuroévolution
Basé sur le modèle pré-entraîné du processus de descente de gradient précédent, la neuroévolution est progressivement appliquée pour affiner le modèle.
D’abord, le composant ParamsAndVector est utilisé pour aplatir les poids du modèle pré-entraîné en un vecteur, qui sert d’individu central initial pour le processus de neuroévolution suivant.
adapter = ParamsAndVector(dummy_model=model)
model_params = dict(model.named_parameters())
pop_center = adapter.to_vector(model_params)
lower_bound = pop_center - 0.01
upper_bound = pop_center + 0.01
Dans le cas d’algorithmes spécifiquement conçus pour la neuroévolution, qui peuvent directement accepter un dictionnaire de paramètres par lots en entrée, l’utilisation de
ParamsAndVectorpeut être inutile.
De plus, un critère d’exemple est défini. Ici, la perte et la précision du modèle individuel sont sélectionnées et pondérées pour servir de fonction de fitness dans le processus de neuroévolution. Cette étape est personnalisable pour s’adapter à la direction d’optimisation.
class AccuracyCriterion(nn.Module):
def __init__(self, data_loader):
super().__init__()
data_loader = data_loader
def forward(self, logits, labels):
_, predicted = torch.max(logits, dim=1)
correct = (predicted == labels[:, 0]).sum()
fitness = -correct
return fitness
acc_criterion = AccuracyCriterion(pre_ne_train_loader)
loss_criterion = nn.MSELoss()
class WeightedCriterion(nn.Module):
def __init__(self, loss_weight, loss_criterion, acc_weight, acc_criterion):
super().__init__()
self.loss_weight = loss_weight
self.loss_criterion = loss_criterion
self.acc_weight = acc_weight
self.acc_criterion = acc_criterion
def forward(self, logits, labels):
weighted_loss = self.loss_weight * loss_criterion(logits, labels)
weighted_acc = self.acc_weight * acc_criterion(logits, labels)
return weighted_loss + weighted_acc
weighted_criterion = WeightedCriterion(
loss_weight=0.5,
loss_criterion=loss_criterion,
acc_weight=0.5,
acc_criterion=acc_criterion,
)
En même temps, de manière similaire aux processus d’entraînement par descente de gradient et de test du modèle, le processus d’affinage par neuroévolution est également encapsulé dans une fonction pour une utilisation pratique dans les étapes suivantes.
import time
def neuroevolution_process(
workflow: StdWorkflow,
adapter: ParamsAndVector,
model: nn.Module,
test_loader: torch.utils.data.DataLoader,
device: torch.device,
best_acc: float,
max_generation: int = 2,
) -> None:
for index in range(max_generation):
print(f"In generation {index}:")
t = time.time()
workflow.step()
print(f"\tTime elapsed: {time.time() - t: .4f}(s).")
monitor = workflow.get_submodule("monitor")
print(f"\tTop fitness: {monitor.topk_fitness}")
best_params = adapter.to_params(monitor.topk_solutions[0])
model.load_state_dict(best_params)
acc = model_test(model, test_loader, device)
if acc > best_acc:
best_acc = acc
print(f"\tBest accuracy: {best_acc:.4f} %.")
Test de neuroévolution basé sur la population
Dans cet exemple, l’algorithme basé sur la population pour la neuroévolution est testé en premier, en utilisant l’Optimisation par Essaim de Particules (PSO) comme représentation. La configuration pour la neuroévolution est similaire à celle des autres tâches d’optimisation — nous devons définir le problème, l’algorithme, le moniteur et le workflow, ainsi que leurs fonctions setup() respectives pour compléter l’initialisation.
Un point clé à noter ici est que la taille de la population (POP_SIZE dans ce cas) doit être initialisée à la fois dans le problème et dans l’algorithme pour éviter les erreurs potentielles.
POP_SIZE = 100
vmapped_problem = SupervisedLearningProblem(
model=model,
data_loader=pre_ne_train_loader,
criterion=weighted_criterion,
pop_size=POP_SIZE,
device=device,
)
vmapped_problem.setup()
pop_algorithm = PSO(
pop_size=POP_SIZE,
lb=lower_bound,
ub=upper_bound,
device=device,
)
pop_algorithm.setup()
monitor = EvalMonitor(
topk=3,
device=device,
)
monitor.setup()
pop_workflow = StdWorkflow()
pop_workflow.setup(
algorithm=pop_algorithm,
problem=vmapped_problem,
solution_transform=adapter,
monitor=monitor,
device=device,
)
print("Upon gradient descent, the population-based neuroevolution process start. ")
neuroevolution_process(
workflow=pop_workflow,
adapter=adapter,
model=model,
test_loader=pre_test_loader,
device=device,
best_acc=gd_acc,
max_generation=10,
)
pop_workflow.get_submodule("monitor").plot()
Test de neuroévolution à individu unique
Ensuite, la neuroévolution basée sur un algorithme à individu unique est testée. De manière similaire au cas basé sur la population, nous devons définir le problème, l’algorithme, le moniteur et le workflow, et appeler leurs fonctions setup() respectives lors de l’initialisation. Dans ce cas, une stratégie de recherche aléatoire est sélectionnée comme algorithme.
Un point clé à noter ici est que SupervisedLearningProblem doit être configuré avec pop_size=None, et EvalMonitor doit avoir topk=1, car un seul individu est recherché. Une configuration soigneuse des hyperparamètres aide à éviter les problèmes inutiles.
single_problem = SupervisedLearningProblem(
model=model,
data_loader=pre_ne_train_loader,
criterion=weighted_criterion,
pop_size=None,
device=device,
)
single_problem.setup()
@jit_class
class RandAlgorithm(Algorithm):
def __init__(self, lb, ub):
super().__init__()
assert lb.ndim == 1 and ub.ndim == 1, f"Lower and upper bounds shall have ndim of 1, got {lb.ndim} and {ub.ndim}. "
assert lb.shape == ub.shape, f"Lower and upper bounds shall have same shape, got {lb.ndim} and {ub.ndim}. "
self.hp = Parameter([1.0, 2.0])
self.lb = lb
self.ub = ub
self.dim = lb.shape[0]
self.pop = Mutable(torch.empty(1, lb.shape[0], dtype=lb.dtype, device=lb.device))
self.fit = Mutable(torch.empty(1, dtype=lb.dtype, device=lb.device))
def step(self):
pop = torch.rand(
self.dim,
dtype=self.lb.dtype,
device=self.lb.device,
)
pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
pop = pop * self.hp[0]
self.pop.copy_(pop)
self.fit.copy_(self.evaluate(pop))
single_algorithm = RandAlgorithm(lb=lower_bound, ub=upper_bound)
single_monitor = EvalMonitor(
topk=1,
device=device,
)
single_monitor.setup()
single_workflow = StdWorkflow()
single_workflow.setup(
algorithm=single_algorithm,
problem=single_problem,
solution_transform=adapter,
monitor=single_monitor,
device=device,
)
print("Upon gradient descent, the single-individual neuroevolution process start. ")
neuroevolution_process(
workflow=single_workflow,
adapter=adapter,
model=model,
test_loader=pre_test_loader,
device=device,
best_acc=gd_acc,
max_generation=12,
)
single_workflow.get_submodule("monitor").plot()