Source code for pyRadPlan.optimization.solvers._base_solvers
"""Solver Base Classes for Planning Problems."""
from typing import ClassVar, Callable, Any
from abc import ABC, abstractmethod
from ...core.xp_utils.typing import Array
from ...util.keyboard_listener import KeyboardListener
import numpy as np
from numpy.typing import ArrayLike
import logging
logger = logging.getLogger(__name__)
[docs]
class SolverBase(ABC):
"""
Abstract Base Class for Solver Implementations / Interfaces.
Attributes
----------
name : ClassVar[str]
Full name of the solver
short_name : ClassVar[str]
Short name of the solver
max_time : float, default=3600
Maximum time for the solver to run in seconds
bounds : Array, default=[0.0, np.inf]
Bounds for the variables
"""
name = ClassVar[str]
short_name = ClassVar[str]
gpu_compatible = ClassVar[bool]
allow_keyboard_cancel: bool = False
cancel_key: str = "q"
allow_esc_cancel: bool = True # Also allow ESC (\x1b) to cancel
# properties
max_time: float
bounds: Array
def __init__(self):
self.max_time = 3600
self.bounds = [0.0, np.inf]
self.device = None
# Keyboard listener utility (manages platform specifics internally)
self._keyboard_listener = KeyboardListener(
allow_keyboard_cancel=self.allow_keyboard_cancel,
cancel_key=self.cancel_key,
allow_esc_cancel=self.allow_esc_cancel,
component_name=lambda: getattr(self, "name", "Solver"),
)
# Reflect effective capability (may be disabled by platform checks)
self.allow_keyboard_cancel = self._keyboard_listener.allow_keyboard_cancel
def __repr__(self) -> str:
return f"Solver {self.name} ({self.short_name})"
[docs]
def solve(self, x0: Array) -> tuple[Array, dict]:
"""
Interface method to solve the problem.
Parameters
----------
x0 : Array
Initial guess for the solution
Returns
-------
tuple[Array, dict]
Solution vector and additional information as dictionary
"""
self._keyboard_listener.initialize()
# Important!: Everything between start and end kb_thread must be
# inside try/finally to ensure thread is stopped on error!.
# Or terminal stays in weird state.
try:
logger.info(f"Starting optimization using {self.name}")
x, status = self._solve_problem(x0)
# Ensure thread is stopped.
finally:
self._keyboard_listener.finalize()
return x, status
@abstractmethod
def _solve_problem(self, x0: ArrayLike) -> tuple[np.ndarray, dict[str, Any]]:
"""
Solve the problem.
Parameters
----------
x0 : Array
Initial guess for the solution
Returns
-------
tuple[Array, dict]
Solution vector and additional information as dictionary
"""
@abstractmethod
def _callback(self, *args: Any, **kwargs: Any) -> Any:
"""
Check for early stopping during solver callback.
This method should handle both early stopping checks and
user-provided callback functions if the optimizer supports them.
"""
[docs]
class NonLinearOptimizer(SolverBase):
"""
Non-Linear Optimization Solver Base Class.
Attributes
----------
max_iter : int
Maximum number of iterations
abs_obj_tol : float
Absolute objective tolerance
objective : Callable
Objective function handle
gradient : Callable
Gradient function handle
hessian : Callable, default=None
Hessian function handle
constraints : Callable, default=None
Constraints function handle
constraints_jac : Callable, default=None
Constraints Jacobian function handle
supply_iter_func : bool
Whether to supply an iteration callback function
"""
max_iter: int
abs_obj_tol: float
objective: Callable
gradient: Callable
hessian: Callable
constraints: Callable
constraints_jac: Callable
supply_iter_func: bool
def __init__(self):
super().__init__()
self.max_iter = 500
self.abs_obj_tol = 1e-6
self.objective = None
self.gradient = None
self.hessian = None
self.constraints = None
self.constraints_jac = None
[docs]
def iter_func(self, *args, **kwargs) -> bool:
"""
Get or set solver information as iteration callback.
Agnostic signature with *args and **kwargs to be able to accommodate
various solvers.
Parameters
----------
*args
Additional arguments
**kwargs
Additional keyword arguments
Returns
-------
bool
Whether to continue the optimization
"""
return True