Source code for pyRadPlan.dose.engines._base

"""Base class for all dose engines."""

import logging

import sys

if sys.version_info < (3, 10):
    import importlib_resources as resources  # Backport for older versions
else:
    from importlib import resources  # Standard from Python 3.9+


import warnings
import time
from typing import ClassVar, Union
from abc import ABC, abstractmethod

import SimpleITK as sitk
import numpy as np
from pydantic import ValidationError

from pyRadPlan.core import Grid
from pyRadPlan.core.np2sitk import linear_indices_to_grid_coordinates
from pyRadPlan.core.resample import resample_numpy_array
from pyRadPlan.ct import CT, resample_ct
from pyRadPlan.cst import StructureSet
from pyRadPlan.stf import SteeringInformation, validate_stf
from pyRadPlan.plan import Plan, validate_pln
from pyRadPlan.dij import Dij, validate_dij
from pyRadPlan.scenarios import create_scenario_model, ScenarioModel
from pyRadPlan.machines import load_machine_from_mat, validate_machine, Machine
from ...core.xp_utils import choose_array_api_namespace, choose_device


logger = logging.getLogger(__name__)

# from dose.calcDoseInit import init_dose_calc


[docs] class DoseEngineBase(ABC): """ Abstract Interface for all dose engines. All dose engines should inherit from this class. Parameters ---------- pln : Plan The Plan object to assign properties from. Attributes ---------- short_name : str The short name of the dose engine. name : str The name of the dose engine. possible_radiation_modes : list[str] The possible radiation modes for the dose engine. is_dose_engine : bool = True Helper class variable telling you that this is a dose engine select_voxels_in_scenarios : bool Whether to select voxels in scenarios. mult_scen : Union[str, ScenarioModel] The scenario model. bio_model : Union[str, dict] The biological model. dose_grid : Union[Grid,dict] The dose grid to use (struct with at least doseGrid.resolution.x/y/z set). """ # Constant, Abstract properties short_name: ClassVar[str] name: ClassVar[str] possible_radiation_modes: ClassVar[list[str]] = NotImplemented is_dose_engine: ClassVar[bool] = True # Helper variable mult_scen: Union[str, ScenarioModel] bio_model: Union[str, dict] dose_grid: Union[Grid, dict] select_voxels_in_scenarios: bool # Public properties def __init__(self, pln: Union[Plan, dict] = None): # Assign default parameters from Matrad_Config or manually self.mult_scen = "nomScen" self.select_voxels_in_scenarios = None self.voxel_sub_ix = None # selection of where to calculate / store dose, empty by default self.dose_grid = None if pln is not None: self.assign_properties_from_pln(pln, True) self._ct_grid = None # Protected properties with public get access self._machine: Machine = None # base data defined in machine file self._timers = None # timers of dose calc self._num_of_columns_dij = None # number of columns in the dij struct self._vox_world_coords = None # ct voxel coordinates in world self._vox_world_coords_dose_grid = None # dose grid voxel coordinates in world self._vct_grid = None # voxel grid inside patient self._vdose_grid = None # voxel dose grid self._vct_grid_scen_ix = None # logical subindexes of scenarios in ct grid self._vdose_grid_scen_ix = None # logical subindexes of scenarios in dose grid self._vct_grid_mask = None # voxel grid inside patient as logical mask self._vdose_grid_mask = None # voxel dose grid inside patient as logical mask self._robust_voxels_on_grid = None # voxels to be computed in robustness scenarios # Protected properties with private get access self._last_progress_update = None self._calc_dose_direct = False self.xp = choose_array_api_namespace() self.device = choose_device() # initializing super() class (here: ABC) super().__init__() # Public methods
[docs] def assign_properties_from_pln(self, pln: Plan, warn_when_property_changed: bool = False): """ Assign properties from a Plan object to the Dose Engine. This includes the Scenario Model and the Biological Model, and any other properties that can be stored in the prop_dose_calc dictionary within the Plan object. This function will check if a property exists for the dose engine and, if yes, set it. Parameters ---------- pln : Plan The Plan object to assign properties from. warn_when_property_changed : bool Whether to warn when properties are changed. """ pln = validate_pln(pln) # Set Scenario Model if hasattr(pln, "mult_scen"): self.mult_scen = pln.mult_scen # Assign Biologival Model if hasattr(pln, "bio_param"): self.bio_param = pln.bio_param # TODO: No bio_param yet if not isinstance(warn_when_property_changed, bool): warn_when_property_changed = False # Overwrite default properties within the engine # with the ones given in the prop_dose_calc dict if pln.prop_dose_calc is not None: prop_dict = pln.prop_dose_calc if ( "engine" in prop_dict and prop_dict["engine"] and prop_dict["engine"] != self.short_name ): raise ValueError( f"Inconsistent dose engines given! pln asks for '{prop_dict['engine']}', " f"but you are using '{self.short_name}'!" ) prop_dict.pop("engine", None) else: prop_dict = {} fields = prop_dict.keys() # Set up warning message if warn_when_property_changed: warning_msg = "Property in Dose Engine overwritten from pln.propDoseCalc" else: warning_msg = None for field in fields: if not hasattr(self, field): warnings.warn('Property "{}" not found in Dose Engine!'.format(field)) elif warn_when_property_changed and warning_msg: logger.warning(warning_msg + f": {field}") setattr(self, field, prop_dict[field])
[docs] def calc_dose_forward( self, ct: CT, cst: StructureSet, stf: SteeringInformation, w: np.ndarray ) -> dict: """ Perform a forward dose calculation. Compute a dose cube by directly applying a set of weights during dose calculation. Parameters ---------- ct : CT The CT scan data. cst : StructureSet The structure set containing volumes of interest (VOIs). stf : SteeringInformation The steering information containing beam configurations. w : np.ndarray The weights to apply during dose calculation. Returns ------- Dij The dose information Notes ----- pyRadPlan handles the forward dose calculation by setting a switch in the dose engine (_calc_dose_direct) to True. This facilitates reusing of algorithms in both forward and influence matrix calculations. """ time_start = time.time() self._calc_dose_direct = True stf = validate_stf(stf) if w is None: logger.info("No weights given. Using weights stored in stf.") else: logger.info("Using provided weights for forward dose calculation.") if w.size != stf.total_number_of_bixels: raise ValueError("Number of weights does not match the number of bixels in stf.") # Assign the weights to the stf bixel_num = 0 for beam in stf.beams: for ray in beam.rays: for bixel in ray.beamlets: bixel.weight = w[bixel_num] bixel_num += 1 # Run dose calculation (direct flag is on) dij = self._calc_dose(ct, cst, stf) # Now do the forward weighting with w # This is done because the engine might store the individual fields result = dij.compute_result_ct_grid(np.ones(dij.total_num_of_bixels, dtype=np.float32)) time_elapsed = time.time() - time_start logger.info("Forward dose calculation done in %.2f seconds.", time_elapsed) return result
[docs] def calc_dose_influence(self, ct: CT, cst: StructureSet, stf: SteeringInformation) -> Dij: """ Calculate the set of dose/quantity influence matrices. These are the matrices that map a fluence vector to a dose/quantity distribution. Parameters ---------- ct : CT The CT scan data. cst : StructureSet The structure set containing volumes of interest (VOIs). stf : SteeringInformation The steering information containing beam configurations. Returns ------- Dij The dose influence matrix collection Notes ----- pyRadPlan handles the forward dose calculation by setting a switch in the dose engine (_calc_dose_direct) to True. This facilitates reusing of algorithms in both forward and influence matrix calculations. """ time_start = time.time() self._calc_dose_direct = False dij = self._calc_dose(ct, cst, stf) time_elapsed = time.time() - time_start logger.info("Dose influence matrix calculation done in %.2f seconds.", time_elapsed) return dij
[docs] def set_overlap_priorities(self, cst: StructureSet, resample: bool = True) -> StructureSet: """Apply overlap priorities and, if enabled, resample the CST to the dose grid.""" logger.info("Adjusting structures for overlap... ") t_start = time.time() num_of_ct_scenarios = np.unique([x.num_of_scenarios for x in cst.vois]) if len(num_of_ct_scenarios) > 1: raise ValueError("Inconsistent number of scenarios in cst struct.") new_cst = cst.apply_overlap_priorities() t_end = time.time() logger.info("Done in %.2f seconds.", t_end - t_start) return new_cst
[docs] def select_voxels_from_cst(self, cst, dose_grid, selection_mode): """ Get mask of the voxels (on dose grid). Gets the mask from the cst structures specified by selection_mode. """ self.set_overlap_priorities(self, cst, dose_grid["dimensions"]) selected_cst_structs = [] include_mask = [ np.zeros(np.prod(dose_grid["dimensions"]), dtype=bool) for _ in range(len(cst[:, 4])) ] if selection_mode == "all": for ct_scen_idx in range(len(include_mask)): include_mask[ct_scen_idx][:] = True else: if isinstance(selection_mode, str): if selection_mode == "targetOnly": selected_cst_structs = [i for i, x in enumerate(cst[:, 2]) if x == "TARGET"] elif selection_mode == "objectivesOnly": for i in range(len(cst)): if len(cst[i, 5]) > 0: selected_cst_structs.append(i) elif selection_mode == "oarsOnly": selected_cst_structs = [i for i, x in enumerate(cst[:, 2]) if x == "OAR"] elif selection_mode == "robustnessOnly": for i in range(len(cst)): for j in range(len(cst[i, 5])): if ( "robustness" in cst[i, 5][j] and cst[i, 5][j]["robustness"] != "none" ): selected_cst_structs.append(i) else: raise ValueError(f"Unrecognized voxel selection mode: {selection_mode}") elif isinstance(selection_mode, (list, np.ndarray)): selected_cst_structs = np.unique( np.intersect1d(selection_mode, np.arange(len(cst)) + 1) ) if not np.array_equal(selected_cst_structs, np.unique(selection_mode)): warnings.warn( "Specified structures are not compatible with cst structures. " f"Only performing calculation on structures: {selected_cst_structs}" ) # Loop over all cst structures for i in range(len(cst)): if len(cst[i, 4][0]) > 0: if len(cst[i, 5]) > 0: # Loop over obj/constraint functions for j in range(len(cst[i, 5])): obj = cst[i, 5][j] if not isinstance(obj, dict): try: pass # TODO: obj = matRad_DoseOptimizationFunction. # createInstanceFromStruct(obj) except ValueError: logger.error( f"cst[{i},5][{j}] is not a valid Objective/constraint! " "Remove or Replace and try again!" ) robustness = obj.get("robustness", "none") if i in selected_cst_structs: for ct_idx in range(len(cst[i, 4])): include_mask[ct_idx][cst[i, 4][ct_idx]] = 1 if robustness == "none": logger.info( f"Including cst structure {cst[i, 1]} even though this " "structure has no robustness." ) else: logger.info( f"Excluding cst structure {cst[i, 1]} even though this " "structure has an objective or constraint." ) if robustness != "none": logger.info( f"Excluding cst structure {cst[i, 1]} even though this " "structure has robustness." ) elif i in selected_cst_structs: logger.info( f"Including cst structure {cst[i, 1]} even though this structure " "does not have any objective or constraint" ) return include_mask
[docs] def resize_cst_to_grid(self, cst: StructureSet, ct_with_new_grid: CT) -> StructureSet: """Resize the CST to the dose cube resolution.""" logger.info("Resampling structure set... ") t_start = time.time() cst.resample_on_new_ct(ct_with_new_grid) t_end = time.time() logger.info("Done in %.2f seconds.", t_end - t_start) return cst
# private methods def _init_dose_calc(self, ct: CT, cst: StructureSet, stf: SteeringInformation) -> dict: """ Initialize the dose calculation. Parameters ---------- self : DoseEngineBase The instance of the dose engine. ct : CT The CT scan data. cst : StructureSet The structure set containing volumes of interest (VOIs). stf : SteeringInformation The steering information containing beam configurations. Returns ------- dict A dictionary containing the dose influence matrix (dij) and related information. Description ----------- This method sets up the necessary grids and parameters for dose calculation. It initializes the CT and dose grids, checks machine and radiation mode consistency, creates a scenario model, and sets up arrays for bookkeeping. It also adjusts the isocenter internally for different dose grids and converts CT subscripts to world coordinates. Additionally, it loads the machine file from the base data folder and performs voxel selection for dose calculation. """ self._ct_grid = Grid.from_sitk_image(ct.cube_hu) dij = {} # Default: dose influence matrix computation if self._calc_dose_direct: logger.info("Forward dose calculation using '%s' Dose Engine...", self.name) else: logger.info("Dose influence matrix calculation using '%s' Dose Engine...", self.name) # Check if machine and radiation_mode are consistent machine = list({beam.machine for beam in stf.beams}) radiation_mode = list({beam.radiation_mode for beam in stf.beams}) if len(machine) != 1 or len(radiation_mode) != 1: raise ValueError("machine and radiation mode need to be unique within supplied stf!") machine = machine[0] radiation_mode = radiation_mode[0] # ScenarioModel if not isinstance(self.mult_scen, ScenarioModel): self.mult_scen = create_scenario_model(self.mult_scen, ct) # TODO: Add BioModel Support # if not np.isnan(self.bio_param["RBE"]): # dij['RBE'] = self.bio_param['RBE'] # store CT grid dij["ct_grid"] = ct.grid if self.dose_grid is None: logger.info("Dose Grid not set. Using default resolution.") self.dose_grid = ct.grid.resample({"x": 5.0, "y": 5.0, "z": 5.0}) if not isinstance(self.dose_grid, Grid): try: self.dose_grid = Grid.model_validate(self.dose_grid) except ValidationError: self.dose_grid = ct.grid.resample(self.dose_grid["resolution"]) logger.info( "Dose Grid has Dimensions (%d,%d,%d) with resolution x=%f, y=%f, z=%f.", self.dose_grid.dimensions[0], self.dose_grid.dimensions[1], self.dose_grid.dimensions[2], self.dose_grid.resolution["x"], self.dose_grid.resolution["y"], self.dose_grid.resolution["z"], ) dij["dose_grid"] = self.dose_grid # Meta information for dij #TODO: Change dij Model in future dij["num_of_beams"] = len(stf.beams) dij["num_of_scenarios"] = 1 # TODO: Add support for multiple scenarios dij["num_of_rays_per_beam"] = [stf.beams[i].num_of_rays for i in range(len(stf.beams))] dij["total_num_of_bixels"] = int( sum([stf.beams[i].total_number_of_bixels for i in range(len(stf.beams))]) ) dij["total_num_of_rays"] = int(sum(dij["num_of_rays_per_beam"])) # Check if full dose influence data is required self._num_of_columns_dij = ( len(stf.beams) if self._calc_dose_direct else dij["total_num_of_bixels"] ) # Set up arrays for book keeping dij["bixel_num"] = np.nan * np.ones(self._num_of_columns_dij) dij["ray_num"] = np.nan * np.ones(self._num_of_columns_dij) dij["beam_num"] = np.nan * np.ones(self._num_of_columns_dij) # Default MU calibration dij["min_mu"] = np.zeros(self._num_of_columns_dij) dij["max_mu"] = np.ones(self._num_of_columns_dij) * np.inf dij["num_of_particles_per_mu"] = np.ones(self._num_of_columns_dij) * 1e6 if self.voxel_sub_ix is None: self.voxel_sub_ix = np.unique( np.concatenate([cst.vois[c].indices_numpy for c in range(0, len(cst.vois))]) ) tmp_vct_grid_scen = [np.asarray(self.voxel_sub_ix)] * ct.num_of_ct_scen self._vct_grid = np.unique(np.concatenate(tmp_vct_grid_scen)) # Find subindexes for the individual scenarios. This helps with subselection later. self._vct_grid_scen_ix = [np.isin(self._vct_grid, c) for c in tmp_vct_grid_scen] # Initialize tmp_vdose_grid_scen tmp_vdose_grid_scen = [None] * ct.num_of_ct_scen if self._ct_grid == self.dose_grid: tmp_vdose_grid_scen = tmp_vct_grid_scen resampled_ct = ct else: resampled_ct = resample_ct( ct=ct, interpolator=sitk.sitkNearestNeighbor, target_grid=self.dose_grid, ) self.dose_grid = Grid.from_sitk_image(resampled_ct.cube_hu) for s in range(ct.num_of_ct_scen): # Receive linear indices and grid locations from the dose grid tmp_cube = np.zeros(ct.cube_dim[::-1], dtype=np.float32) tmp_cube.ravel()[tmp_vct_grid_scen[s]] = 1 interpolated_tmp_cube = resample_numpy_array( tmp_cube, reference_image=ct.cube_hu, interpolator=sitk.sitkNearestNeighbor, target_image=resampled_ct.cube_hu, ) tmp_vdose_grid_scen[s] = np.where( np.abs(np.round(interpolated_tmp_cube)).ravel().astype(bool) ) # Get unique dose grid self._vdose_grid = np.unique(np.concatenate(tmp_vdose_grid_scen, axis=0)) # Find subindexes for the dose grid scenarios self._vdose_grid_scen_ix = [np.isin(self._vdose_grid, c) for c in tmp_vdose_grid_scen] # Convert CT subscripts to world coordinates. self._vox_world_coords = linear_indices_to_grid_coordinates( self._vct_grid, self._ct_grid, index_type="numpy" ) self._vox_world_coords_dose_grid = linear_indices_to_grid_coordinates( self._vdose_grid, self.dose_grid, index_type="numpy" ) # Create helper masks self._vdose_grid_mask = np.zeros(self.dose_grid.num_voxels, dtype=bool) self._vdose_grid_mask[self._vdose_grid] = True self._vct_grid_mask = np.zeros(np.prod(ct.cube_dim), dtype=bool) self._vct_grid_mask[self._vct_grid] = True # Load machine file from base data folder self._machine = self.load_machine(radiation_mode, machine) # TODO: this is currently not needed, but may be needed in the future # cst = self.set_overlap_priorities(cst).resample_on_new_ct(resampled_ct) dij["alphax"], dij["betax"] = cst.get_reference_lq_params( overlap_is_applied=False, resample_grid=self.dose_grid ) # TODO: this function has not yet been implemented # structures that are selected here will be included in dose calculation over # the robust scenarios # self._robust_voxels_on_grid = select_voxels_from_cst(cst, dij["ct_grid"]) return dij def _finalize_dose(self, dij: dict) -> Dij: return validate_dij(dij) def _progress_update(self, pos, total): raise NotImplementedError # Private and abstract methods @abstractmethod def _calc_dose(self, ct: CT, cst: StructureSet, stf: SteeringInformation) -> Dij: raise NotImplementedError("Method '_calc_dose' must be implemented.") # static methods
[docs] @staticmethod def is_available(pln, machine): available, msg = [], [] return available, msg
[docs] @staticmethod def load_machine(radiation_mode, machine_name): file_name = radiation_mode + "_" + machine_name + ".mat" machines_path = resources.files("pyRadPlan.data.machines") path = machines_path.joinpath(file_name) machine = validate_machine(load_machine_from_mat(path)) return machine