Source code for pyRadPlan.util.helpers

"""Test Helper functions for pyRadPlan."""

from typing import Union, get_type_hints, get_origin, get_args
import warnings
import numpy as np
from pydantic import BaseModel
from pydantic.fields import FieldInfo, ComputedFieldInfo
from pyRadPlan.core import PyRadPlanBaseModel
import scipy.sparse as sp


def dl2ld(dict_of_lists: dict[str, list], type_check: bool = True) -> list[dict]:
    """Turn a column-oriented mapping into a row-oriented list of records.

    Parameters
    ----------
    dict_of_lists : dict
        Mapping from key to a list of values (one list per "column").
    type_check : bool, optional
        When True (the default), validate the input before converting.

    Returns
    -------
    list[dict]
        One dictionary per position in the lists, each mapping every key to
        the value found at that position. An empty dictionary yields ``[]``.

    Raises
    ------
    TypeError
        Only when ``type_check`` is True: if the input is not a dictionary,
        if any value is not a list, or if the lists differ in length.
    """
    if type_check:
        # The container itself has to be a dictionary
        if not isinstance(dict_of_lists, dict):
            raise TypeError("The input must be a dictionary.")

        columns = dict_of_lists.values()

        # Every column has to be a list
        if any(not isinstance(column, list) for column in columns):
            raise TypeError("All values in the dictionary must be lists.")

        # All columns have to share one common length
        if len({len(column) for column in columns}) > 1:
            raise TypeError("All lists in the dictionary must have the same length.")

    # Nothing to convert for an empty dictionary
    if dict_of_lists == {}:
        return []

    # Transpose: each tuple produced by zip holds the values of one record
    keys = list(dict_of_lists)
    return [dict(zip(keys, record)) for record in zip(*dict_of_lists.values())]
def ld2dl(list_of_dicts: list[dict], type_check: bool = True) -> dict[str, list]:
    """Turn a row-oriented list of records into a column-oriented mapping.

    The keys of the result are taken from the first dictionary in the list;
    every dictionary is then indexed with exactly those keys.

    Parameters
    ----------
    list_of_dicts : list
        The records to convert, one dictionary per record.
    type_check : bool, optional
        When True (the default), validate the input before converting.

    Returns
    -------
    dict
        Mapping from each key of the first record to the list of values
        collected across all records. An empty list yields ``{}``.

    Raises
    ------
    TypeError
        Only when ``type_check`` is True: if the input is not a list or if
        any element is not a dictionary.
    KeyError
        If a later record lacks a key that the first record has.
    """
    if type_check:
        # The container itself has to be a list
        if not isinstance(list_of_dicts, list):
            raise TypeError("The input must be a list.")

        # Every record has to be a dictionary
        if any(not isinstance(record, dict) for record in list_of_dicts):
            raise TypeError("All elements in the list must be dictionaries.")

    # Empty input returns an empty dictionary
    if len(list_of_dicts) == 0:
        return {}

    # Collect one column per key of the first record
    columns = {}
    for key in list_of_dicts[0]:
        columns[key] = [record[key] for record in list_of_dicts]
    return columns
def _recarray_field_type(field: str, annotation):
    """Map a model field annotation to a type usable in a numpy structured dtype."""
    # Function-scope import: ``types.UnionType`` (PEP 604 ``X | None``) only
    # exists on Python >= 3.10, so fall back to ``typing.Union`` when missing.
    import types

    union_origins = (Union, getattr(types, "UnionType", Union))

    # Unwrap Optional[X] FIRST so that the str/list handling below also
    # applies to Optional[str] and Optional[list[...]].
    if get_origin(annotation) in union_origins:
        union_args = get_args(annotation)
        members = [arg for arg in union_args if arg is not type(None)]
        if len(union_args) == 2 and len(members) == 1:
            # Pick the non-None member regardless of its position in the Union
            annotation = members[0]
        else:
            warnings.warn(
                f"Field {field} has a Union type that is not supported. Will use object."
            )
            return object

    # Strings are stored as Python objects: a bare ``str`` dtype would be a
    # zero-width unicode field and truncate every value.
    if annotation is str:
        return object

    # Lists are stored as numpy arrays (an object-typed field in the dtype)
    if get_origin(annotation) is list:
        return np.ndarray

    return annotation


