カスタムアルゴリズムによるHPOのデプロイ

カスタムアルゴリズムによるHPOのデプロイ

この章では、カスタムアルゴリズムによるHPOのデプロイに焦点を当て、全体的なワークフローではなく詳細を強調します。HPOデプロイの簡単な紹介はチュートリアルで提供されており、事前に読むことを強く推奨します。

アルゴリズムの並列化可能性の確保

内部アルゴリズムを問題に変換する必要があるため、内部アルゴリズムが並列化可能であることが重要です。そのため、アルゴリズムにいくつかの修正が必要になる場合があります。

  1. アルゴリズムは、アルゴリズム自体の属性に対するインプレース操作を持つメソッドがあってはなりません。
class ExampleAlgorithm(Algorithm):
    def __init__(self,...):
        self.pop = torch.rand(10,10) #attribute of the algorithm itself

    def step_in_place(self): # method with in-place operations
        self.pop.copy_(pop)

    def step_out_of_place(self): # method without in-place operations
        self.pop = pop
  1. コードロジックがPythonの制御フローに依存しないこと。
class ExampleAlgorithm(Algorithm):
    def __init__(self,...):
        self.pop = rand(10,10) #attribute of the algorithm itself
        pass

    def plus(self, y):
        return self.pop + y

    def minus(self, y):
        return self.pop - y

    def step_with_python_control_flow(self, y): # function with python control flow
        x = rand()
        if x > 0.5:
            self.pop = self.plus(y)
        else:
            self.pop = self.minus(y)

    def step_without_python_control_flow(self, y): # function without python control flow
        x = rand()
        cond = x > 0.5
        self.pop = torch.cond(cond, self.plus, self.minus, y)

HPOMonitorの活用

HPOタスクでは、各内部アルゴリズムのメトリクスを追跡するためにHPOMonitorを使用する必要があります。HPOMonitorは標準のmonitorと比較して、tell_fitnessメソッドを1つだけ追加しています。この追加は、HPOタスクが多次元で複雑なメトリクスを含むことが多いため、メトリクス評価の柔軟性を高めるために設計されています。

ユーザーはHPOMonitorのサブクラスを作成し、tell_fitnessメソッドをオーバーライドしてカスタム評価メトリクスを定義するだけです。

また、シンプルなHPOFitnessMonitorも提供しており、多目的問題の「IGD」と「HV」メトリクスの計算、および単目的問題の最小値をサポートしています。

簡単な例

ここでは、EvoXでHPOを使用する簡単な例を示します。PSOアルゴリズムを使用して、sphere問題を解くための基本アルゴリズムの最適なハイパーパラメータを探索します。

まず、必要なモジュールをインポートしましょう。

import torch

from evox.algorithms.pso_variants.pso import PSO
from evox.core import Algorithm, Mutable, Parameter, Problem
from evox.problems.hpo_wrapper import HPOFitnessMonitor, HPOProblemWrapper
from evox.workflows import EvalMonitor, StdWorkflow

次に、シンプルなsphere問題を定義します。これは通常のproblemsと違いはありません。

class Sphere(Problem):
    def __init__(self):
        super().__init__()

    def evaluate(self, x: torch.Tensor):
        return (x * x).sum(-1)

次に、アルゴリズムを定義します。torch.cond関数を使用し、並列化可能であることを確認します。具体的には、インプレース操作を修正し、Pythonの制御フローを調整します。

class ExampleAlgorithm(Algorithm):
    def __init__(self, pop_size: int, lb: torch.Tensor, ub: torch.Tensor):
        super().__init__()
        assert lb.ndim == 1 and ub.ndim == 1, f"Lower and upper bounds shall have ndim of 1, got {lb.ndim} and {ub.ndim}"
        assert lb.shape == ub.shape, f"Lower and upper bounds shall have same shape, got {lb.ndim} and {ub.ndim}"
        self.pop_size = pop_size
        self.hp = Parameter([1.0, 2.0, 3.0, 4.0])  # the hyperparameters to be optimized
        self.lb = lb
        self.ub = ub
        self.dim = lb.shape[0]
        self.pop = Mutable(torch.empty(self.pop_size, lb.shape[0], dtype=lb.dtype, device=lb.device))
        self.fit = Mutable(torch.empty(self.pop_size, dtype=lb.dtype, device=lb.device))

    def strategy_1(self, pop):  # one update strategy
        pop = pop * (self.hp[0] + self.hp[1])
        self.pop = pop

    def strategy_2(self, pop):  #  the other update strategy
        pop = pop * (self.hp[2] + self.hp[3])
        self.pop = pop

    def step(self):
        pop = torch.rand(self.pop_size, self.dim, dtype=self.lb.dtype, device=self.lb.device)  # simply random sampling
        pop = pop * (self.ub - self.lb)[None, :] + self.lb[None, :]
        control_number = torch.rand()
        self.pop = torch.cond(control_number < 0.5, self.strategy_1, self.strategy_2, (pop,))
        self.fit = self.evaluate(self.pop)

Pythonの制御フローを処理するためにtorch.condを使用します。次に、StdWorkflowを使用してproblemalgorithmmonitorをラップできます。そしてHPOProblemWrapperを使用してStdWorkflowをHPO問題に変換します。

torch.set_default_device("cuda" if torch.cuda.is_available() else "cpu")
inner_algo = ExampleAlgorithm(10, -10 * torch.ones(8), 10 * torch.ones(8))
inner_prob = Sphere()
inner_monitor = HPOFitnessMonitor()
inner_monitor.setup()
inner_workflow = StdWorkflow()
inner_workflow.setup(inner_algo, inner_prob, monitor=inner_monitor)
# Transform the inner workflow to an HPO problem
hpo_prob = HPOProblemWrapper(iterations=9, num_instances=7, workflow=inner_workflow, copy_init_state=True)

HPOProblemWrapperが定義したハイパーパラメータを正しく認識するかテストできます。7つのインスタンスに対してハイパーパラメータを変更していないため、すべてのインスタンスで同一であるはずです。

params = hpo_prob.get_init_params()
print("init params:\n", params)

独自のハイパーパラメータ値のセットを指定することもできます。ハイパーパラメータセットの数はHPOProblemWrapperのインスタンス数と一致する必要があることに注意してください。カスタムハイパーパラメータは、値がParameterでラップされた辞書として提供する必要があります。

params = hpo_prob.get_init_params()
# since we have 7 instances, we need to pass 7 sets of hyperparameters
params["self.algorithm.hp"] = torch.nn.Parameter(torch.rand(7, 4), requires_grad=False)
result = hpo_prob.evaluate(params)
print("params:\n", params, "\n")
print("result:\n", result)

次に、PSOアルゴリズムを使用してExampleAlgorithmのハイパーパラメータを最適化します。PSOの集団サイズはインスタンス数と一致する必要があります。そうでないと、予期しないエラーが発生する可能性があります。この場合、HPOProblemWrapperは辞書を入力として必要とするため、外部ワークフローで解を変換する必要があります。

class solution_transform(torch.nn.Module):
    def forward(self, x: torch.Tensor):
        return {"self.algorithm.hp": x}


outer_algo = PSO(7, -3 * torch.ones(4), 3 * torch.ones(4))
monitor = EvalMonitor(full_sol_history=False)
outer_workflow = StdWorkflow()
outer_workflow.setup(outer_algo, hpo_prob, monitor=monitor, solution_transform=solution_transform())
outer_workflow.init_step()
for _ in range(20):
    outer_workflow.step()
monitor = outer_workflow.get_submodule("monitor")
print("params:\n", monitor.topk_solutions, "\n")
print("result:\n", monitor.topk_fitness)