Algoritmo e Problema Personalizzati
In questo notebook, mostreremo come utilizzare Algorithm e Problem per creare un algoritmo e un problema personalizzati. Qui forniremo un esempio di implementazione di un algoritmo PSO che risolve il problema Sphere.
import torch
from evox.core import Algorithm, Mutable, Parameter, Problem
from evox.utils import clamp
from evox.workflows import EvalMonitor, StdWorkflow
Esempio di algoritmo: algoritmo PSO
La Particle Swarm Optimization (PSO) è un algoritmo metaeuristico basato sulla popolazione ispirato al comportamento sociale di uccelli e pesci. È ampiamente utilizzato per risolvere problemi di ottimizzazione continui e discreti.
Ecco un esempio di implementazione dell’algoritmo PSO in EvoX:
def min_by(
values,
keys,
):
"""A helper function to find the minimum value in a list of values."""
values = torch.cat(values, dim=0)
keys = torch.cat(keys, dim=0)
min_index = torch.argmin(keys)
return values[min_index[None]][0], keys[min_index[None]][0]
class PSO(Algorithm):
def __init__(
self,
pop_size: int,
lb: torch.Tensor,
ub: torch.Tensor,
w: float = 0.6,
phi_p: float = 2.5,
phi_g: float = 0.8,
device: torch.device | None = None,
):
super().__init__()
device = torch.get_default_device() if device is None else device
assert lb.shape == ub.shape and lb.ndim == 1 and ub.ndim == 1 and lb.dtype == ub.dtype
self.pop_size = pop_size
self.dim = lb.shape[0]
# Here, Parameter is used to indicate that these values are hyper-parameters
# so that they can be correctly traced and vector-mapped
self.w = Parameter(w, device=device)
self.phi_p = Parameter(phi_p, device=device)
self.phi_g = Parameter(phi_g, device=device)
# setup
lb = lb[None, :].to(device=device)
ub = ub[None, :].to(device=device)
length = ub - lb
pop = torch.rand(self.pop_size, self.dim, device=device)
pop = length * pop + lb
velocity = torch.rand(self.pop_size, self.dim, device=device)
velocity = 2 * length * velocity - length
# write to self
self.lb = lb
self.ub = ub
# mutable
self.pop = Mutable(pop)
self.velocity = Mutable(velocity)
self.fit = Mutable(torch.full((self.pop_size,), torch.inf, device=device))
self.local_best_location = Mutable(pop)
self.local_best_fit = Mutable(torch.full((self.pop_size,), torch.inf, device=device))
self.global_best_location = Mutable(pop[0])
self.global_best_fit = Mutable(torch.tensor(torch.inf, device=device))
def step(self):
compare = self.local_best_fit > self.fit
self.local_best_location = torch.where(compare[:, None], self.pop, self.local_best_location)
self.local_best_fit = torch.where(compare, self.fit, self.local_best_fit)
self.global_best_location, self.global_best_fit = min_by(
[self.global_best_location.unsqueeze(0), self.pop],
[self.global_best_fit.unsqueeze(0), self.fit],
)
rg = torch.rand(self.pop_size, self.dim, device=self.fit.device)
rp = torch.rand(self.pop_size, self.dim, device=self.fit.device)
velocity = (
self.w * self.velocity
+ self.phi_p * rp * (self.local_best_location - self.pop)
+ self.phi_g * rg * (self.global_best_location - self.pop)
)
pop = self.pop + velocity
self.pop = clamp(pop, self.lb, self.ub)
self.velocity = clamp(velocity, self.lb, self.ub)
self.fit = self.evaluate(self.pop)
def init_step(self):
"""Perform the first step of the PSO optimization.
See `step` for more details.
"""
self.fit = self.evaluate(self.pop)
self.local_best_fit = self.fit
self.global_best_fit = torch.min(self.fit)
Esempio di problema: problema Sphere
Il problema Sphere è un problema di ottimizzazione benchmark semplice ma fondamentale, utilizzato per testare gli algoritmi di ottimizzazione.
La funzione Sphere è definita come:
$$ \min f(x)= \sum_{i=1}^{n} x_{i}^{2} $$ Ecco un esempio di implementazione del problema Sphere in EvoX:
class Sphere(Problem):
def __init__(self):
super().__init__()
def evaluate(self, pop: torch.Tensor):
return (pop**2).sum(-1)
Utilizzare l’algoritmo per risolvere il problema
Inizializzare l’algoritmo, il problema e il monitor
algorithm = PSO(
pop_size=100,
lb=torch.tensor([-10.0]),
ub=torch.tensor([10.0]),
w=0.6,
phi_p=2.5,
phi_g=0.8,
)
problem = Sphere()
monitor = EvalMonitor()
Inizializzare il workflow ed eseguirlo
workflow = StdWorkflow(algorithm=algorithm, problem=problem, monitor=monitor)
for _ in range(100):
workflow.step()
workflow.monitor.plot()