Neuroevolution for Machine Learning
EvoX provides a neuroevolution-based solution for supervised learning tasks, with SupervisedLearningProblem and ParamsAndVector as the key modules. Taking MNIST classification as an example, this section walks through the neuroevolution process for supervised learning using these EvoX modules.
Basic Setup
Importing the basic components and configuring the device are the starting steps of the neuroevolution process.
Here, a random seed can optionally be set to make the results reproducible.
import torch
import torch.nn as nn
from evox.utils import ParamsAndVector
from evox.core import Algorithm, Mutable, Parameter, jit_class
from evox.problems.neuroevolution.supervised_learning import SupervisedLearningProblem
from evox.algorithms import PSO
from evox.workflows import EvalMonitor, StdWorkflow
# Set device
device = "cuda:0" if torch.cuda.is_available() else "cpu"
# Set random seed
seed = 0
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
In this step, a sample convolutional neural network (CNN) model is defined directly with the PyTorch framework and then moved to the device.
class SampleCNN(nn.Module):
    def __init__(self):
        super(SampleCNN, self).__init__()
        # Feature extractor: for 28x28 MNIST inputs, the output is 3 channels of size 2x2
        self.features = nn.Sequential(
            nn.Conv2d(1, 3, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(3, 3, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(3, 3, kernel_size=3),
            nn.ReLU(),
            nn.Conv2d(3, 3, kernel_size=3),
            nn.ReLU(),
        )
        # Classifier: 3 * 2 * 2 = 12 flattened features -> 10 classes
        self.classifier = nn.Sequential(nn.Flatten(), nn.Linear(12, 10))

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x
model = SampleCNN().to(device)
total_params = sum(p.numel() for p in model.parameters())
print(f"Total number of model parameters: {total_params}")
Setting up the dataset amounts to choosing the task. The data loaders are now initialized using PyTorch's built-in support.
Here, the torchvision package, matching your PyTorch version, must be installed beforehand if it is not already available.
If the MNIST dataset is not yet present in the data_root directory, the download=True flag ensures it will be downloaded automatically, so the first run of this setup may take some time.
import os
import torchvision
data_root = "./data" # Choose a path to save dataset
os.makedirs(data_root, exist_ok=True)
train_dataset = torchvision.datasets.MNIST(
    root=data_root,
    train=True,
    download=True,
    transform=torchvision.transforms.ToTensor(),
)
test_dataset = torchvision.datasets.MNIST(
    root=data_root,
    train=False,
    download=True,
    transform=torchvision.transforms.ToTensor(),
)
BATCH_SIZE = 100
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=None,
)
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=None,
)
To speed up the subsequent stages, all MNIST data are pre-loaded onto the device. Three pre-loaded datasets are prepared below, one for each phase: gradient descent training, neuroevolution fine-tuning, and model testing.
Note that this is an optional space-for-time trade-off: whether to adopt it depends on your GPU capacity, and the preparation itself always takes some time.
# Used for the gradient descent training process
pre_gd_train_loader = tuple((inputs.to(device), labels.to(device)) for inputs, labels in train_loader)
# Used for the neuroevolution fine-tuning process: labels are cast to float and
# repeated across 10 columns to match the 10-dimensional model output
pre_ne_train_loader = tuple(
    (
        inputs.to(device),
        labels.type(torch.float).unsqueeze(1).repeat(1, 10).to(device),
    )
    for inputs, labels in train_loader
)
# Used for the model testing process
pre_test_loader = tuple((inputs.to(device), labels.to(device)) for inputs, labels in test_loader)
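As a rough check of the space-for-time trade-off mentioned above, the device memory held by the pre-loaded tensors can be estimated. This is a minimal sketch; the exact figure depends on the data types and on which loaders you keep resident:
# Optional: estimate the device memory held by the pre-loaded training images
n_bytes = sum(inputs.element_size() * inputs.numel() for inputs, _ in pre_gd_train_loader)
print(f"Pre-loaded training images: about {n_bytes / 2**20:.1f} MiB")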
Here, a model_test function is defined in advance to simplify evaluating a model's prediction accuracy on the test dataset in later stages.
def model_test(model: nn.Module, data_loader: torch.utils.data.DataLoader, device: torch.device) -> float:
    model.eval()
    with torch.no_grad():
        total = 0
        correct = 0
        for inputs, labels in data_loader:
            inputs: torch.Tensor = inputs.to(device=device, non_blocking=True)
            labels: torch.Tensor = labels.to(device=device, non_blocking=True)
            logits = model(inputs)
            _, predicted = torch.max(logits.data, dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        acc = 100 * correct / total
        return acc
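As a quick usage example, model_test can be applied to the still-untrained model; with ten classes, the accuracy should be near the 10% chance level:
# Optional: chance-level baseline of the untrained model (roughly 10% for 10 classes)
baseline_acc = model_test(model, pre_test_loader, device)
print(f"Accuracy before training: {baseline_acc:.4f} %.")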
Gradient Descent Training (Optional)
Gradient-descent-based training is performed first. In this example, it initializes the model in preparation for the subsequent neuroevolution process.
The model training process in PyTorch is compatible with neuroevolution in EvoX, which makes it convenient to reuse the same model implementation in the following steps.
def model_train(
    model: nn.Module,
    data_loader: torch.utils.data.DataLoader,
    criterion: nn.Module,
    optimizer: torch.optim.Optimizer,
    max_epoch: int,
    device: torch.device,
    print_frequent: int = -1,
) -> nn.Module:
    model.train()
    for epoch in range(max_epoch):
        running_loss = 0.0
        for step, (inputs, labels) in enumerate(data_loader, start=1):
            inputs: torch.Tensor = inputs.to(device=device, non_blocking=True)
            labels: torch.Tensor = labels.to(device=device, non_blocking=True)
            optimizer.zero_grad()
            logits = model(inputs)
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            if print_frequent > 0 and step % print_frequent == 0:
                print(f"[Epoch {epoch:2d}, step {step:4d}] running loss: {running_loss:.4f}")
                running_loss = 0.0
    return model
model_train(
    model,
    data_loader=pre_gd_train_loader,
    criterion=nn.CrossEntropyLoss(),
    optimizer=torch.optim.Adam(model.parameters(), lr=1e-2),
    max_epoch=3,
    device=device,
    print_frequent=500,
)
gd_acc = model_test(model, pre_test_loader, device)
print(f"Accuracy after gradient descent training: {gd_acc:.4f} %.")
Neuroevolution Fine-Tuning
Starting from the model pre-trained by the preceding gradient descent process, neuroevolution is applied to fine-tune it further.
First, the ParamsAndVector component flattens the weights of the pre-trained model into a vector, which serves as the initial center individual for the subsequent neuroevolution process.
adapter = ParamsAndVector(dummy_model=model)
model_params = dict(model.named_parameters())
pop_center = adapter.to_vector(model_params)  # flatten the weights into a single vector
# Search within a narrow box around the pre-trained weights
lower_bound = pop_center - 0.01
upper_bound = pop_center + 0.01
For algorithms designed specifically for neuroevolution, which can accept batched parameter dictionaries directly as input, using ParamsAndVector may not be necessary.
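As a quick sanity check of the adapter itself, the flattened vector can be mapped back into a parameter dictionary with to_params, the inverse of to_vector that is also used later in this tutorial:
# Optional: round-trip check -- to_params is the inverse of to_vector
restored_params = adapter.to_params(pop_center)
model.load_state_dict(restored_params)  # restores exactly the same weights
print(f"Flattened vector length: {pop_center.numel()}")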
In addition, a sample criterion is defined. Here, an individual model's loss and accuracy are weighted and combined into the fitness function for the neuroevolution process. This step can be customized according to the optimization objective.
class AccuracyCriterion(nn.Module):
    def __init__(self, data_loader):
        super().__init__()
        self.data_loader = data_loader

    def forward(self, logits, labels):
        # Labels are repeated across columns, so column 0 holds the class index
        _, predicted = torch.max(logits, dim=1)
        correct = (predicted == labels[:, 0]).sum()
        fitness = -correct  # higher accuracy means lower (better) fitness
        return fitness
acc_criterion = AccuracyCriterion(pre_ne_train_loader)
loss_criterion = nn.MSELoss()
class WeightedCriterion(nn.Module):
    def __init__(self, loss_weight, loss_criterion, acc_weight, acc_criterion):
        super().__init__()
        self.loss_weight = loss_weight
        self.loss_criterion = loss_criterion
        self.acc_weight = acc_weight
        self.acc_criterion = acc_criterion

    def forward(self, logits, labels):
        weighted_loss = self.loss_weight * self.loss_criterion(logits, labels)
        weighted_acc = self.acc_weight * self.acc_criterion(logits, labels)
        return weighted_loss + weighted_acc

weighted_criterion = WeightedCriterion(
    loss_weight=0.5,
    loss_criterion=loss_criterion,
    acc_weight=0.5,
    acc_criterion=acc_criterion,
)
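Before launching the full workflow, the combined criterion can be sanity-checked on a single pre-loaded batch. This optional sketch relies on the fact that pre_ne_train_loader stores the labels repeated across 10 columns, which is the format both sub-criteria expect:
# Optional: evaluate the weighted criterion on one pre-loaded batch
sample_inputs, sample_labels = pre_ne_train_loader[0]
with torch.no_grad():
    fitness = weighted_criterion(model(sample_inputs), sample_labels)
print(f"Weighted fitness of the pre-trained model on one batch: {fitness.item():.4f}")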
Meanwhile, similar to the gradient descent training and model testing processes, the neuroevolution fine-tuning process is wrapped into a function for convenient reuse in later stages.
import time

def neuroevolution_process(
    workflow: StdWorkflow,
    adapter: ParamsAndVector,
    model: nn.Module,
    test_loader: torch.utils.data.DataLoader,
    device: torch.device,
    best_acc: float,
    max_generation: int = 2,
) -> None:
    for index in range(max_generation):
        print(f"In generation {index}:")
        t = time.time()
        workflow.step()
        print(f"\tTime elapsed: {time.time() - t: .4f}(s).")
        monitor = workflow.get_submodule("monitor")
        print(f"\tTop fitness: {monitor.topk_fitness}")
        best_params = adapter.to_params(monitor.topk_solutions[0])
        model.load_state_dict(best_params)
        acc = model_test(model, test_loader, device)
        if acc > best_acc:
            best_acc = acc
        print(f"\tBest accuracy: {best_acc:.4f} %.")
Population-Based Neuroevolution Test
In this example, a population-based neuroevolution algorithm is tested first, with Particle Swarm Optimization (PSO) as the representative. Configuring neuroevolution is similar to configuring other optimization tasks: we need to define the problem, algorithm, monitor, and workflow, and call their respective setup() methods to complete initialization.
A key point to note here is that the population size (POP_SIZE in this example) needs to be passed to both the problem and the algorithm to avoid potential errors.
POP_SIZE = 100
vmapped_problem = SupervisedLearningProblem(
    model=model,
    data_loader=pre_ne_train_loader,
    criterion=weighted_criterion,
    pop_size=POP_SIZE,
    device=device,
)
vmapped_problem.setup()

pop_algorithm = PSO(
    pop_size=POP_SIZE,
    lb=lower_bound,
    ub=upper_bound,
    device=device,
)
pop_algorithm.setup()

monitor = EvalMonitor(
    topk=3,
    device=device,
)
monitor.setup()

pop_workflow = StdWorkflow()
pop_workflow.setup(
    algorithm=pop_algorithm,
    problem=vmapped_problem,
    solution_transform=adapter,
    monitor=monitor,
    device=device,
)
print("Upon gradient descent, the population-based neuroevolution process start. ")
neuroevolution_process(
workflow=pop_workflow,
adapter=adapter,
model=model,
test_loader=pre_test_loader,
device=device,
best_acc=gd_acc,
max_generation=10,
)
pop_workflow.get_submodule("monitor").plot()
Single-Individual Neuroevolution Test
Next, neuroevolution with a single-individual algorithm is tested. As in the population-based case, we need to define the problem, algorithm, monitor, and workflow, and call their respective setup() methods during initialization. Here, a random search strategy is chosen as the algorithm.
A key point to note is that SupervisedLearningProblem should be set up with pop_size=None and EvalMonitor with topk=1, since only a single individual is searched. Careful hyperparameter settings help avoid unnecessary problems.
single_problem = SupervisedLearningProblem(
    model=model,
    data_loader=pre_ne_train_loader,
    criterion=weighted_criterion,
    pop_size=None,
    device=device,
)
single_problem.setup()
@jit_class
class RandAlgorithm(Algorithm):
    def __init__(self, lb, ub):
        super().__init__()
        assert lb.ndim == 1 and ub.ndim == 1, f"Lower and upper bounds shall have ndim of 1, got {lb.ndim} and {ub.ndim}."
        assert lb.shape == ub.shape, f"Lower and upper bounds shall have the same shape, got {lb.shape} and {ub.shape}."
        self.hp = Parameter([1.0, 2.0])  # sample hyperparameters
        self.lb = lb
        self.ub = ub
        self.dim = lb.shape[0]
        self.pop = Mutable(torch.empty(1, lb.shape[0], dtype=lb.dtype, device=lb.device))
        self.fit = Mutable(torch.empty(1, dtype=lb.dtype, device=lb.device))

    def step(self):
        # Sample a single individual uniformly within [lb, ub]
        pop = torch.rand(
            self.dim,
            dtype=self.lb.dtype,
            device=self.lb.device,
        )
        pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
        pop = pop * self.hp[0]
        self.pop.copy_(pop)
        self.fit.copy_(self.evaluate(pop))
single_algorithm = RandAlgorithm(lb=lower_bound, ub=upper_bound)

single_monitor = EvalMonitor(
    topk=1,
    device=device,
)
single_monitor.setup()

single_workflow = StdWorkflow()
single_workflow.setup(
    algorithm=single_algorithm,
    problem=single_problem,
    solution_transform=adapter,
    monitor=single_monitor,
    device=device,
)
print("Upon gradient descent, the single-individual neuroevolution process start. ")
neuroevolution_process(
workflow=single_workflow,
adapter=adapter,
model=model,
test_loader=pre_test_loader,
device=device,
best_acc=gd_acc,
max_generation=12,
)
single_workflow.get_submodule("monitor").plot()