Source code for pyRadPlan.dose.engines._fredmc

"""Monte Carlo engine for FRED.

FRED (Fast paRticle dosE calculatoR) is a Monte Carlo simulation toolkit
for particle therapy developed by the Fondazione CNAO in Italy.

References
----------
.. [1] Schiavi, A., Senzacqua, M., Pioli, S., Mairani, A., Magro, G., Molinelli, S.,
       Ciocca, M., Battistoni, G., Patera, V. (2017). Fred: a GPU-accelerated
       fast-Monte Carlo code for rapid treatment plan recalculation in ion beam therapy.
       Physics in Medicine and Biology, 62(18), 7482–7504.
       https://doi.org/10.1088/1361-6560/aa8134

Notes
-----
This engine provides an interface to the FRED Monte Carlo system
for particle dose calculations in radiotherapy treatment planning.
For installation instructions and more information, please visit: https://www.fred-mc.org/

Developed by the matRad development team.
"""

import logging
import os
import shutil
import subprocess
import tempfile
from typing import Any, Union, cast
import time
import textwrap
from pathlib import Path

import numpy as np
import SimpleITK as sitk
from scipy import sparse

from ...core import Grid
from ...ct import CT, resample_ct
from ...cst import StructureSet
from ...dij import Dij
from ...plan import Plan
from ...stf import SteeringInformation, Beam
from ...util import swap_orientation_sparse_matrix
from ...machines.particles import IonAccelerator

from ._base_montecarlo import MonteCarloEngineAbstract

logger = logging.getLogger(__name__)


