Neuroevolución para machine learning

Neuroevolución para machine learning

EvoX proporciona soluciones para tareas de aprendizaje supervisado basadas en neuroevolución, con módulos clave que incluyen SupervisedLearningProblem y ParamsAndVector. Tomando la tarea de clasificación MNIST como ejemplo, esta sección ilustra el proceso de neuroevolución para el aprendizaje supervisado adoptando los módulos de EvoX.

Configuración básica

Las importaciones de componentes básicos y la configuración del dispositivo sirven como los pasos iniciales esenciales para el proceso de neuroevolución.

Aquí, para asegurar la reproducibilidad de los resultados, se puede establecer opcionalmente una semilla aleatoria.

import torch
import torch.nn as nn

from evox.utils import ParamsAndVector
from evox.core import Algorithm, Mutable, Parameter, jit_class
from evox.problems.neuroevolution.supervised_learning import SupervisedLearningProblem
from evox.algorithms import PSO
from evox.workflows import EvalMonitor, StdWorkflow


# Set device
device = "cuda:0" if torch.cuda.is_available() else "cpu"

# Set random seed
seed = 0
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

En este paso, se define un modelo de red neuronal convolucional (CNN) de muestra directamente sobre el framework PyTorch y luego se carga en el dispositivo.

class SampleCNN(nn.Module):
    def __init__(self):
        super(SampleCNN, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 3, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(3, 3, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(3, 3, kernel_size=3),
            nn.ReLU(),
            nn.Conv2d(3, 3, kernel_size=3),
            nn.ReLU(),
        )
        self.classifier = nn.Sequential(nn.Flatten(), nn.Linear(12, 10))

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x


model = SampleCNN().to(device)

total_params = sum(p.numel() for p in model.parameters())
print(f"Total number of model parameters: {total_params}")

Configurar el dataset implica la selección de la tarea. El cargador de datos ahora necesita ser inicializado basándose en el soporte integrado de PyTorch. Aquí, el paquete torchvision debe instalarse con antelación dependiendo de su versión de PyTorch, si aún no está disponible.

En caso de que el dataset MNIST no esté presente en el directorio data_root, se establece el flag download=True para asegurar que el dataset se descargue automáticamente. Por lo tanto, la configuración puede tomar algo de tiempo durante la primera ejecución.

import os
import torchvision


data_root = "./data"  # Choose a path to save dataset
os.makedirs(data_root, exist_ok=True)
train_dataset = torchvision.datasets.MNIST(
    root=data_root,
    train=True,
    download=True,
    transform=torchvision.transforms.ToTensor(),
)
test_dataset = torchvision.datasets.MNIST(
    root=data_root,
    train=False,
    download=True,
    transform=torchvision.transforms.ToTensor(),
)


BATCH_SIZE = 100
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=None,
)
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=None,
)

Para acelerar los procesos posteriores, todos los datos de MNIST se precargan para una ejecución más rápida. A continuación, se precargan tres datasets para diferentes etapas – entrenamiento por descenso de gradiente, ajuste fino por neuroevolución y prueba del modelo.

Cabe señalar que esta es una operación opcional que intercambia espacio por tiempo. Su adopción depende de la capacidad de su GPU, y siempre tomará algo de tiempo prepararla.

# Used for gradient descent training process
pre_gd_train_loader = tuple([(inputs.to(device), labels.to(device)) for inputs, labels in train_loader])

# Used for neuroevolution fine-tuning process
pre_ne_train_loader = tuple(
    [
        (
            inputs.to(device),
            labels.type(torch.float).unsqueeze(1).repeat(1, 10).to(device),
        )
        for inputs, labels in train_loader
    ]
)

# Used for model testing process
pre_test_loader = tuple([(inputs.to(device), labels.to(device)) for inputs, labels in test_loader])

Aquí, se predefine una función model_test para simplificar la evaluación de la precisión de predicción del modelo en el dataset de prueba durante las etapas posteriores.

