Source code for bsb.topology.partition

"""
Module for the Partition configuration nodes and its dependencies.
"""

import abc
import functools
import json
import typing
from pathlib import Path

import numpy as np
from voxcell import RegionMap

from .. import config
from ..config import refs, types
from ..exceptions import (
    AllenApiError,
    CfgReferenceError,
    ConfigurationError,
    LayoutError,
    NodeNotFoundError,
)
from ..storage._chunks import Chunk
from ..storage._files import NrrdDependencyNode
from ..storage._util import _cached_file
from ..voxels import VoxelSet
from ._layout import Layout, RhomboidData

if typing.TYPE_CHECKING:  # pragma: nocover
    from ..core import Scaffold


class _backref_property(property):
    def __backref__(self, instance, value):
        instance._region = value


[docs] @config.dynamic( attr_name="type", required=False, default="layer", auto_classmap=True, ) class Partition(abc.ABC): """ Abstract class to describe spatial containers for network pieces. """ scaffold: "Scaffold" name: str = config.attr(key=True)
[docs] @_backref_property def region(self): return self._region
@property def data(self): # The data property is read-only to users, but `_data` is assigned # during the layout process return self._data
[docs] @abc.abstractmethod def volume(self, chunk=None): # pragma: nocover """ Calculate the volume of the partition in μm^3. :param chunk: If given, limit the volume of the partition inside of the chunk. :type chunk: bsb.storage._chunks.Chunk :returns: Volume of the partition (in the chunk) :rtype: float """ pass
[docs] @abc.abstractmethod def surface(self, chunk=None): # pragma: nocover """ Calculate the surface of the partition in μm^2. :param chunk: If given, limit the surface of the partition inside of the chunk. :type chunk: bsb.storage._chunks.Chunk :returns: Surface of the partition (in the chunk) :rtype: float """ pass
[docs] @abc.abstractmethod def to_chunks(self, chunk_size): # pragma: nocover """ Calculate all the chunks this partition occupies when cut into ``chunk_sized`` pieces. :param chunk_size: Size per chunk (in μm). The slicing always starts at [0, 0, 0]. :type chunk_size: numpy.ndarray :returns: Chunks occupied by this partition :rtype: list[bsb.storage._chunks.Chunk] """ pass
[docs] @abc.abstractmethod def chunk_to_voxels(self, chunk): # pragma: nocover """ Voxelize the partition's occupation in this chunk. Required to fill the partition with cells by the placement module. :param chunk: The chunk to calculate voxels for. :type chunk: bsb.storage._chunks.Chunk :returns: The set of voxels that together make up the shape of this partition in this chunk. :rtype: bsb.voxels.VoxelSet """ pass
[docs] def to_voxels(self): """ Voxelize the partition's occupation. :returns: VoxelSet of the Partition sources. :rtype: bsb.voxels.VoxelSet """ chunk_size = self.scaffold.network.chunk_size return VoxelSet.concatenate( *( self.chunk_to_voxels(Chunk(chunk, chunk_size)) for chunk in self.to_chunks(chunk_size) ) )
[docs] @abc.abstractmethod def rotate(self, rotation): # pragma: nocover """ Rotate the partition by the given rotation object. :param rotation: Rotation object. :type rotation: scipy.spatial.transform.Rotation :raises: :class:`.exceptions.LayoutError` if the rotation needs to be rejected. """ pass
[docs] @abc.abstractmethod def translate(self, offset): # pragma: nocover """ Translate the partition by the given offset. :param offset: Offset, XYZ. :type offset: numpy.ndarray :raises: :class:`.exceptions.LayoutError` if the translation needs to be rejected. """ pass
[docs] @abc.abstractmethod def scale(self, factors): # pragma: nocover """ Scale up/down the partition according to the given factors. :param factors: Scaling factors, XYZ. :type factors: numpy.ndarray :raises: :class:`.exceptions.LayoutError` if the scaling needs to be rejected. """ pass
[docs] @abc.abstractmethod def get_layout(self, hint): # pragma: nocover """ Given a Layout as hint to begin from, create a Layout object that describes how this partition would like to be laid out. :param hint: The layout space that this partition should place itself in. :type hint: bsb.topology._layout.Layout :returns: The layout describing the space this partition takes up. :rtype: bsb.topology._layout.Layout """ pass
[docs] @config.node class Rhomboid(Partition, classmap_entry="rhomboid"): """ Rectangular cuboid partition defined according to its origin and dimensions. """ dimensions: list[float] = config.attr(type=types.list(type=float, size=3)) """ Sizes of the partition for each axis. """ can_scale: bool = config.attr(type=bool, default=True) """ Boolean flag to authorize rescaling of the partition dimensions. """ origin: list[float] = config.attr(type=types.list(type=float, size=3)) """ Coordinate of the origin of the partition. """ can_move: bool = config.attr(type=bool, default=True) """ Boolean flag to authorize the translation of the partition. """ orientation: list[float] = config.attr(type=types.list(type=float, size=3)) """ Vector describing the 3D orientation of the partition. """ can_rotate: bool = config.attr(type=bool, default=True) """ Boolean flag to authorize the rotation of the partition. """
[docs] def volume(self, chunk=None): if chunk is not None: # Create an intersection between the partition and the chunk low = np.maximum(self.ldc, chunk.ldc) high = np.minimum(self.mdc, chunk.mdc) return np.prod(np.maximum(high - low, 0)) else: return np.prod(self.data.dimensions)
@property def mdc(self): """ Return the highest coordinate of the partition. """ return self._data.mdc @property def ldc(self): """ Return the lowest coordinate of the partition. """ return self._data.ldc
[docs] def surface(self, chunk=None): if chunk is not None: # Gets the xz "square" from a volume def sq(v): return np.array(v)[[0, 1]] ldc = sq(self.ldc) mdc = sq(self.mdc) cl = sq(chunk.ldc) cm = sq(chunk.mdc) # Create an intersection between the partition and the chunk low = np.maximum(ldc, cl) high = np.minimum(mdc, cm) return np.prod(np.maximum(high - low, 0)) else: return self.data.width * self.data.depth
[docs] def to_chunks(self, chunk_size): # Get the low and high range of the data in chunk coordinates low_r = np.floor(self.ldc / chunk_size).astype(int) high_r = np.ceil(self.mdc / chunk_size).astype(int) # Create a grid that includes all the chunk coordinates within those data coords = np.mgrid[ tuple(range(low, high) for low, high in zip(low_r, high_r, strict=False)) ] # Order the coordinate grid into a list of chunk coordinates. return np.column_stack(tuple(dim.ravel() for dim in coords))
[docs] def chunk_to_voxels(self, chunk): """ Return an approximation of this partition intersected with a chunk as a list of voxels. Default implementation creates a parallelepiped intersection between the LDC, MDC and chunk data. """ low = np.maximum(self.ldc, chunk.ldc) high = np.minimum(self.mdc, chunk.mdc) # Return 0 voxels when the coords are OOB for this partition if np.any(low > high): return VoxelSet.empty() else: return VoxelSet.one(low, high)
[docs] def rotate(self, rot): raise LayoutError("Rotation not implemented yet.")
[docs] def translate(self, translation): self.data.ldc += translation self.data.mdc += translation
[docs] def scale(self, factors): self.data.mdc = self.data.ldc + (self.data.mdc - self.data.ldc) * factors
[docs] def get_dependencies(self): """ Return other partitions or regions that need to be laid out before this. """ return []
[docs] def get_layout(self, hint): dim = ( (hint.data.mdc - hint.data.ldc) if self.dimensions is None else np.array(self.dimensions) ) orig = hint.data.ldc.copy() if self.origin is None else np.array(self.origin) return Layout(RhomboidData(orig, dim + orig), owner=self)
[docs] @config.node class Layer(Rhomboid, classmap_entry="layer"): """ Partition that occupies the full space of its containing region except on a defined axis, where it is limited. This creates a stratum within the region along the chosen axis. """ dimensions = config.unset() thickness: float = config.attr(type=float, required=True) """ Thickness of the layer along its axis. """ axis: typing.Literal["x"] | typing.Literal["y"] | typing.Literal["z"] = config.attr( type=types.in_(["x", "y", "z"]), default="z" ) """ Axis along which the layer will be limited. """
[docs] def get_layout(self, hint): axis = ["x", "y", "z"].index(self.axis) dim = hint.data.mdc - hint.data.ldc dim[axis] = self.thickness orig = hint.data.ldc.copy() if self.origin is None else np.array(self.origin) return Layout(RhomboidData(orig, dim + orig), owner=self)
# TODO: Layer scaling
[docs] @config.node class Voxels(Partition, abc.ABC, classmap_entry=None): """ Partition based on a set of voxels. """ @functools.cached_property def voxelset(self): return self.to_voxels()
[docs] def to_chunks(self, chunk_size): return self.voxelset.snap_to_grid(chunk_size, unique=True)
def _lookup(self, chunk): if not hasattr(self, "_map"): vs = self.voxelset.snap_to_grid(chunk.dimensions) map = {} for i, idx in enumerate(vs): map.setdefault(idx.view(Chunk), []).append(i) self._map = {k: self.voxelset[v] for k, v in map.items()} return self._map
[docs] def chunk_to_voxels(self, chunk): return self._lookup(chunk).get(chunk, VoxelSet.empty())
[docs] def get_layout(self, hint): return Layout(RhomboidData(*self.voxelset.bounds), owner=self)
[docs] def rotate(self, rotation): raise LayoutError("Axis-aligned voxelsets can't be rotated.")
[docs] def scale(self, factor): raise LayoutError("Voxelset scaling not supported.")
[docs] def translate(self, offset): raise LayoutError("Voxelset translation not supported.")
[docs] def surface(self, chunk=None): raise LayoutError("Voxelset surface calculations not supported.")
[docs] def volume(self, chunk=None): vs = self.chunk_to_voxels(chunk) if chunk is not None else self.voxelset return vs.volume
[docs] @config.node class NrrdVoxels(Voxels, classmap_entry="nrrd"): """ Voxel partition whose voxelset is loaded from an NRRD file. By default, it includes all the nonzero voxels in the file, but other masking conditions can be specified. Additionally, data can be associated to each voxel by inclusion of (multiple) source NRRD files. """ mask_source: NrrdDependencyNode = config.ref(refs.vox_dset_ref, reference_only=False) """ Path to the NRRD file containing the volumetric annotation data of the Partition. """ mask_value: int = config.attr(type=int) """ Integer value to filter in mask_source (if it is set, otherwise sources) to create a mask of the voxel set(s) used as input. """ sources: list[NrrdDependencyNode] = config.reflist( refs.vox_dset_ref, reference_only=False ) """ List of paths to NRRD files containing volumetric data to associate to the Partition. """ sparse: bool = config.attr(type=bool, default=True) """ Boolean flag to expect a sparse or dense mask. If the mask selects most voxels, use ``dense``, otherwise use ``sparse``. """ strict: bool = config.attr(type=bool, default=True) """ Boolean flag to check the sources and the mask sizes. When the flag is True, sources and mask should have exactly the same sizes; otherwise, sources sizes should be greater than mask sizes. """ @config.property(type=bool) def mask_only(self): """Flag to indicate no voxel data needs to be stored""" return len(self.sources) == 0 @property def voxel_size(self): """Size of each voxel.""" if getattr(self, "_mask_src", None) is None: self._validate() return self._mask_src[0].voxel_size
[docs] def get_mask(self): """ Get the mask to apply on the sources' data of the partition. :returns: A tuple of arrays, one for each dimension of the mask, containing the indices of the non-zero elements in that dimension. """ mask_shape = self._validate() mask = np.zeros(mask_shape, dtype=bool) if self.sparse: # Use integer (sparse) indexing mask = [np.empty((0,), dtype=int) for i in range(3)] for mask_src in self._mask_src: mask_data = mask_src.get_data().raw new_mask = np.nonzero(self._mask_cond(mask_data)) for i, mask_vector in enumerate(new_mask): mask[i] = np.concatenate((mask[i], mask_vector)) inter = np.unique(mask, axis=1) mask = tuple(inter[i, :] for i in range(3)) else: # Use boolean (dense) indexing for mask_src in self._mask_src: mask_data = mask_src.get_data().raw mask = mask | self._mask_cond(mask_data) mask = np.nonzero(mask) return mask
def _get_source_ref_key(self, ref): # Check `sources` nrrd ref to retrieve the name they will be referred to # during placement # if ref is a string ref to files if isinstance(ref, str): return ref # if ref is a dict to cast, we take the stem of the file elif isinstance(ref["file"], str): return Path(ref["file"]).stem # if ref is already cast, we take the stem of the file elif isinstance(ref, NrrdDependencyNode): return Path(ref.file.uri).stem else: # pragma: nocover raise CfgReferenceError( f"Unexpected reference type for source: {ref} in partition: {self.name}" )
[docs] def to_voxels(self): mask = self.get_mask() voxel_data = None if not self.mask_only: voxel_data = np.empty((len(mask[0]), len(self._src))) for i, source in enumerate(self._src): voxel_data[:, i] = source.get_data().raw[mask] data_keys = [self._get_source_ref_key(ref) for ref in self.sources._reflist] return VoxelSet( np.transpose(mask), self.voxel_size, data=voxel_data, data_keys=data_keys, )
def _validate(self): self._validate_sources() shape = self._validate_source_compat() self._validate_mask_condition() return shape def _validate_sources(self): self._src = self.sources.copy() if self.mask_source is not None: self._mask_src = [self.mask_source] else: self._mask_src = self._src.copy() def _validate_source_compat(self): mask_headers = {s: s.get_header() for s in self._mask_src} source_headers = {s: s.get_header() for s in self._src} all_headers = mask_headers.copy() all_headers.update(source_headers) dim_probs = [(s, d) for s, h in all_headers.items() if (d := h["dimension"]) != 3] if dim_probs: summ = ", ".join(f"'{s}' has {d}" for s, d in dim_probs) raise ConfigurationError(f"NRRD voxels must contain 3D arrays; {summ}") mask_sizes = {s: [*h["sizes"]] for s, h in mask_headers.items()} source_sizes = {s: [*h["sizes"]] for s, h in source_headers.items()} all_sizes = mask_sizes.copy() all_sizes.update(source_sizes) mask_shape = np.maximum.reduce([*mask_sizes.values()]) if self.mask_only: src_shape = mask_shape else: src_shape = np.minimum.reduce([*source_sizes.values()]) first = _repeat_first() # Check for any size mismatch resolutions = [s.voxel_size for s in all_headers] if any(size != first(size) for size in resolutions): raise ConfigurationError( f"NRRD resolution mismatch in `{self.get_node_name()}`: {resolutions}" ) first = _repeat_first() if self.strict and any(size != first(size) for size in all_sizes.values()): raise ConfigurationError( f"NRRD file size mismatch in `{self.get_node_name()}`: {all_sizes}" ) elif np.any(mask_shape > src_shape): raise ConfigurationError( "NRRD mask too big; it may select OOB source voxels:" + f" {mask_shape} > {src_shape}" ) return mask_shape def _validate_mask_condition(self): if self.mask_value: self._mask_cond = lambda data: data == self.mask_value else: self._mask_cond = lambda data: data != 0
[docs] @config.node class AllenStructure(NrrdVoxels, classmap_entry="allen"): """ Partition based on the Allen Institute for Brain Science mouse brain region ontology, later referred as Allen Mouse Brain Region Hierarchy (AMBRH) """ struct_id: int = config.attr( type=types.int(min=1), # ids are positive and 0 is the outside of the brain required=types.mut_excl("struct_id", "struct_name", required=False), ) """ Id of the region to filter within the annotation volume according to the AMBRH. If struct_id is set, then struct_name should not be set. """ struct_name: str = config.attr( type=types.str(strip=True, lower=True), required=types.mut_excl("struct_id", "struct_name", required=False), key=True, ) """ Name or acronym of the region to filter within the annotation volume according to the AMBRH. If struct_name is set, then struct_id should not be set. """ @classmethod @functools.cache def _dl_mask(cls): node = NrrdDependencyNode() node._file = _cached_file( "https://download.alleninstitute.org/informatics-archive/current-release/mouse_ccf/annotation/ccf_2017/annotation_25.nrrd", ) return node @classmethod @functools.cache def _dl_structure_ontology(cls): content = _cached_file( "http://api.brain-map.org/api/v2/structure_graph_download/1.json" ).get_content()[0] try: return json.loads(content)["msg"] except json.decoder.JSONDecodeError as json_error: # pragma: nocover raise AllenApiError( "Could not parse the Allen mouse brain region hierarchy, " "most likely because the Allen API website is down. \n" "Here is the content retrieved: \n" f"{content}" ) from json_error @functools.cached_property def region_map(self): """ Return RegionMap instance to manipulate the Allen brain region hierarchy. :rtype: voxcell.region_map.RegionMap """ return RegionMap.from_dict(AllenStructure._dl_structure_ontology()[0]) @functools.cached_property def annotations(self): """ Return VoxelData instance of the Allen annotations. :rtype: voxcell.voxel_data.VoxelData """ return self.mask_source.load_object()
[docs] @classmethod def get_structure_mask_condition(cls, find): """ Return a lambda that when applied to the mask data, returns a mask that delineates the Allen structure. :param find: Acronym, Name or ID of the Allen structure. :type find: str | int :returns: Masking lambda :rtype: Callable[numpy.ndarray] """ mask = cls.get_structure_idset(find) if len(mask) > 1: return lambda data: np.isin(data, mask) else: mask0 = mask[0] return lambda data: data == mask0
[docs] @classmethod def get_structure_mask(cls, find): """ Returns the mask data delineated by the Allen mouse brain region. :param find: Acronym, Name or ID of the Allen structure. :type find: str | int :returns: A boolean of the mask filtered based on the Allen structure. :rtype: Callable[numpy.ndarray] """ mask_data = cls._dl_mask().load_object().raw return cls.get_structure_mask_condition(find)(mask_data)
[docs] @classmethod def get_structure_idset(cls, find): """ Return the set of IDs that make up the requested Allen structure. :param find: Acronym or ID of the Allen structure. :type find: str | int :returns: Set of IDs :rtype: numpy.ndarray :raises: NodeNotFoundError """ region_map = cls().region_map if isinstance(find, str): id_roi = list( region_map.find( find, attr="name", ignore_case=True, with_descendants=True ) ) if len(id_roi) == 0: id_roi = list( region_map.find( find, attr="acronym", ignore_case=True, with_descendants=True ) ) elif isinstance(find, int | float): id_roi = list(region_map.find(int(find), attr="id", with_descendants=True)) else: raise TypeError(f"Argument must be a string or a number. {type(find)} given.") if len(id_roi) == 0: raise NodeNotFoundError("Could not find a node that satisfies constraints.") return np.array(id_roi)
def _validate(self): # If neither sources nor mask_source were provided, # use allen ccfv3 annotation volume if self.mask_source is None: self.mask_source = self._dl_mask() return super()._validate() def _validate_mask_condition(self): # We override the `NrrdVoxels`' `_validate_mask_condition` and use this # function as a hook to find and set the mask condition to select every voxel that # has an id that is part of the structure. id = self.struct_id if self.struct_id is not None else self.struct_name self._mask_cond = self.get_structure_mask_condition(id)
def _repeat_first(): _set = False first = None def repeater(val): nonlocal _set, first if not _set: first, _set = val, True return first return repeater __all__ = ["AllenStructure", "Layer", "NrrdVoxels", "Partition", "Rhomboid", "Voxels"]