在 EvoX 中自定义算法和问题
在本章中,我们将介绍如何在 EvoX 中实现您自己的算法和问题。
算法和问题的布局
在大多数传统的进化计算(EC)库中,算法通常在内部调用目标函数,其布局如下:
Algorithm
|
+--Problem
但在 EvoX 中,我们采用了扁平化布局:
Algorithm.step -- Problem.evaluate
这种布局使得算法和问题都更加通用:一个算法可以优化不同的问题,而一个问题也可以适用于多种算法。
Algorithm 类
Algorithm 类继承自 ModuleBase。
总共有 5 个方法(其中 2 个是可选的)我们需要实现:
| 方法 | 签名 | 用途 |
|---|---|---|
__init__ | (self, ...) | 初始化算法实例,例如种群大小(在迭代过程中保持不变)、超参数(只能由 HPO 问题包装器设置或在此处初始化)和/或可变张量(可以在运行中修改)。 |
step | (self) | 执行算法的一个常规优化迭代步骤。 |
init_step (可选) | (self) | 执行算法优化的第一步。如果未重写此方法,则会调用 step 方法代替。 |
注意: 静态初始化仍然可以写在
__init__中,但可变子模块的初始化则不能。因此,如果重写的setup方法首先调用了ModuleBase的setup(),那么多次调用setup进行重复初始化是可能的。如果
ModuleBase中的这种setup方法不适合您的算法,您可以在创建自己的算法类时重写setup方法。
Problem 类
Problem 类也继承自 ModuleBase。
然而,Problem 类非常简单。除了 __init__ 方法外,唯一必须的方法是 evaluate 方法。
| 方法 | 签名 | 用途 |
|---|---|---|
__init__ | (self, ...) | 初始化问题的设置。 |
evaluate | (self, pop: torch.Tensor) -> torch.Tensor | 评估给定种群的适应度。 |
不过,在重写的方法中,evaluate 中 pop 参数的类型可以更改为其他兼容 JIT 的类型。
示例
这里我们给出一个实现 PSO 算法来解决 Sphere 问题的示例。
示例的伪代码
以下是伪代码:
Set hyper-parameters
Generate the initial population
Do
Compute fitness
Update the local best fitness and the global best fitness
Update the velocity
Update the population
Until stopping criterion
下面是算法和问题的各个部分在 EvoX 中的对应关系。
Set hyper-parameters # Algorithm.__init__
Generate the initial population # Algorithm.setup
Do
# Problem.evaluate (not part of the algorithm)
Compute fitness
# Algorithm.step
Update the local best fitness and the global best fitness
Update the velocity
Update the population
Until stopping criterion
算法示例:PSO 算法
粒子群优化(PSO)是一种基于种群的元启发式算法,其灵感来源于鸟类和鱼类的社会行为。它被广泛用于解决连续和离散优化问题。
以下是 EvoX 中 PSO 算法的实现示例:
import torch
from typing import List
from evox.utils import clamp
from evox.core import Parameter, Mutable, Algorithm
class PSO(Algorithm):
#Initialize the PSO algorithm with the given parameters.
def __init__(
self,
pop_size: int,
lb: torch.Tensor,
ub: torch.Tensor,
w: float = 0.6,
phi_p: float = 2.5,
phi_g: float = 0.8,
device: torch.device | None = None,
):
super().__init__()
assert lb.shape == ub.shape and lb.ndim == 1 and ub.ndim == 1 and lb.dtype == ub.dtype
self.pop_size = pop_size
self.dim = lb.shape[0]
# Here, Parameter is used to indicate that these values are hyper-parameters
# so that they can be correctly traced and vector-mapped
self.w = Parameter(w, device=device)
self.phi_p = Parameter(phi_p, device=device)
self.phi_g = Parameter(phi_g, device=device)
lb = lb[None, :].to(device=device)
ub = ub[None, :].to(device=device)
length = ub - lb
population = torch.rand(self.pop_size, self.dim, device=device)
population = length * population + lb
velocity = torch.rand(self.pop_size, self.dim, device=device)
velocity = 2 * length * velocity - length
self.lb = lb
self.ub = ub
# Mutable parameters
self.population = Mutable(population)
self.velocity = Mutable(velocity)
self.local_best_location = Mutable(population)
self.local_best_fitness = Mutable(torch.empty(self.pop_size, device=device).fill_(torch.inf))
self.global_best_location = Mutable(population[0])
self.global_best_fitness = Mutable(torch.tensor(torch.inf, device=device))
def step(self):
# Compute fitness
fitness = self.evaluate(self.population)
# Update the local best fitness and the global best fitness
compare = self.local_best_fitness - fitness
self.local_best_location = torch.where(
compare[:, None] > 0, self.population, self.local_best_location
)
self.local_best_fitness = self.local_best_fitness - torch.relu(compare)
self.global_best_location, self.global_best_fitness = self._min_by(
[self.global_best_location.unsqueeze(0), self.population],
[self.global_best_fitness.unsqueeze(0), fitness],
)
# Update the velocity
rg = torch.rand(self.pop_size, self.dim, dtype=fitness.dtype, device=fitness.device)
rp = torch.rand(self.pop_size, self.dim, dtype=fitness.dtype, device=fitness.device)
velocity = (
self.w * self.velocity
+ self.phi_p * rp * (self.local_best_location - self.population)
+ self.phi_g * rg * (self.global_best_location - self.population)
)
# Update the population
population = self.population + velocity
self.population = clamp(population, self.lb, self.ub)
self.velocity = clamp(velocity, self.lb, self.ub)
def _min_by(self, values: List[torch.Tensor], keys: List[torch.Tensor]):
# Find the value with the minimum key
values = torch.cat(values, dim=0)
keys = torch.cat(keys, dim=0)
min_index = torch.argmin(keys)
return values[min_index], keys[min_index]
问题示例:Sphere 问题
Sphere 问题是一个简单但基础的基准优化问题,用于测试优化算法。
Sphere 函数定义如下:
$$ \min f(x)= \sum_{i=1}^{n} x_{i}^{2} $$ 以下是 EvoX 中 Sphere 问题的实现示例:
import torch
from evox.core import Problem
class Sphere(Problem):
def __init__(self):
super().__init__()
def evaluate(self, pop: torch.Tensor):
return (pop**2).sum(-1)
现在,您可以初始化一个工作流并运行它。