[docs] class ParticleFredMCEngine(MonteCarloEngineAbstract): # constants short_name = "FRED" name = "FRED" possible_radiation_modes = ["protons", "helium", "carbon", "oxygen16"] # add more if needed available_source_models = ["gaussian", "emittance", "sigmaSqrModel"] available_versions = ["3.70.0", "3.76.0"] external_calculation: Union[str, bool] calc_bio_dose: bool calc_let: bool fred_version: str fred_cmd: str fred_dir: Path use_gpu: bool save_input: Union[os.PathLike, bool] save_output: Union[os.PathLike, bool] use_output: Union[os.PathLike, bool] print_output: bool scorers: list[str] source_model: str room_material: str HU_table_file: str scaling_factor: int def __init__(self, pln: Union[Plan, dict] = None): ### Public attributes ### # properties with setters and getters self.save_input = False self.save_output = False self.use_output = False self.source_model = "gaussian" self.external_calculation = False self.use_gpu = True self.calc_let = False self.calc_bio_dose = False self.scorers = ["Dose"] self.source_model = "gaussian" self.room_material = "Air" self.HU_table_file = "hLut.inp" self.scaling_factor = 1e6 self.fred_version = "3.70.0" self.print_output = False self.fred_cmd = "fred" self.fred_dir = Path(os.environ.get("FREDDIR", "")) ### Private attributes ### self._computed_quantities = [] self._total_number_of_bixels = None self._dij_format_version = None self._fred_root_folder = Path(tempfile.mkdtemp()) self._patient_filename = "CTpatient.mhd" self._run_input_filename = "fred.inp" self._regions_filename = "regions.inp" self._plan_filename = "plan.inp" self._plan_delivery_filename = "planDelivery.inp" self._input_foldername = "inp" self._regions_foldername = "regions" self._plan_foldername = "plan" self._output_folder = "out" self._output_image = "Dose.mhd" self._output_score = "/score/Phantom.Dose.mhd" self._inp_dir = self._fred_root_folder / self._input_foldername self._plan_dir = self._inp_dir / self._plan_foldername self._output_dir = self._fred_root_folder / self._output_folder self._regions_dir = self._inp_dir / self._regions_foldername self._conversion_factor = 1e6 self._HU_clamping: bool = False self._max_hu_table_value: int = 3000 # TODO: set up max hu value using custom hu table self._simulated_particles_per_bixel = 1e2 os.makedirs(self._plan_dir) os.makedirs(self._regions_dir) super().__init__(pln) @property def save_output(self) -> Union[os.PathLike, bool]: if self._save_output is None: return False return self._save_output @save_output.setter def save_output(self, value: Union[str, int, bool]): try: self._save_output = self._parse_external_calc_property(value) except ValueError as e: raise ValueError("save_output could not be set") from e @property def use_output(self) -> Any: if self._use_output is None: return False return self._use_output @use_output.setter def use_output(self, value: Union[str, int, bool]): try: self._use_output = self._parse_external_calc_property(value) except ValueError as e: raise ValueError("use_output could not be set") from e @property def save_input(self) -> Union[os.PathLike, bool]: if self._save_input is None: return False return self._save_input @save_input.setter def save_input(self, value: Union[os.PathLike, int, bool]): try: self._save_input = self._parse_external_calc_property(value) except ValueError as e: raise ValueError("save_input could not be set") from e def _parse_external_calc_property( self, value: Union[os.PathLike, bool, int] ) -> Union[os.PathLike, None]: """ Parse the external calculation property to ensure it is a valid path or None. Parameters ---------- value : Union[os.PathLike,bool] The value to be parsed. Returns ------- Union[os.PathLike, None] The parsed path or None if the value is not set. """ if isinstance(value, (str, bytes, os.PathLike)) and ( os.path.isdir(value) or os.path.isdir(os.path.dirname(value)) ): return Path(value) elif isinstance(value, (int, bool)): value = bool(value) if value: return Path(os.getcwd()) else: return None else: raise ValueError("Invalid value. Must be a valid path or boolean.") def _get_path(self, subfolder: os.PathLike, filename: os.PathLike) -> Path: """ Construct the full path to a file in a specified subfolder. Parameters ---------- subfolder : str The subfolder name (e.g., 'inp', 'plan', 'regions', 'out'). filename : str The name of the file. Returns ------- str The full path to the file. Raises ------ ValueError If the subfolder is invalid. """ if subfolder not in ["inp", "plan", "regions", "out"]: raise ValueError("Invalid subfolder. Choose from 'inp', 'plan', or 'regions'.") base_path = self._inp_dir if subfolder == "inp" else getattr(self, f"_{subfolder}_dir") return Path(os.path.join(base_path, filename)) def _write_run_file(self) -> None: """Write main run file for the FRED engine.""" file_content = textwrap.dedent(""" include: regions/regions.inp include: plan/planDelivery.inp """) try: file_path = self._get_path(self._input_foldername, self._run_input_filename) with open(file_path, "w") as f: f.write(file_content) # TODO: better way to handle different versions! 3.70.0 does not support dijformatversion if ( self._dij_format_version is not None and not self._calc_dose_direct and self.fred_version == "3.76.0" ): f.write(f"ijFormatVersion= {self._dij_format_version}\n") logger.info(f"File written: {file_path}") except PermissionError: logger.warning( f"Permission denied when trying to write to {self._run_input_filename}." ) except IOError as e: logger.warning( f"An error occurred while writing to {self._run_input_filename}. Error: {e}" ) except Exception as e: logger.warning(f"An unexpected error occurred: {e}") def _write_regions_file(self) -> None: """Write the regions file for the FRED engine, defining the phantom and room regions.""" try: file_path = self._get_path(self._regions_foldername, self._regions_filename) with open(file_path, "w") as f: f.write("region<\n") f.write("\tID=Phantom\n") f.write("\tCTscan=regions/{}\n".format(self._patient_filename)) f.write("\tO=[{:.2f},{:.2f},{:.2f}]\n".format(0, 0, 0)) f.write("\tpivot=[0.5,0.5,0.5]\n") # l=e1 u=e2 # x in Room coordinates is x in patient frame # y in Romm coordinates is -y in patient frame # Voxels in y-direction in matRad grow in -y direction in FRED Room reference f.write("\tl=[{:.2f},{:.2f},{:.2f}]\n".format(1, 0, 0)) f.write("\tu=[{:.2f},{:.2f},{:.2f}]\n".format(0, -1, 0)) # Syntax changes for scorers according to direct or ij calculation if self._calc_dose_direct: f.write("\tscore=[") else: f.write("\tscoreij=[") if len(self.scorers) > 1: for scorer in self.scorers: f.write("{},".format(scorer)) f.write("{}]\n".format(self.scorers[-1])) # Write Room parameters f.write("region>\n") f.write("region<\n") f.write("\tID=Room\n") f.write("\tmaterial={}\n".format(self.room_material)) f.write("region>\n") f.write("include: regions/{}\n".format(self.HU_table_file)) if self._HU_clamping: f.write("lAllowHUClamping=t\n") logger.info(f"File written: {file_path}") except PermissionError: logger.warning( f"Permission denied when trying to write to {self._run_input_filename}." ) except IOError as e: logger.warning( f"An error occurred while writing to {self._run_input_filename}. Error: {e}" ) except Exception as e: logger.warning(f"An unexpected error occurred: {e}") def _set_up_patient_ct(self, ct: CT) -> None: """ Set up the patient CT data for the FRED engine by resampling and saving it. Parameters ---------- ct : CT The CT object containing the patient data. """ file_path = self._get_path(self._regions_foldername, self._patient_filename) resampled_ct = resample_ct( ct=ct, interpolator=sitk.sitkNearestNeighbor, target_grid=self.dose_grid, ) resampled_ct.cube_hu = sitk.Cast(resampled_ct.cube_hu, sitk.sitkInt16) # Force Origin to (0,0,0) and Direction to Identity to match matRad behavior resampled_ct.cube_hu.SetOrigin((0.0, 0.0, 0.0)) resampled_ct.cube_hu.SetDirection((1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0)) sitk.WriteImage(resampled_ct.cube_hu, file_path) max_ct_hu_value = sitk.GetArrayFromImage(resampled_ct.cube_hu).max() if max_ct_hu_value > self._max_hu_table_value: self._HU_clamping = True logger.warning(("HU outside of boundaries")) def _set_up_hu_table(self) -> None: """Write the HU table file for the FRED engine, defining the mapping of HU values to material properties.""" file_content = textwrap.dedent(""" matColumns: HU rho RSP Ipot Lrad C Ca H N O P Ti S mat: -1024 0.001 0.001 78.0 36.1 0 0 11.189400 0 88.810600 0 0 0 mat: -999 0.001 0.0011 78.0 36.1 0 0 11.189400 0 88.810600 0 0 0 mat: -90 0.95 0.95 78.0 36.1 0 0 11.189400 0 88.810600 0 0 0 mat: -45 0.99 0.99 78.0 36.1 0 0 11.189400 0 88.810600 0 0 0 mat: 0 1 1 78.0 36.1 0 0 11.189400 0 88.810600 0 0 0 mat: 100 1.095 1.095 78.0 36.1 0 0 11.189400 0 88.810600 0 0 0 mat: 350 1.199 1.199 78.0 36.1 0 0 11.189400 0 88.810600 0 0 0 mat: 3000 2.505 2.505 78.0 36.1 0 0 11.189400 0 88.810600 0 0 0 """) try: file_path = self._get_path(self._regions_foldername, self.HU_table_file) with open(file_path, "w") as f: f.write(file_content) except PermissionError: logger.warning( f"Permission denied when trying to write to {self._run_input_filename}." ) except IOError as e: logger.warning( f"An error occurred while writing to {self._run_input_filename}. Error: {e}" ) except Exception as e: logger.warning(f"An unexpected error occurred: {e}") def _write_plan_delivery_file(self) -> None: """Write the plan delivery file for the FRED engine, defining the delivery of pencil beams.""" if self.source_model == "gaussian": layer_case_parameters = """def: currFWHM = layer.get('FWHM')""" beamlet_case_parameters = """Xsec = gauss FWHM = $currFWHM """ elif self.source_model == "emittance": layer_case_parameters = """def: currEmittanceX = layer.get(emittanceX) def: currTwissAlphaX = layer.get(twissAlphaX) def: currTwissBetaX = layer.get(twissBetaX) def: currReferencePlane = layer.get(emittanceRefPlaneDistance) """ beamlet_case_parameters = """Xsec = emittance emittanceX = $currEmittanceX twissAlphaX = $currTwissAlphaX twissBetaX = $currTwissBetaX emittanceRefPlaneDistance = 100 """ elif self.source_model == "sigmaSqrModel": layer_case_parameters = """def: currSQr_a = layer.get(sSQr_a) def: currSQr_b = layer.get(sSQr_b) def: currSQr_c = layer.get(sSQr_c) """ beamlet_case_parameters = """Xsec = emittance sigmaSqrModel = [${plan.get(" "SAD" ")},${currSQr_a},${currSQr_b}, ${currSQr_c}] """ if self._machine.radiation_mode == "protons": particle = """proton""" elif self._machine.radiation_mode == "carbon": particle = """C12""" file_content = textwrap.dedent(f""" #Include file defining fields and layers geometry include: plan/plan.inp #Define the fields for(currField in plan.get('Fields'))< field< ID = ${{currField.get('fieldNumber')}} O = [0,${{plan.get('SAD')}},0] L = ${{currField.get('dim')}} pivot = [0.5,0.5,0.5] l = [0, 0, -1] u = [1, 0 ,0] field> #Deactivate the fields to avoid geometrical overlap deactivate: field_${{currField.get('fieldNumber')}} for> for(currField in plan.get('Fields'))< def: fieldIdx = currField.get('fieldNumber') # Activate current field activate: field_$fieldIdx #Collect Gantry and Couch angles def: GA = currField.get('GA') def: CA = currField.get('CA') #Collect Isocenter def: ISO = currField.get('ISO') #First move the patient so that the Isocenter is now in the center of the Room coordinate system transform: Phantom move_to ${{ISO.item(0)}} ${{ISO.item(1)}} ${{ISO.item(2)}} Room #Second rotate the patient according to the gantry and couch angles. transform: Phantom rotate y ${{CA}} Room transform: Phantom rotate z ${{GA}} Room for(layer in currField.get('Layers'))< #Recover parameters of the current energy layer def: currEnergy = layer.get('Energy') def: currEspread = layer.get('Espread') {layer_case_parameters} for(beamlet in layer.get('beamlets'))< pb< ID = ${{beamlet.get('beamletID')}} fieldID = $fieldIdx particle = {particle} T = $currEnergy EFWHM = $currEspread {beamlet_case_parameters} P = ${{beamlet.get('P')}} v = ${{beamlet.get('v')}} N = ${{beamlet.get('w')}} pb> for> for> #Deliver all the pencil beams in this field deliver: field_$fieldIdx #Deactivate the current field deactivate: field_$fieldIdx #Restore the patient to original position transform: Phantom rotate z ${{-1*GA}} Room transform: Phantom rotate y ${{-1*CA}} Room transform: Phantom move_to 0 0 0 Room for> """) try: file_path = self._get_path(self._plan_foldername, self._plan_delivery_filename) with open(file_path, "w") as file: file.write(file_content) logger.info(f"File written: {file_path}") except PermissionError: logger.warning( f"Warning: Permission denied when trying to write to {self._plan_delivery_filename}." ) except IOError as e: logger.warning( f"Warning: An error occurred while writing to {self._plan_delivery_filename}. Error: {e}" ) except Exception as e: logger.warning(f"An unexpected error occurred: {e}") def _write_plan_file(self, beams_fred: list[dict]) -> None: """ Write the plan file for the FRED engine, defining the fields, layers, and beamlets. Parameters ---------- beams_fred : list[dict] A list of dictionaries containing the beam data for the FRED engine. """ try: file_path = self._get_path(self._plan_foldername, self._plan_filename) with open(file_path, "w") as file: file.write("nprim = {} \n".format(self._simulated_particles_per_bixel)) # loop over fields layer_counter = 0 beamlet_counter = 0 field_counter = 0 for f, field in enumerate(beams_fred): # loop over energy layers num_layer_start_field = layer_counter for ly, layer in enumerate(field["layers"]): file.write("#Beamlets Field {}, Layer {} \n".format(f, ly)) # print bixel(aka beamlet) info (ID, Position, Direction, Weight) num_beamlet_start_layer = beamlet_counter for b, beamlet in enumerate(layer["pb"]): file.write( "\t\t def: S{0} = {{'beamletID': {1}, 'P': [{2}], 'v':[{3}], 'w':{4}}} \n".format( beamlet_counter, beamlet_counter, ", ".join(beamlet["position_bev"]), ", ".join(beamlet["divergence"]), beamlet["weight"], ) ) beamlet_counter = beamlet_counter + 1 beamlets_in_layer = [ ("S" + str(n)) for n in list(range(num_beamlet_start_layer, beamlet_counter)) ] # TODO: print gauss corresponding parameters -> here FWHM file.write( "\t def: L{0} = {{'Energy': {1}, 'Espread': {2}, 'FWHM':{3}, 'beamlets': [{4}]}} \n".format( layer_counter, layer["energy"], layer["ESpread"], layer["FWHM"], ", ".join(beamlets_in_layer), ) ) file.write("\n") layer_counter = layer_counter + 1 layers_in_field = [ ("L" + str(n)) for n in list(range(num_layer_start_field, layer_counter)) ] file.write( "def: F{} = {{'fieldNumber':{}, 'GA': {}, 'CA': {}, 'ISO': [{}], 'dim':[{}], 'Layers': [{}]}} \n".format( f, f, field["GA"], field["CA"], ", ".join(field["ISO"]), ", ".join(field["field_extent"]), ", ".join(layers_in_field), ) ) file.write("\n") field_counter = field_counter + 1 if field_counter == 0: fields = ["F" + str(0)] else: fields = [("F" + str(n)) for n in list(range(field_counter))] file.write( "def: plan = {{ 'SAD': {}, 'Fields': [{}] }}\n".format( field["BAMS_to_iso"], ", ".join(fields) ) ) logger.info(f"File written: {file_path}") except PermissionError: logger.warning( f"Warning: Permission denied when trying to write to {self.plan_filename}." ) except IOError as e: logger.warning( f"Warning: An error occurred while writing to {self.plan_filename}. Error: {e}" ) except Exception as e: logger.warning(f"An unexpected error occurred: {e}") def _transform_to_fred_isocenter(self, dij: dict, iso_center: np.ndarray) -> list[str]: """ Transform the isocenter coordinates from the dose grid to FRED's coordinate system. This function calculates the isocenter position in FRED's coordinate system based on the dose grid resolution, dimensions, and the provided isocenter coordinates. Parameters ---------- dij : dict The dose influence matrix containing dose grid information. iso_center : np.ndarray The isocenter coordinates in the dose grid's world coordinate system. Returns ------- list[str] The transformed isocenter coordinates in FRED's coordinate system. """ dose_grid: Grid = dij["dose_grid"] dose_grid_resolution = np.array( [ dose_grid.resolution["x"], dose_grid.resolution["y"], dose_grid.resolution["z"], ] ) first_vox_world = np.array([dose_grid.x.min(), dose_grid.y.min(), dose_grid.z.min()]) translation = ( dose_grid_resolution - first_vox_world ) # because first_vox_cube = dose_grid_resolution iso_dose_grid_coord = iso_center + translation fred_cube_surface_in_dose_cube_coords = 0.5 * dose_grid_resolution fred_pivot_in_cube_coordinates = 0.5 * ( np.array(dose_grid.dimensions) * dose_grid_resolution ) + np.array(fred_cube_surface_in_dose_cube_coords) fred_iso = -(fred_pivot_in_cube_coordinates - iso_dose_grid_coord) * (-1, 1, 1) return fred_iso def _calc_dose(self, ct: CT, cst: StructureSet, stf: SteeringInformation) -> Dij: """ Perform the dose calculation using the FRED engine. Parameters ---------- ct : CT The CT object containing the patient data. cst : StructureSet The structure set containing the contours and VOIs. stf : SteeringInformation The steering information for the beams. Returns ------- Dij The dose influence matrix containing the calculated dose. """ dij = self._init_dose_calc(ct, cst, stf) counter = 0 self.dose_cube = None self.fred_order = None if not self._calc_dose_direct: counter = 0 for b, beam in enumerate(stf.beams): for j, ray in enumerate(beam.rays): for k in range(len(ray.beamlets)): dij["beam_num"][counter] = b dij["ray_num"][counter] = j dij["bixel_num"][counter] = k counter += 1 if self._use_output is not None: self._process_existing_output(dij, stf) else: self._prepare_new_calculation(dij, ct, stf) if self._use_output is not None: self.fred_order = self._generate_fred_order(dij, stf) if self.dose_cube is None: dij["physical_dose"] = sparse.csc_array( (dij["dose_grid"].num_voxels, stf.total_number_of_bixels), dtype=np.int8 ) else: # Fill dij dij = self._process_dose_cube(dij) self._check_saving_options() logger.info("Finalizing dose calculation...") t_start = time.time() dij = self._finalize_dose(dij) t_end = time.time() logger.info("Done in %f seconds.", t_end - t_start) return dij def _process_existing_output(self, dij: dict, stf: SteeringInformation) -> None: """ Process existing output files if `self._use_output` is set. Parameters ---------- dij : dict The dose influence matrix. stf : SteeringInformation The steering information for the beams. """ self._output_dir = self._use_output self._read_output_files() def _prepare_new_calculation(self, dij: dict, ct: CT, stf: SteeringInformation) -> None: """ Prepare for a new dose calculation by setting up input files and calling FRED. Parameters ---------- dij : dict The dose influence matrix. ct : CT The CT object containing the patient data. stf : SteeringInformation The steering information for the beams. """ self._total_number_of_bixels = stf.total_number_of_bixels self._simulated_particles_per_bixel = ( np.floor(max([1, self.num_histories_direct / self._total_number_of_bixels])) if self._calc_dose_direct else self.num_histories_per_beamlet ) beams_fred = self._generate_beams_fred(dij, stf) self._write_plan_file(beams_fred) self._set_up_files(ct) if self.external_calculation: self.dose_cube = None logger.info((f"Files created for external calculation in dir: {self._inp_dir}")) else: self._call_fred() if not self._calc_dose_direct: self.fred_order = self._generate_fred_order(dij, stf) self._read_output_files() def _generate_fred_order(self, dij: dict, stf: SteeringInformation) -> np.ndarray: """ Generate the FRED order mapping for scorer-ij mode. Parameters ---------- dij : dict The dose influence matrix. stf : SteeringInformation The steering information for the beams. """ counter_fred = 0 fred_order = np.full(dij["total_num_of_bixels"], np.nan) for i, beam in enumerate(stf.beams): for j, (energy_key, energy_layer) in enumerate(beam.energy_layers.items()): for k in range(len(energy_layer["rays_idx"])): ix = np.where( (i == dij["beam_num"]) & np.isin(dij["ray_num"], energy_layer["rays_idx"][k]) & np.isin(dij["bixel_num"], energy_layer["beamlet_idx"][k]) ) fred_order[ix] = counter_fred counter_fred += 1 return fred_order def _generate_beams_fred(self, dij: dict, stf: SteeringInformation) -> list[dict]: """ Generate the FRED-compatible beam data structure. Parameters ---------- dij : dict The dose influence matrix. stf : SteeringInformation The steering information for the beams. Returns ------- list[dict] A list of dictionaries containing the beam data for FRED. """ beams_fred = [] for b, beam in enumerate(stf.beams): fred_iso = self._transform_to_fred_isocenter(dij, beam.iso_center) beams_dict = { "fieldID": b, "origin": beam.source_point_bev / 10, "GA": beam.gantry_angle, "CA": beam.couch_angle, "field_extent": [], "ISO": (str(n / 10) for n in fred_iso), "BAMS_to_iso": cast(IonAccelerator, self._machine).bams_to_iso_dist / 10, "layers": self._generate_layers_fred(beam, b), } fwhm_max = max( [beams_dict["layers"][k]["FWHM"] for k in range(len(beams_dict["layers"]))] ) field_extent = [ray.ray_pos for ray in beam.rays] enclosing_radius_margin = 10 field_extent = max(np.array(np.amax(field_extent, axis=1))) / 10 + 10 * fwhm_max # making sure no sample is smaller than the enclosing radius margin if field_extent <= enclosing_radius_margin: field_extent = enclosing_radius_margin * 2 beams_dict["field_extent"] = [str(field_extent), str(field_extent), str(0.1)] beams_fred.append(beams_dict) return beams_fred def _generate_layers_fred(self, beam: Beam, beam_index: int) -> list[dict]: """ Generate the FRED-compatible layers for a beam. Parameters ---------- beam : Beam The beam object. beam_index : int The index of the beam. Returns ------- list[dict] A list of dictionaries containing the layer data for FRED. """ layers_fred = [] bams_to_iso = cast(IonAccelerator, self._machine).bams_to_iso_dist / 10 # Linear projection of BEV source (x,y) points to plane at BAMStoISO distance def get_point_at_bams(target, source, distance, bams_to_iso): return (target - source) * (-bams_to_iso) / distance + source for key, value in beam.energy_layers.items(): pb_fred = [] for i in range(len(value["beamlet_idx"])): target_point = beam.rays[value["rays_idx"][i]].target_point_bev / 10 # source_point = beam.source_point_bev / 10 #seems to not be needed in FRED ray_position = beam.rays[value["rays_idx"][i]].ray_pos_bev / 10 distance = target_point[1] - ray_position[1] # Project the spot to the entrance plane (BAMStoISO distance) ray_pos_z_proj = get_point_at_bams( target_point[2], ray_position[2], distance, bams_to_iso ) ray_pos_x_proj = get_point_at_bams( target_point[0], ray_position[0], distance, bams_to_iso ) pb_dict = { "ID": i, "fieldID": beam_index, "particle": beam.radiation_mode, "T": value["full_energy"], "position_bev": [str(ray_pos_z_proj), str(ray_pos_x_proj), str(0)], "divergence": [ str((target_point[2] - ray_position[2]) / distance), str((target_point[0] - ray_position[0]) / distance), str(1), ], "weight": beam.rays[value["rays_idx"][i]] .beamlets[value["beamlet_idx"][i]] .weight, "Xsec": "gauss", } if self._calc_dose_direct: pb_dict["weight"] *= self.scaling_factor pb_fred.append(pb_dict) emittance = cast(IonAccelerator, self._machine).foci[value["full_energy"]][0].emittance layer_dict = { "energy": value["full_energy"], "ESpread": cast(IonAccelerator, self._machine).spectra[value["full_energy"]].fwhm / 2.355, "FWHM": ( 2.355 * (2 * emittance.sigma_x * emittance.sigma_y) / (emittance.sigma_x + emittance.sigma_y) ) / 10, "pb": pb_fred, } layers_fred.append(layer_dict) return layers_fred def _process_dose_cube(self, dij: dict) -> None: """ Process the dose cube and update the dose influence matrix. Parameters ---------- dij : dict The dose influence matrix. """ if self._calc_dose_direct: # Direct dose calculation dij["physical_dose"].flat[0] = sparse.csc_array(self.dose_cube.reshape(-1, 1)) if self.calc_let and self.let_cube is not None: dij["mLETd"].flat[0] = sparse.csc_array( ( self.let_cube[self._vdose_grid] / 10, (self._vdose_grid, np.ones(len(self._vdose_grid))), ), shape=(self.dose_grid.num_voxels, 1), ) # LETd * dose dij["mLETDose"].flat[0] = sparse.csc_array( ( (self.let_cube[self._vdose_grid] / 10) * self.dose_cube[self._vdose_grid], (self._vdose_grid, np.ones(len(self._vdose_grid))), ), shape=(self.dose_grid.num_voxels, 1), ) dij_fields_to_override = [ "num_of_beams", "total_number_of_bixels", "total_number_of_rays", "num_of_rays_per_beam", ] for field_name in dij_fields_to_override: dij[field_name] = 1 dij["beam_num"] = 0 dij["bixel_num"] = 0 dij["ray_num"] = 0 else: # Scorer-ij mode self.dose_cube = sparse.csc_array(self.dose_cube) self.dose_cube = self.dose_cube[:, self.fred_order] dij["physical_dose"].flat[0] = self._conversion_factor * self.dose_cube if self.calc_let and self.let_cube is not None: self.let_cube = sparse.csc_array(self.let_cube) self.let_cube = self.let_cube[:, self.fred_order] # Divide by 10, FRED scores in MeV * cm^2 / g dij["mLETd"].flat[0] = self.let_cube / 10 # LETd * dose dij["mLETDose"].flat[0] = dij["physical_dose"].flat[0] * dij["mLETd"].flat[0] if self.calc_bio_dose: logger.warning("Biological dose calculation is not implemented yet.") return dij def _check_saving_options(self) -> None: if self._save_input is not None: if not os.path.isdir(self._save_input): raise ValueError( f"The provided path '{self._save_input}' for 'save_input' is not a valid directory." ) else: shutil.copytree( self._fred_root_folder / "inp", self._save_input / "inp", dirs_exist_ok=True ) if self._save_output is not None: if not os.path.isdir(self._save_output): raise ValueError( f"The provided path '{self._save_output}' for 'save_output' is not a valid directory." ) else: shutil.copytree( self._fred_root_folder / "out", self._save_output / "out", dirs_exist_ok=True ) # TODO: Might implement a method to keep tmp files in the future: shutil.rmtree(self._fred_root_folder) logger.info((f"Temp tree directory deleted: {self._fred_root_folder}")) def _set_up_files(self, ct: CT) -> None: """ Set up the necessary input files for the FRED engine. Parameters ---------- ct : CT The CT object containing the patient data. """ self._set_up_hu_table() self._set_up_patient_ct(ct) self._write_run_file() self._write_regions_file() self._write_plan_delivery_file() def _read_output_files(self) -> None: """Read the output files generated by the FRED engine. Handle both direct and scorer-ij modes.""" dose_cube = None let_cube = None if not self._calc_dose_direct: # Handle scorer-ij mode dose_dij_folder = os.path.join(self._output_dir, "scoreij") dose_dij_file = "Phantom.Dose.bin" load_file_name = os.path.join(dose_dij_folder, dose_dij_file) logger.info(f"Looking for scorer-ij output in sub folder: {dose_dij_folder}") if os.path.isfile(load_file_name): dose_cube = self.read_sparse_dij_bin(load_file_name) else: logger.error(f"Unable to find file: {load_file_name}") if self.calc_let: letd_dij_file = "Phantom.LETd.bin" letd_file_name = os.path.join(dose_dij_folder, letd_dij_file) try: let_cube = self.read_sparse_dij_bin(letd_file_name) except Exception as e: logger.error(f"Unable to load file: {letd_file_name}. Error: {e}") else: # Handle direct dose calculation mode dose_cube_folder = os.path.join(self._output_dir, "score") dose_cube_file_name = "Phantom.Dose.mhd" load_file_name = os.path.join(dose_cube_folder, dose_cube_file_name) logger.info(f"Looking for scorer file in sub folder: {dose_cube_folder}") if os.path.isfile(load_file_name): dose_cube = sitk.GetArrayFromImage(sitk.ReadImage(load_file_name)) else: logger.error(f"Unable to find file: {load_file_name}") if self.calc_let: letd_dij_folder = dose_cube_folder letd_cube_file_name = "Phantom.LETd.mhd" try: let_cube = sitk.GetArrayFromImage( sitk.ReadImage(os.path.join(letd_dij_folder, letd_cube_file_name)) ) except Exception as e: logger.error( f"Unable to load file: {os.path.join(letd_dij_folder, letd_cube_file_name)}. Error: {e}" ) # Store the results for further processing self.dose_cube = dose_cube self.let_cube = let_cube
[docs] def read_sparse_dij_bin(self, f_name: str) -> sparse.csc_array: """ Dispatch method to read a sparse dij binary file based on the dij format version. Parameters ---------- f_name (str): File name to read. Returns ------- sparse.csc_array: Sparse matrix containing the dij data. """ with open(f_name, "rb") as f: file_format_version = str(np.frombuffer(f.read(4), dtype=np.int32)[0]) logger.info(f"File format version: {file_format_version}") self._dij_format_version = file_format_version if file_format_version == "20": return read_sparse_dij_bin_v20(f_name) elif file_format_version == "21": return read_sparse_dij_bin_v21(f_name) elif file_format_version == "31": return read_sparse_dij_bin_v31(f_name) else: raise ValueError(f"Unsupported dij format version: {self._dij_format_version}")
def _allocate_quantity_matrices(self, dij: dict[str, Any], names: list[str]) -> Dij: # Loop over all requested quantities for q_name in names: # Create dij list for each quantity dij[q_name] = np.empty(self.mult_scen.scen_mask.shape, dtype=object) # Loop over all scenarios and preallocate quantity containers # TODO: write test for this for i in range(self.mult_scen.scen_mask.size): # Only if there is a scenario we will allocate if self.mult_scen.scen_mask.flat[i]: if self._calc_dose_direct: dij[q_name].flat[i] = np.zeros( (self.dose_grid.num_voxels, dij["num_of_beams"]), dtype=np.float32 ) # else: # # This could probably be optimized by using direct access to the # # lil_matrix's data structures # # TODO: we could store a single sparse pattern matrix and then only store # # the values for all quantities for better memory management # dij[q_name].flat[i] = lil_matrix( # (self._num_of_columns_dij, self.dose_grid.num_voxels), # dtype=np.float32, # ) self._computed_quantities.append(q_name) return dij def _call_fred(self) -> None: """ Call the FRED executable to perform the dose calculation. Raises ------ SystemExit If the FRED execution fails. """ output_path = os.path.abspath(os.path.join(self._fred_root_folder, self._output_folder)) no_gpu = "" if not self.use_gpu: no_gpu = "-nogpu" try: logger.info("Running Simulation in FRED...") t_start = time.time() execute_cmd = ( f'{self.fred_cmd} -f fred.inp -o "{output_path}" -i "{self._inp_dir}" {no_gpu}' ) logger.info("FRED command: %s", execute_cmd) subprocess.run( execute_cmd, cwd=self._inp_dir, env=os.environ.copy(), shell=True, check=True, capture_output=not self.print_output, ) except subprocess.CalledProcessError as e: stdout_msg = e.stdout.decode(errors="ignore") if e.stdout else "" stderr_msg = e.stderr.decode(errors="ignore") if e.stderr else "" logger.error( "FRED execution failed with return code %s. stdout:\n%s\nstderr:\n%s", e.returncode, stdout_msg, stderr_msg or "No error details available", ) raise SystemExit("Aborting...") except FileNotFoundError as e: logger.error( "FRED executable not found: %s. Ensure 'fred' is in PATH.", e, ) raise SystemExit("Aborting... FRED not found") else: t_end = time.time() logger.info("Done in %f seconds.", t_end - t_start) def _init_dose_calc(self, ct: CT, cst: StructureSet, stf: SteeringInformation) -> None: dij = super()._init_dose_calc(ct, cst, stf) dij = self._allocate_quantity_matrices(dij, ["physical_dose"]) if dij["num_of_scenarios"] > 1: raise NotImplementedError( "Multiple scenarios are not supported for FRED calculations." ) # TODO: Add biomodel support # if hasattr(self, 'bioModel') and isinstance(self.bioModel, matRad_LQLETbasedModel): # self._calc_bio_dose = True # else: # self._calc_bio_dose = False # # Limit RBE calculation to proton models for the time being # if self._calc_bio_dose: # if self.radiation_mode == "protons": # dij = self.load_biological_data(cst, dij) # dij = self._allocate_quantity_matrices(dij, ["mAlphaDose", "mSqrtBetaDose"]) # # Only considering LET-based models # self.calc_let = True # else: # logger.warning( # f"Biological dose calculation not supported for radiation modality: {self.radiation_mode}" # ) # self._calc_bio_dose = False # TODO: Handle constant RBE models # if isinstance(self.bioModel, matRad_ConstantRBE): # dij["RBE"] = self.bioModel.RBE # If LET calculation is enabled if self.calc_let: self.scorers.extend(["LETd"]) # Allocate containers for LET*Dose and dose-weighted LET dij = self._allocate_quantity_matrices(dij, ["mLETDose", "mLETd"]) return dij def _finalize_dose(self, dij: dict) -> None: """ Finalize the dose influence matrix. Pruning the matrix and concatenating the containers to a compressed sparse matrix. Parameters ---------- dij : dict The dose influence matrix. Returns ------- Dij The finalized dose influence matrix. """ if not self.external_calculation: # Loop over all scenarios and remove dose influence for voxels outside of segmentations for i in range(self.mult_scen.scen_mask.size): # Only if there is a scenario we will allocate if self.mult_scen.scen_mask.flat[i]: # Loop over all used quantities for q_name in self._computed_quantities: if not self._calc_dose_direct: tmp_matrix = dij[q_name].flat[i] if self._dij_format_version in {"21", "31"}: if not isinstance(tmp_matrix, sparse.csc_array): tmp_matrix = sparse.csc_array(tmp_matrix) tmp_matrix.eliminate_zeros() shape = dij["dose_grid"].dimensions swapped = swap_orientation_sparse_matrix(tmp_matrix, shape, (0, 1)) dij[q_name].flat[i] = swapped else: if not isinstance(tmp_matrix, sparse.csc_array): tmp_matrix = sparse.csc_array(tmp_matrix) tmp_matrix.eliminate_zeros() dij[q_name].flat[i] = tmp_matrix # dij[q_name].flat[i].data *= self.scaling_factor # if self.keep_rad_depth_cubes and self._rad_depth_cubes: # dij["rad_depth_cubes"] = self._rad_depth_cubes # Call the finalizeDose method from the base class return super()._finalize_dose(dij)
[docs] @staticmethod def is_available(pln, machine): available, msg = True, "" return available, msg
def read_sparse_dij_bin_v20(f_name: str) -> sparse.csc_array: """ Read a sparse dij binary file in a Matlab-like format. Parameters ---------- fName (str): File name to read. Returns ------- sparse.csc_array: Sparse matrix containing the dij data. """ with open(f_name, "rb") as f: # Read header _ = np.frombuffer(f.read(4), dtype=np.int32)[0] # file_for_mat_versio dims = np.frombuffer(f.read(4 * 3), dtype=np.int32) _ = np.frombuffer(f.read(4 * 3), dtype=np.float32) # res _ = np.frombuffer(f.read(4 * 3), dtype=np.float32) # offset n_components = np.frombuffer(f.read(4), dtype=np.int32)[0] number_of_beamlets = np.frombuffer(f.read(4), dtype=np.int32)[0] # Preallocate lists to collect arrays all_values_list = [] all_voxel_inds_list = [] indptr = [0] beamlet_counter = 0 for _ in range(number_of_beamlets): # Read beamlet header _ = np.frombuffer(f.read(4), dtype=np.int32)[0] # bix_num (unused) num_vox = int(np.frombuffer(f.read(4), dtype=np.int32)[0]) beamlet_counter += 1 # Update indptr indptr.append(int(indptr[-1]) + num_vox) # Read voxel indices curr_voxel_indices = np.frombuffer(f.read(4 * num_vox), dtype=np.uint32) all_voxel_inds_list.append(curr_voxel_indices) # Read the values for the current beamlet tmp_values = np.frombuffer(f.read(4 * num_vox * n_components), dtype=np.float32) if n_components == 2: values = tmp_values[0::2] / tmp_values[1::2] else: values = tmp_values[0::n_components] all_values_list.append(values) # Concatenate the lists into single NumPy arrays all_values = np.concatenate(all_values_list) all_voxel_inds = np.concatenate(all_voxel_inds_list) indptr = np.array(indptr, dtype=np.int64) total_voxels = int(np.prod(dims)) dij_matrix = sparse.csc_array( (all_values, all_voxel_inds, indptr), shape=(total_voxels, beamlet_counter) ) return dij_matrix def read_sparse_dij_bin_v21(f_name: str) -> sparse.csc_array: """ Read a sparse dij binary file in version 2.1 format. Parameters ---------- f_name (str): File name to read. Returns ------- sparse.csc_array: Sparse matrix containing the dij data. """ with open(f_name, "rb") as f: # Read header _ = np.frombuffer(f.read(4), dtype=np.int32)[0] # file_format_version dims = np.frombuffer(f.read(4 * 3), dtype=np.int32) _ = np.frombuffer(f.read(4 * 3), dtype=np.float32) # res _ = np.frombuffer(f.read(4 * 3), dtype=np.float32) # offset _ = np.frombuffer(f.read(4 * 9), dtype=np.float32) # orientation n_components = np.frombuffer(f.read(4), dtype=np.int32)[0] number_of_beamlets = np.frombuffer(f.read(4), dtype=np.int32)[0] # Preallocate lists to collect arrays all_values_list = [] all_voxel_inds_list = [] indptr = [0] beamlet_counter = 0 for _ in range(number_of_beamlets): # Read beamlet header _ = np.frombuffer(f.read(4), dtype=np.uint32)[0] # bix_num num_vox = int(np.frombuffer(f.read(4), dtype=np.int32)[0]) beamlet_counter += 1 # Update indptr indptr.append(int(indptr[-1]) + num_vox) # Read voxel indices curr_voxel_indices = np.frombuffer(f.read(4 * num_vox), dtype=np.uint32) # Read the values for the current beamlet tmp_values = np.frombuffer(f.read(4 * num_vox * n_components), dtype=np.float32) values_nom = tmp_values[0::n_components] if n_components == 2: values_den = tmp_values[1::n_components] values = values_nom / values_den else: values = values_nom # Permute x and y components in voxel indices ind_y, ind_x, ind_z = np.unravel_index(curr_voxel_indices, dims) permuted_voxel_indices = np.ravel_multi_index( (ind_x, ind_y, ind_z), [dims[1], dims[0], dims[2]] ) all_voxel_inds_list.append(permuted_voxel_indices) all_values_list.append(values) # Concatenate the lists into single NumPy arrays all_values = np.concatenate(all_values_list) all_voxel_inds = np.concatenate(all_voxel_inds_list) indptr = np.array(indptr, dtype=np.int64) total_voxels = int(np.prod(dims)) dij_matrix = sparse.csc_array( (all_values, all_voxel_inds, indptr), shape=(total_voxels, beamlet_counter) ) return dij_matrix def read_sparse_dij_bin_v31(f_name: str) -> sparse.csc_array: """ Read a sparse dij binary file in version 3.1 format. Parameters ---------- f_name (str): File name to read. Returns ------- sparse.csc_array: Sparse matrix containing the dij data. """ with open(f_name, "rb") as f: # Read header _ = np.frombuffer(f.read(4), dtype=np.int32)[0] # file_format_version dims = np.frombuffer(f.read(4 * 3), dtype=np.int32) _ = np.frombuffer(f.read(4 * 3), dtype=np.float32) # res _ = np.frombuffer(f.read(4 * 3), dtype=np.float32) # offset _ = np.frombuffer(f.read(4 * 9), dtype=np.float32) # orientation n_components = np.frombuffer(f.read(4), dtype=np.uint32)[0] number_of_beamlets = np.frombuffer(f.read(4), dtype=np.uint32)[0] # Read beamlet metadata all_bixel_meta = np.frombuffer(f.read(4 * 3 * number_of_beamlets), dtype=np.uint32) all_bixel_meta = all_bixel_meta.reshape((number_of_beamlets, 3)) # (PBidx, FID, PBID) # Read component data sizes component_data_sizes = [ np.frombuffer(f.read(4), dtype=np.uint32)[0] for _ in range(n_components) ] # Preallocate lists for data pb_idxs = [] voxel_indices = [] tmp_values = [] # Read data for each component for comp_idx in range(n_components): pb_idxs.append( np.frombuffer(f.read(4 * component_data_sizes[comp_idx]), dtype=np.uint32) ) voxel_indices.append( np.frombuffer(f.read(4 * component_data_sizes[comp_idx]), dtype=np.uint32) ) tmp_values.append( np.frombuffer(f.read(4 * component_data_sizes[comp_idx]), dtype=np.float32) ) # Compute values based on components if n_components > 1: values = tmp_values[0] / tmp_values[1] else: values = tmp_values[0] # Permute x and y components in voxel indices ind_y, ind_x, ind_z = np.unravel_index(voxel_indices[0], dims) permuted_voxel_indices = np.ravel_multi_index( (ind_x, ind_y, ind_z), [dims[1], dims[0], dims[2]] ) # Create sparse matrix total_voxels = int(np.prod(dims)) # Optimization: Construct CSC directly using indptr # We assume the data in the file is sorted by beamlet index (pb_idxs), which is standard. # This avoids the overhead of coo_array -> csc_array conversion. counts = np.bincount(pb_idxs[0], minlength=number_of_beamlets) indptr = np.concatenate(([0], np.cumsum(counts))).astype(np.int32) dij_matrix = sparse.csc_array( (values, permuted_voxel_indices, indptr), shape=(total_voxels, number_of_beamlets), ) return dij_matrix