用于机器学习的神经进化
EvoX 提供了基于神经进化的监督学习任务解决方案,关键模块包括 SupervisedLearningProblem 和 ParamsAndVector。以 MNIST 分类任务为例,本节通过采用 EvoX 的模块来说明监督学习的神经进化过程。
基本设置
基本组件导入和设备配置是神经进化过程的必要起始步骤。
这里,为了确保结果的可重复性,可以选择性地设置随机种子。
import torch
import torch.nn as nn
from evox.utils import ParamsAndVector
from evox.core import Algorithm, Mutable, Parameter, jit_class
from evox.problems.neuroevolution.supervised_learning import SupervisedLearningProblem
from evox.algorithms import PSO
from evox.workflows import EvalMonitor, StdWorkflow
# Set device
device = "cuda:0" if torch.cuda.is_available() else "cpu"
# Set random seed
seed = 0
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
在这一步中,直接基于 PyTorch 框架定义一个示例卷积神经网络(CNN)模型,然后加载到设备上。
class SampleCNN(nn.Module):
def __init__(self):
super(SampleCNN, self).__init__()
self.features = nn.Sequential(
nn.Conv2d(1, 3, kernel_size=3, padding=1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Conv2d(3, 3, kernel_size=3),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Conv2d(3, 3, kernel_size=3),
nn.ReLU(),
nn.Conv2d(3, 3, kernel_size=3),
nn.ReLU(),
)
self.classifier = nn.Sequential(nn.Flatten(), nn.Linear(12, 10))
def forward(self, x):
x = self.features(x)
x = self.classifier(x)
return x
model = SampleCNN().to(device)
total_params = sum(p.numel() for p in model.parameters())
print(f"Total number of model parameters: {total_params}")
设置数据集意味着选择任务。现在需要基于 PyTorch 的内置支持初始化数据加载器。
这里,必须根据你的 PyTorch 版本预先安装 torchvision 包(如果尚未安装)。
如果 MNIST 数据集尚未存在于 data_root 目录中,download=True 标志将确保数据集自动下载。因此,首次运行时设置可能需要一些时间。
import os
import torchvision
data_root = "./data" # Choose a path to save dataset
os.makedirs(data_root, exist_ok=True)
train_dataset = torchvision.datasets.MNIST(
root=data_root,
train=True,
download=True,
transform=torchvision.transforms.ToTensor(),
)
test_dataset = torchvision.datasets.MNIST(
root=data_root,
train=False,
download=True,
transform=torchvision.transforms.ToTensor(),
)
BATCH_SIZE = 100
train_loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=BATCH_SIZE,
shuffle=True,
collate_fn=None,
)
test_loader = torch.utils.data.DataLoader(
test_dataset,
batch_size=BATCH_SIZE,
shuffle=False,
collate_fn=None,
)
为了加速后续过程,所有 MNIST 数据都被预加载以实现更快的执行。下面,为不同阶段预加载了三个数据集——梯度下降训练、神经进化微调和模型测试。
需要注意的是,这是一个以空间换时间的可选操作。是否采用取决于你的 GPU 容量,并且准备过程总是需要一些时间。
# Used for gradient descent training process
pre_gd_train_loader = tuple([(inputs.to(device), labels.to(device)) for inputs, labels in train_loader])
# Used for neuroevolution fine-tuning process
pre_ne_train_loader = tuple(
[
(
inputs.to(device),
labels.type(torch.float).unsqueeze(1).repeat(1, 10).to(device),
)
for inputs, labels in train_loader
]
)
# Used for model testing process
pre_test_loader = tuple([(inputs.to(device), labels.to(device)) for inputs, labels in test_loader])
这里,预定义了一个 model_test 函数,以简化后续阶段中模型在测试数据集上的预测准确率评估。
def model_test(model: nn.Module, data_loader: torch.utils.data.DataLoader, device: torch.device) -> float:
model.eval()
with torch.no_grad():
total = 0
correct = 0
for inputs, labels in data_loader:
inputs: torch.Tensor = inputs.to(device=device, non_blocking=True)
labels: torch.Tensor = labels.to(device=device, non_blocking=True)
logits = model(inputs)
_, predicted = torch.max(logits.data, dim=1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
acc = 100 * correct / total
return acc
梯度下降训练(可选)
首先执行基于梯度下降的模型训练。在本示例中,此训练用于初始化模型,为后续的神经进化过程做准备。
PyTorch 中的模型训练过程与 EvoX 中的神经进化兼容,使得在后续步骤中复用相同的模型实现非常方便。
def model_train(
model: nn.Module,
data_loader: torch.utils.data.DataLoader,
criterion: nn.Module,
optimizer: torch.optim.Optimizer,
max_epoch: int,
device: torch.device,
print_frequent: int = -1,
) -> nn.Module:
model.train()
for epoch in range(max_epoch):
running_loss = 0.0
for step, (inputs, labels) in enumerate(data_loader, start=1):
inputs: torch.Tensor = inputs.to(device=device, non_blocking=True)
labels: torch.Tensor = labels.to(device=device, non_blocking=True)
optimizer.zero_grad()
logits = model(inputs)
loss = criterion(logits, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
if print_frequent > 0 and step % print_frequent == 0:
print(f"[Epoch {epoch:2d}, step {step:4d}] running loss: {running_loss:.4f} ")
running_loss = 0.0
return model
model_train(
model,
data_loader=pre_gd_train_loader,
criterion=nn.CrossEntropyLoss(),
optimizer=torch.optim.Adam(model.parameters(), lr=1e-2),
max_epoch=3,
device=device,
print_frequent=500,
)
gd_acc = model_test(model, pre_test_loader, device)
print(f"Accuracy after gradient descent training: {gd_acc:.4f} %.")
神经进化微调
基于前一步梯度下降过程的预训练模型,逐步应用神经进化来微调模型。
首先,使用 ParamsAndVector 组件将预训练模型的权重展平为向量,作为后续神经进化过程的初始中心个体。
adapter = ParamsAndVector(dummy_model=model)
model_params = dict(model.named_parameters())
pop_center = adapter.to_vector(model_params)
lower_bound = pop_center - 0.01
upper_bound = pop_center + 0.01
对于专门为神经进化设计的算法(可以直接接受批量参数字典作为输入),使用
ParamsAndVector可能不是必需的。
此外,定义了一个示例评估准则。这里,选择并加权了个体模型的损失和准确率作为神经进化过程中的适应度函数。此步骤可根据优化方向进行自定义。
class AccuracyCriterion(nn.Module):
def __init__(self, data_loader):
super().__init__()
data_loader = data_loader
def forward(self, logits, labels):
_, predicted = torch.max(logits, dim=1)
correct = (predicted == labels[:, 0]).sum()
fitness = -correct
return fitness
acc_criterion = AccuracyCriterion(pre_ne_train_loader)
loss_criterion = nn.MSELoss()
class WeightedCriterion(nn.Module):
def __init__(self, loss_weight, loss_criterion, acc_weight, acc_criterion):
super().__init__()
self.loss_weight = loss_weight
self.loss_criterion = loss_criterion
self.acc_weight = acc_weight
self.acc_criterion = acc_criterion
def forward(self, logits, labels):
weighted_loss = self.loss_weight * loss_criterion(logits, labels)
weighted_acc = self.acc_weight * acc_criterion(logits, labels)
return weighted_loss + weighted_acc
weighted_criterion = WeightedCriterion(
loss_weight=0.5,
loss_criterion=loss_criterion,
acc_weight=0.5,
acc_criterion=acc_criterion,
)
同时,与梯度下降训练和模型测试过程类似,神经进化微调过程也被封装为函数,以便在后续阶段方便使用。
import time
def neuroevolution_process(
workflow: StdWorkflow,
adapter: ParamsAndVector,
model: nn.Module,
test_loader: torch.utils.data.DataLoader,
device: torch.device,
best_acc: float,
max_generation: int = 2,
) -> None:
for index in range(max_generation):
print(f"In generation {index}:")
t = time.time()
workflow.step()
print(f"\tTime elapsed: {time.time() - t: .4f}(s).")
monitor = workflow.get_submodule("monitor")
print(f"\tTop fitness: {monitor.topk_fitness}")
best_params = adapter.to_params(monitor.topk_solutions[0])
model.load_state_dict(best_params)
acc = model_test(model, test_loader, device)
if acc > best_acc:
best_acc = acc
print(f"\tBest accuracy: {best_acc:.4f} %.")
基于种群的神经进化测试
在本示例中,首先测试基于种群的神经进化算法,使用粒子群优化(PSO)作为代表。神经进化的配置与其他优化任务类似——我们需要定义问题、算法、监视器和工作流,以及它们各自的 setup() 函数来完成初始化。
这里需要注意的关键点是,种群大小(本例中的 POP_SIZE)需要在问题和算法中都进行初始化,以避免潜在错误。
POP_SIZE = 100
vmapped_problem = SupervisedLearningProblem(
model=model,
data_loader=pre_ne_train_loader,
criterion=weighted_criterion,
pop_size=POP_SIZE,
device=device,
)
vmapped_problem.setup()
pop_algorithm = PSO(
pop_size=POP_SIZE,
lb=lower_bound,
ub=upper_bound,
device=device,
)
pop_algorithm.setup()
monitor = EvalMonitor(
topk=3,
device=device,
)
monitor.setup()
pop_workflow = StdWorkflow()
pop_workflow.setup(
algorithm=pop_algorithm,
problem=vmapped_problem,
solution_transform=adapter,
monitor=monitor,
device=device,
)
print("Upon gradient descent, the population-based neuroevolution process start. ")
neuroevolution_process(
workflow=pop_workflow,
adapter=adapter,
model=model,
test_loader=pre_test_loader,
device=device,
best_acc=gd_acc,
max_generation=10,
)
pop_workflow.get_submodule("monitor").plot()
单个体神经进化测试
接下来,测试基于单个体算法的神经进化。与基于种群的情况类似,我们需要定义问题、算法、监视器和工作流,并在初始化时调用它们各自的 setup() 函数。在这种情况下,选择随机搜索策略作为算法。
这里需要注意的关键点是,SupervisedLearningProblem 应设置 pop_size=None,EvalMonitor 应设置 topk=1,因为只搜索单个个体。仔细的超参数设置有助于避免不必要的问题。
single_problem = SupervisedLearningProblem(
model=model,
data_loader=pre_ne_train_loader,
criterion=weighted_criterion,
pop_size=None,
device=device,
)
single_problem.setup()
@jit_class
class RandAlgorithm(Algorithm):
def __init__(self, lb, ub):
super().__init__()
assert lb.ndim == 1 and ub.ndim == 1, f"Lower and upper bounds shall have ndim of 1, got {lb.ndim} and {ub.ndim}. "
assert lb.shape == ub.shape, f"Lower and upper bounds shall have same shape, got {lb.ndim} and {ub.ndim}. "
self.hp = Parameter([1.0, 2.0])
self.lb = lb
self.ub = ub
self.dim = lb.shape[0]
self.pop = Mutable(torch.empty(1, lb.shape[0], dtype=lb.dtype, device=lb.device))
self.fit = Mutable(torch.empty(1, dtype=lb.dtype, device=lb.device))
def step(self):
pop = torch.rand(
self.dim,
dtype=self.lb.dtype,
device=self.lb.device,
)
pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
pop = pop * self.hp[0]
self.pop.copy_(pop)
self.fit.copy_(self.evaluate(pop))
single_algorithm = RandAlgorithm(lb=lower_bound, ub=upper_bound)
single_monitor = EvalMonitor(
topk=1,
device=device,
)
single_monitor.setup()
single_workflow = StdWorkflow()
single_workflow.setup(
algorithm=single_algorithm,
problem=single_problem,
solution_transform=adapter,
monitor=single_monitor,
device=device,
)
print("Upon gradient descent, the single-individual neuroevolution process start. ")
neuroevolution_process(
workflow=single_workflow,
adapter=adapter,
model=model,
test_loader=pre_test_loader,
device=device,
best_acc=gd_acc,
max_generation=12,
)
single_workflow.get_submodule("monitor").plot()