def model_test(model: nn.Module, data_loader: torch.utils.data.DataLoader, device: torch.device) -> float:
    model.eval()
    with torch.no_grad():
        total = 0
        correct = 0
        for inputs, labels in data_loader:
            inputs: torch.Tensor = inputs.to(device=device, non_blocking=True)
            labels: torch.Tensor = labels.to(device=device, non_blocking=True)

            logits = model(inputs)
            _, predicted = torch.max(logits.data, dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        acc = 100 * correct / total
    return acc

Entrenamiento por descenso de gradiente (opcional)

Primero se realiza el entrenamiento del modelo basado en descenso de gradiente. En este ejemplo, este entrenamiento se adopta para inicializar el modelo, preparándolo para los procesos de neuroevolución posteriores.

El proceso de entrenamiento del modelo en PyTorch es compatible con la neuroevolución en EvoX, lo que hace conveniente reutilizar la misma implementación del modelo para los pasos siguientes.

def model_train(
    model: nn.Module,
    data_loader: torch.utils.data.DataLoader,
    criterion: nn.Module,
    optimizer: torch.optim.Optimizer,
    max_epoch: int,
    device: torch.device,
    print_frequent: int = -1,
) -> nn.Module:
    model.train()
    for epoch in range(max_epoch):
        running_loss = 0.0
        for step, (inputs, labels) in enumerate(data_loader, start=1):
            inputs: torch.Tensor = inputs.to(device=device, non_blocking=True)
            labels: torch.Tensor = labels.to(device=device, non_blocking=True)

            optimizer.zero_grad()
            logits = model(inputs)
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            if print_frequent > 0 and step % print_frequent == 0:
                print(f"[Epoch {epoch:2d}, step {step:4d}] running loss: {running_loss:.4f} ")
                running_loss = 0.0
    return model
model_train(
    model,
    data_loader=pre_gd_train_loader,
    criterion=nn.CrossEntropyLoss(),
    optimizer=torch.optim.Adam(model.parameters(), lr=1e-2),
    max_epoch=3,
    device=device,
    print_frequent=500,
)

gd_acc = model_test(model, pre_test_loader, device)
print(f"Accuracy after gradient descent training: {gd_acc:.4f} %.")

Ajuste fino por neuroevolución

Basándose en el modelo pre-entrenado del proceso de descenso de gradiente anterior, se aplica progresivamente la neuroevolución para ajustar el modelo.

Primero, se utiliza el componente ParamsAndVector para aplanar los pesos del modelo pre-entrenado en un vector, que sirve como el individuo central inicial para el proceso de neuroevolución posterior.

adapter = ParamsAndVector(dummy_model=model)
model_params = dict(model.named_parameters())
pop_center = adapter.to_vector(model_params)
lower_bound = pop_center - 0.01
upper_bound = pop_center + 0.01

En el caso de algoritmos diseñados específicamente para neuroevolución, que pueden aceptar directamente un diccionario de parámetros por lotes como entrada, el uso de ParamsAndVector puede ser innecesario.

Adicionalmente, se define un criterio de muestra. Aquí, tanto la pérdida como la precisión del modelo individual se seleccionan y ponderan para servir como la función de aptitud (fitness) en el proceso de neuroevolución. Este paso es personalizable para adaptarse a la dirección de optimización.

class AccuracyCriterion(nn.Module):
    def __init__(self, data_loader):
        super().__init__()
        data_loader = data_loader

    def forward(self, logits, labels):
        _, predicted = torch.max(logits, dim=1)
        correct = (predicted == labels[:, 0]).sum()
        fitness = -correct
        return fitness


acc_criterion = AccuracyCriterion(pre_ne_train_loader)
loss_criterion = nn.MSELoss()


class WeightedCriterion(nn.Module):
    def __init__(self, loss_weight, loss_criterion, acc_weight, acc_criterion):
        super().__init__()
        self.loss_weight = loss_weight
        self.loss_criterion = loss_criterion
        self.acc_weight = acc_weight
        self.acc_criterion = acc_criterion

    def forward(self, logits, labels):
        weighted_loss = self.loss_weight * loss_criterion(logits, labels)
        weighted_acc = self.acc_weight * acc_criterion(logits, labels)
        return weighted_loss + weighted_acc


weighted_criterion = WeightedCriterion(
    loss_weight=0.5,
    loss_criterion=loss_criterion,
    acc_weight=0.5,
    acc_criterion=acc_criterion,
)

Al mismo tiempo, de manera similar a los procesos de entrenamiento por descenso de gradiente y prueba del modelo, el proceso de ajuste fino por neuroevolución también se encapsula en una función para su uso conveniente en etapas posteriores.

import time


def neuroevolution_process(
    workflow: StdWorkflow,
    adapter: ParamsAndVector,
    model: nn.Module,
    test_loader: torch.utils.data.DataLoader,
    device: torch.device,
    best_acc: float,
    max_generation: int = 2,
) -> None:
    for index in range(max_generation):
        print(f"In generation {index}:")
        t = time.time()
        workflow.step()
        print(f"\tTime elapsed: {time.time() - t: .4f}(s).")

        monitor = workflow.get_submodule("monitor")
        print(f"\tTop fitness: {monitor.topk_fitness}")
        best_params = adapter.to_params(monitor.topk_solutions[0])
        model.load_state_dict(best_params)
        acc = model_test(model, test_loader, device)
        if acc > best_acc:
            best_acc = acc
        print(f"\tBest accuracy: {best_acc:.4f} %.")

Prueba de neuroevolución basada en población

En este ejemplo, primero se prueba el algoritmo basado en población para la neuroevolución, utilizando la Optimización por Enjambre de Partículas (PSO) como representación. La configuración para la neuroevolución es similar a la de otras tareas de optimización – necesitamos definir el problema, el algoritmo, el monitor y el flujo de trabajo (workflow), junto con sus respectivas funciones setup() para completar la inicialización.

Un punto clave a notar aquí es que el tamaño de la población (POP_SIZE en este caso) necesita ser inicializado tanto en el problema como en el algoritmo para evitar posibles errores.

POP_SIZE = 100
vmapped_problem = SupervisedLearningProblem(
    model=model,
    data_loader=pre_ne_train_loader,
    criterion=weighted_criterion,
    pop_size=POP_SIZE,
    device=device,
)
vmapped_problem.setup()

pop_algorithm = PSO(
    pop_size=POP_SIZE,
    lb=lower_bound,
    ub=upper_bound,
    device=device,
)
pop_algorithm.setup()

monitor = EvalMonitor(
    topk=3,
    device=device,
)
monitor.setup()

pop_workflow = StdWorkflow()
pop_workflow.setup(
    algorithm=pop_algorithm,
    problem=vmapped_problem,
    solution_transform=adapter,
    monitor=monitor,
    device=device,
)
print("Upon gradient descent, the population-based neuroevolution process start. ")
neuroevolution_process(
    workflow=pop_workflow,
    adapter=adapter,
    model=model,
    test_loader=pre_test_loader,
    device=device,
    best_acc=gd_acc,
    max_generation=10,
)
pop_workflow.get_submodule("monitor").plot()

Prueba de neuroevolución de un solo individuo

A continuación, se prueba la neuroevolución basada en un algoritmo de un solo individuo. De manera similar al caso basado en población, necesitamos definir el problema, el algoritmo, el monitor y el flujo de trabajo, y llamar a sus respectivas funciones setup() durante la inicialización. En este caso, se selecciona una estrategia de búsqueda aleatoria como algoritmo.

Un punto clave a notar aquí es que SupervisedLearningProblem debe configurarse con pop_size=None, y EvalMonitor debe tener topk=1, ya que solo se está buscando un único individuo. Una configuración cuidadosa de los hiperparámetros ayuda a evitar problemas innecesarios.

single_problem = SupervisedLearningProblem(
    model=model,
    data_loader=pre_ne_train_loader,
    criterion=weighted_criterion,
    pop_size=None,
    device=device,
)
single_problem.setup()


@jit_class
class RandAlgorithm(Algorithm):
    def __init__(self, lb, ub):
        super().__init__()
        assert lb.ndim == 1 and ub.ndim == 1, f"Lower and upper bounds shall have ndim of 1, got {lb.ndim} and {ub.ndim}. "
        assert lb.shape == ub.shape, f"Lower and upper bounds shall have same shape, got {lb.ndim} and {ub.ndim}. "
        self.hp = Parameter([1.0, 2.0])
        self.lb = lb
        self.ub = ub
        self.dim = lb.shape[0]
        self.pop = Mutable(torch.empty(1, lb.shape[0], dtype=lb.dtype, device=lb.device))
        self.fit = Mutable(torch.empty(1, dtype=lb.dtype, device=lb.device))

    def step(self):
        pop = torch.rand(
            self.dim,
            dtype=self.lb.dtype,
            device=self.lb.device,
        )
        pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
        pop = pop * self.hp[0]
        self.pop.copy_(pop)
        self.fit.copy_(self.evaluate(pop))


single_algorithm = RandAlgorithm(lb=lower_bound, ub=upper_bound)

single_monitor = EvalMonitor(
    topk=1,
    device=device,
)
single_monitor.setup()

single_workflow = StdWorkflow()
single_workflow.setup(
    algorithm=single_algorithm,
    problem=single_problem,
    solution_transform=adapter,
    monitor=single_monitor,
    device=device,
)
print("Upon gradient descent, the single-individual neuroevolution process start. ")
neuroevolution_process(
    workflow=single_workflow,
    adapter=adapter,
    model=model,
    test_loader=pre_test_loader,
    device=device,
    best_acc=gd_acc,
    max_generation=12,
)
single_workflow.get_submodule("monitor").plot()