def models2recarray(
    models: list[BaseModel],
    serialization_context: Union[dict, str] = None,
    override_types: dict = None,
    by_alias: bool = False,
) -> np.recarray:
    """Convert a list of pydantic models of one type to a numpy recarray.

    Parameters
    ----------
    models : list
        The list of model instances to convert. All of them must be
        instances of the type of the first element.
    serialization_context : dict or str, optional
        The context in which the datastructure should be serialized, by
        default None. If the context contains "matRad" and the models are of
        type PyRadPlanBaseModel, the models are serialized with ``to_matrad``
        and alias names are used.
    override_types : dict, optional
        A dictionary of types to override the automatically obtained types,
        by default None. A field whose override is ``None`` is left out of
        the resulting recarray.
    by_alias : bool, optional
        Whether to use the alias names for serialization, by default False.

    Returns
    -------
    np.recarray
        A recarray with one record per model and one field per model field
        (including computed fields).

    Raises
    ------
    TypeError
        If ``models`` is not a list, if its elements are not BaseModel
        instances of one common type, or if a field descriptor is neither a
        FieldInfo nor a ComputedFieldInfo while aliases are requested.
    ValueError
        If ``models`` is empty.
    """
    # Check if models is a list
    if not isinstance(models, list):
        raise TypeError("The input must be a list.")

    # Check if models is not empty
    if not models:
        raise ValueError("The input list must not be empty.")

    first_model_type = type(models[0])

    # Check if the type is a subclass of a BaseModel
    if not issubclass(first_model_type, BaseModel):
        raise TypeError("All elements in the list must be subclasses of BaseModel.")

    # Check if all models in the list are of the same type
    if not all(isinstance(model, first_model_type) for model in models):
        raise TypeError("All models in the list must be of the same type.")

    # The context defaults to None, which must not be probed with ``in``
    use_matrad = (
        serialization_context is not None
        and "matRad" in serialization_context
        and isinstance(models[0], PyRadPlanBaseModel)
    )

    if use_matrad:
        models_dump = [model.to_matrad(context=serialization_context) for model in models]
        by_alias = True
    else:
        models_dump = [
            model.model_dump(by_alias=by_alias, context=serialization_context) for model in models
        ]

    # Regular and computed fields together define the columns of the recarray
    model_fields = models[0].model_fields | models[0].model_computed_fields
    model_field_types = get_type_hints(first_model_type)
    for cfield in models[0].model_computed_fields:
        model_field_types[cfield] = model_fields[cfield].return_type

    # Manual type overrides take precedence over the annotated types
    if override_types:
        model_field_types.update(override_types)

    # Drop fields whose (overridden) type is None
    model_fields = {
        field: info for field, info in model_fields.items() if model_field_types[field] is not None
    }

    # Translate the annotations of the remaining fields to numpy-compatible types
    model_field_types = {
        field: _recarray_field_type(field, model_field_types[field]) for field in model_fields
    }

    # Determine the name under which each field appears in the dumped dictionaries
    aliases = {}
    for field, info in model_fields.items():
        if not by_alias:
            aliases[field] = field
            continue

        if isinstance(info, FieldInfo):
            alias = info.serialization_alias
        elif isinstance(info, ComputedFieldInfo):
            alias = info.alias
        else:
            # Sanity Check
            raise TypeError(f"Field {field} is not a FieldInfo or ComputedFieldInfo object.")

        # A field without an alias is dumped under its own name by
        # ``model_dump(by_alias=True)``; a None name would be invalid in a dtype.
        # NOTE(review): assumes ``to_matrad`` follows the same convention - confirm.
        aliases[field] = alias if alias is not None else field

    # Create a dtype for the structured array
    models_dtype = np.dtype([(aliases[field], model_field_types[field]) for field in model_fields])

    models_recarray = np.recarray((len(models),), dtype=models_dtype)

    for i, model_dict in enumerate(models_dump):
        for field in model_fields:
            fname = aliases[field]
            value = model_dict[fname]
            if model_field_types[field] == np.str_:
                value = np.str_(value)
            elif model_field_types[field] == np.ndarray:
                value = np.asarray(value)
            models_recarray[fname][i] = value

    return models_recarray
def swap_orientation_sparse_matrix(
    sparse_matrix: sp.csc_matrix, original_shape, axes
) -> sp.csc_matrix:
    """
    Swap two axes of the 3-D grid that indexes the rows of a sparse matrix.

    The stored indices of ``sparse_matrix`` are interpreted as linear
    (C-order) indices into a grid of shape ``original_shape``. They are
    remapped to the linear indices of the same grid positions after the two
    given axes have been exchanged. The input matrix is left unmodified.

    Parameters
    ----------
    sparse_matrix : sp.csc_matrix
        The sparse matrix to swap axes.
    original_shape : tuple
        The original 3-D shape of the grid.
    axes : tuple
        The two axes to swap; one of (0, 1), (0, 2), (1, 2) in either order.
        Any sequence (e.g. a list) is accepted.

    Returns
    -------
    sp.csc_matrix
        A new sparse matrix of the same class and shape with swapped axes.

    Raises
    ------
    ValueError
        If ``axes`` is not a valid pair of axes or ``original_shape`` is not
        three-dimensional.
    """
    # Axis order of the swapped grid for every supported pair of axes
    permutations = {
        (0, 1): (1, 0, 2),
        (0, 2): (2, 1, 0),
        (1, 2): (0, 2, 1),
    }

    # Normalize the pair so that lists and reversed pairs are accepted as well
    try:
        order = permutations.get(tuple(sorted(axes)))
    except TypeError:
        order = None
    if order is None:
        raise ValueError("Invalid axes for swapping")

    if len(original_shape) != 3:
        raise ValueError("original_shape must have exactly three dimensions")

    # Linear index -> grid coordinates -> linear index in the swapped grid
    coordinates = np.unravel_index(sparse_matrix.indices, original_shape)
    new_shape = tuple(original_shape[axis] for axis in order)
    new_indices = np.ravel_multi_index(tuple(coordinates[axis] for axis in order), new_shape)

    # Build the result from COPIES of data and indptr: the constructor does not
    # copy its inputs, and sort_indices() below reorders data in place, which
    # would otherwise corrupt the caller's matrix.
    reordered_sparse_matrix = sparse_matrix.__class__(
        (sparse_matrix.data.copy(), new_indices, sparse_matrix.indptr.copy()),
        shape=sparse_matrix.shape,
    )

    # The permutation may have disordered the indices within each column
    reordered_sparse_matrix.sort_indices()

    return reordered_sparse_matrix