"""
Module for the Partition configuration nodes and its dependencies.
"""
import abc
import functools
import json
import typing
from pathlib import Path
import numpy as np
from voxcell import RegionMap
from .. import config
from ..config import refs, types
from ..exceptions import (
AllenApiError,
CfgReferenceError,
ConfigurationError,
LayoutError,
NodeNotFoundError,
)
from ..storage._chunks import Chunk
from ..storage._files import NrrdDependencyNode
from ..storage._util import _cached_file
from ..voxels import VoxelSet
from ._layout import Layout, RhomboidData
if typing.TYPE_CHECKING: # pragma: nocover
from ..core import Scaffold
class _backref_property(property):
def __backref__(self, instance, value):
instance._region = value
[docs]
@config.dynamic(
attr_name="type",
required=False,
default="layer",
auto_classmap=True,
)
class Partition(abc.ABC):
"""
Abstract class to describe spatial containers for network pieces.
"""
scaffold: "Scaffold"
name: str = config.attr(key=True)
[docs]
@_backref_property
def region(self):
return self._region
@property
def data(self):
# The data property is read-only to users, but `_data` is assigned
# during the layout process
return self._data
[docs]
@abc.abstractmethod
def volume(self, chunk=None): # pragma: nocover
"""
Calculate the volume of the partition in μm^3.
:param chunk: If given, limit the volume of the partition inside of the chunk.
:type chunk: bsb.storage._chunks.Chunk
:returns: Volume of the partition (in the chunk)
:rtype: float
"""
pass
[docs]
@abc.abstractmethod
def surface(self, chunk=None): # pragma: nocover
"""
Calculate the surface of the partition in μm^2.
:param chunk: If given, limit the surface of the partition inside of the chunk.
:type chunk: bsb.storage._chunks.Chunk
:returns: Surface of the partition (in the chunk)
:rtype: float
"""
pass
[docs]
@abc.abstractmethod
def to_chunks(self, chunk_size): # pragma: nocover
"""
Calculate all the chunks this partition occupies when cut into ``chunk_sized``
pieces.
:param chunk_size: Size per chunk (in μm). The slicing always starts at [0, 0, 0].
:type chunk_size: numpy.ndarray
:returns: Chunks occupied by this partition
:rtype: list[bsb.storage._chunks.Chunk]
"""
pass
[docs]
@abc.abstractmethod
def chunk_to_voxels(self, chunk): # pragma: nocover
"""
Voxelize the partition's occupation in this chunk. Required to fill the partition
with cells by the placement module.
:param chunk: The chunk to calculate voxels for.
:type chunk: bsb.storage._chunks.Chunk
:returns: The set of voxels that together make up the shape of this partition in
this chunk.
:rtype: bsb.voxels.VoxelSet
"""
pass
[docs]
def to_voxels(self):
"""
Voxelize the partition's occupation.
:returns: VoxelSet of the Partition sources.
:rtype: bsb.voxels.VoxelSet
"""
chunk_size = self.scaffold.network.chunk_size
return VoxelSet.concatenate(
*(
self.chunk_to_voxels(Chunk(chunk, chunk_size))
for chunk in self.to_chunks(chunk_size)
)
)
[docs]
@abc.abstractmethod
def rotate(self, rotation): # pragma: nocover
"""
Rotate the partition by the given rotation object.
:param rotation: Rotation object.
:type rotation: scipy.spatial.transform.Rotation
:raises: :class:`.exceptions.LayoutError` if the rotation needs to be rejected.
"""
pass
[docs]
@abc.abstractmethod
def translate(self, offset): # pragma: nocover
"""
Translate the partition by the given offset.
:param offset: Offset, XYZ.
:type offset: numpy.ndarray
:raises: :class:`.exceptions.LayoutError` if the translation needs to be rejected.
"""
pass
[docs]
@abc.abstractmethod
def scale(self, factors): # pragma: nocover
"""
Scale up/down the partition according to the given factors.
:param factors: Scaling factors, XYZ.
:type factors: numpy.ndarray
:raises: :class:`.exceptions.LayoutError` if the scaling needs to be rejected.
"""
pass
[docs]
@abc.abstractmethod
def get_layout(self, hint): # pragma: nocover
"""
Given a Layout as hint to begin from, create a Layout object that describes how
this partition would like to be laid out.
:param hint: The layout space that this partition should place itself in.
:type hint: bsb.topology._layout.Layout
:returns: The layout describing the space this partition takes up.
:rtype: bsb.topology._layout.Layout
"""
pass
[docs]
@config.node
class Rhomboid(Partition, classmap_entry="rhomboid"):
"""
Rectangular cuboid partition defined according to its origin and dimensions.
"""
dimensions: list[float] = config.attr(type=types.list(type=float, size=3))
"""
Sizes of the partition for each axis.
"""
can_scale: bool = config.attr(type=bool, default=True)
"""
Boolean flag to authorize rescaling of the partition dimensions.
"""
origin: list[float] = config.attr(type=types.list(type=float, size=3))
"""
Coordinate of the origin of the partition.
"""
can_move: bool = config.attr(type=bool, default=True)
"""
Boolean flag to authorize the translation of the partition.
"""
orientation: list[float] = config.attr(type=types.list(type=float, size=3))
"""
Vector describing the 3D orientation of the partition.
"""
can_rotate: bool = config.attr(type=bool, default=True)
"""
Boolean flag to authorize the rotation of the partition.
"""
[docs]
def volume(self, chunk=None):
if chunk is not None:
# Create an intersection between the partition and the chunk
low = np.maximum(self.ldc, chunk.ldc)
high = np.minimum(self.mdc, chunk.mdc)
return np.prod(np.maximum(high - low, 0))
else:
return np.prod(self.data.dimensions)
@property
def mdc(self):
"""
Return the highest coordinate of the partition.
"""
return self._data.mdc
@property
def ldc(self):
"""
Return the lowest coordinate of the partition.
"""
return self._data.ldc
[docs]
def surface(self, chunk=None):
if chunk is not None:
# Gets the xz "square" from a volume
def sq(v):
return np.array(v)[[0, 1]]
ldc = sq(self.ldc)
mdc = sq(self.mdc)
cl = sq(chunk.ldc)
cm = sq(chunk.mdc)
# Create an intersection between the partition and the chunk
low = np.maximum(ldc, cl)
high = np.minimum(mdc, cm)
return np.prod(np.maximum(high - low, 0))
else:
return self.data.width * self.data.depth
[docs]
def to_chunks(self, chunk_size):
# Get the low and high range of the data in chunk coordinates
low_r = np.floor(self.ldc / chunk_size).astype(int)
high_r = np.ceil(self.mdc / chunk_size).astype(int)
# Create a grid that includes all the chunk coordinates within those data
coords = np.mgrid[
tuple(range(low, high) for low, high in zip(low_r, high_r, strict=False))
]
# Order the coordinate grid into a list of chunk coordinates.
return np.column_stack(tuple(dim.ravel() for dim in coords))
[docs]
def chunk_to_voxels(self, chunk):
"""
Return an approximation of this partition intersected with a chunk as a list of
voxels.
Default implementation creates a parallelepiped intersection between the LDC, MDC
and chunk data.
"""
low = np.maximum(self.ldc, chunk.ldc)
high = np.minimum(self.mdc, chunk.mdc)
# Return 0 voxels when the coords are OOB for this partition
if np.any(low > high):
return VoxelSet.empty()
else:
return VoxelSet.one(low, high)
[docs]
def rotate(self, rot):
raise LayoutError("Rotation not implemented yet.")
[docs]
def translate(self, translation):
self.data.ldc += translation
self.data.mdc += translation
[docs]
def scale(self, factors):
self.data.mdc = self.data.ldc + (self.data.mdc - self.data.ldc) * factors
[docs]
def get_dependencies(self):
"""
Return other partitions or regions that need to be laid out before this.
"""
return []
[docs]
def get_layout(self, hint):
dim = (
(hint.data.mdc - hint.data.ldc)
if self.dimensions is None
else np.array(self.dimensions)
)
orig = hint.data.ldc.copy() if self.origin is None else np.array(self.origin)
return Layout(RhomboidData(orig, dim + orig), owner=self)
[docs]
@config.node
class Layer(Rhomboid, classmap_entry="layer"):
"""
Partition that occupies the full space of its containing region except on a defined
axis, where it is limited.
This creates a stratum within the region along the chosen axis.
"""
dimensions = config.unset()
thickness: float = config.attr(type=float, required=True)
"""
Thickness of the layer along its axis.
"""
axis: typing.Literal["x"] | typing.Literal["y"] | typing.Literal["z"] = config.attr(
type=types.in_(["x", "y", "z"]), default="z"
)
"""
Axis along which the layer will be limited.
"""
[docs]
def get_layout(self, hint):
axis = ["x", "y", "z"].index(self.axis)
dim = hint.data.mdc - hint.data.ldc
dim[axis] = self.thickness
orig = hint.data.ldc.copy() if self.origin is None else np.array(self.origin)
return Layout(RhomboidData(orig, dim + orig), owner=self)
# TODO: Layer scaling
[docs]
@config.node
class Voxels(Partition, abc.ABC, classmap_entry=None):
"""
Partition based on a set of voxels.
"""
@functools.cached_property
def voxelset(self):
return self.to_voxels()
[docs]
def to_chunks(self, chunk_size):
return self.voxelset.snap_to_grid(chunk_size, unique=True)
def _lookup(self, chunk):
if not hasattr(self, "_map"):
vs = self.voxelset.snap_to_grid(chunk.dimensions)
map = {}
for i, idx in enumerate(vs):
map.setdefault(idx.view(Chunk), []).append(i)
self._map = {k: self.voxelset[v] for k, v in map.items()}
return self._map
[docs]
def chunk_to_voxels(self, chunk):
return self._lookup(chunk).get(chunk, VoxelSet.empty())
[docs]
def get_layout(self, hint):
return Layout(RhomboidData(*self.voxelset.bounds), owner=self)
[docs]
def rotate(self, rotation):
raise LayoutError("Axis-aligned voxelsets can't be rotated.")
[docs]
def scale(self, factor):
raise LayoutError("Voxelset scaling not supported.")
[docs]
def translate(self, offset):
raise LayoutError("Voxelset translation not supported.")
[docs]
def surface(self, chunk=None):
raise LayoutError("Voxelset surface calculations not supported.")
[docs]
def volume(self, chunk=None):
vs = self.chunk_to_voxels(chunk) if chunk is not None else self.voxelset
return vs.volume
[docs]
@config.node
class NrrdVoxels(Voxels, classmap_entry="nrrd"):
"""
Voxel partition whose voxelset is loaded from an NRRD file.
By default, it includes all the nonzero voxels in the file, but other masking
conditions can be specified. Additionally, data can be associated to each voxel by
inclusion of (multiple) source NRRD files.
"""
mask_source: NrrdDependencyNode = config.ref(refs.vox_dset_ref, reference_only=False)
"""
Path to the NRRD file containing the volumetric annotation data of the Partition.
"""
mask_value: int = config.attr(type=int)
"""
Integer value to filter in mask_source (if it is set, otherwise sources) to
create a mask of the voxel set(s) used as input.
"""
sources: list[NrrdDependencyNode] = config.reflist(
refs.vox_dset_ref, reference_only=False
)
"""
List of paths to NRRD files containing volumetric data to associate to the Partition.
"""
sparse: bool = config.attr(type=bool, default=True)
"""
Boolean flag to expect a sparse or dense mask.
If the mask selects most
voxels, use ``dense``, otherwise use ``sparse``.
"""
strict: bool = config.attr(type=bool, default=True)
"""
Boolean flag to check the sources and the mask sizes.
When the flag is True, sources and mask should have exactly the same sizes; otherwise,
sources sizes should be greater than mask sizes.
"""
@config.property(type=bool)
def mask_only(self):
"""Flag to indicate no voxel data needs to be stored"""
return len(self.sources) == 0
@property
def voxel_size(self):
"""Size of each voxel."""
if getattr(self, "_mask_src", None) is None:
self._validate()
return self._mask_src[0].voxel_size
[docs]
def get_mask(self):
"""
Get the mask to apply on the sources' data of the partition.
:returns: A tuple of arrays, one for each dimension of the mask, containing the
indices of the non-zero elements in that dimension.
"""
mask_shape = self._validate()
mask = np.zeros(mask_shape, dtype=bool)
if self.sparse:
# Use integer (sparse) indexing
mask = [np.empty((0,), dtype=int) for i in range(3)]
for mask_src in self._mask_src:
mask_data = mask_src.get_data().raw
new_mask = np.nonzero(self._mask_cond(mask_data))
for i, mask_vector in enumerate(new_mask):
mask[i] = np.concatenate((mask[i], mask_vector))
inter = np.unique(mask, axis=1)
mask = tuple(inter[i, :] for i in range(3))
else:
# Use boolean (dense) indexing
for mask_src in self._mask_src:
mask_data = mask_src.get_data().raw
mask = mask | self._mask_cond(mask_data)
mask = np.nonzero(mask)
return mask
def _get_source_ref_key(self, ref):
# Check `sources` nrrd ref to retrieve the name they will be referred to
# during placement
# if ref is a string ref to files
if isinstance(ref, str):
return ref
# if ref is a dict to cast, we take the stem of the file
elif isinstance(ref["file"], str):
return Path(ref["file"]).stem
# if ref is already cast, we take the stem of the file
elif isinstance(ref, NrrdDependencyNode):
return Path(ref.file.uri).stem
else: # pragma: nocover
raise CfgReferenceError(
f"Unexpected reference type for source: {ref} in partition: {self.name}"
)
[docs]
def to_voxels(self):
mask = self.get_mask()
voxel_data = None
if not self.mask_only:
voxel_data = np.empty((len(mask[0]), len(self._src)))
for i, source in enumerate(self._src):
voxel_data[:, i] = source.get_data().raw[mask]
data_keys = [self._get_source_ref_key(ref) for ref in self.sources._reflist]
return VoxelSet(
np.transpose(mask),
self.voxel_size,
data=voxel_data,
data_keys=data_keys,
)
def _validate(self):
self._validate_sources()
shape = self._validate_source_compat()
self._validate_mask_condition()
return shape
def _validate_sources(self):
self._src = self.sources.copy()
if self.mask_source is not None:
self._mask_src = [self.mask_source]
else:
self._mask_src = self._src.copy()
def _validate_source_compat(self):
mask_headers = {s: s.get_header() for s in self._mask_src}
source_headers = {s: s.get_header() for s in self._src}
all_headers = mask_headers.copy()
all_headers.update(source_headers)
dim_probs = [(s, d) for s, h in all_headers.items() if (d := h["dimension"]) != 3]
if dim_probs:
summ = ", ".join(f"'{s}' has {d}" for s, d in dim_probs)
raise ConfigurationError(f"NRRD voxels must contain 3D arrays; {summ}")
mask_sizes = {s: [*h["sizes"]] for s, h in mask_headers.items()}
source_sizes = {s: [*h["sizes"]] for s, h in source_headers.items()}
all_sizes = mask_sizes.copy()
all_sizes.update(source_sizes)
mask_shape = np.maximum.reduce([*mask_sizes.values()])
if self.mask_only:
src_shape = mask_shape
else:
src_shape = np.minimum.reduce([*source_sizes.values()])
first = _repeat_first()
# Check for any size mismatch
resolutions = [s.voxel_size for s in all_headers]
if any(size != first(size) for size in resolutions):
raise ConfigurationError(
f"NRRD resolution mismatch in `{self.get_node_name()}`: {resolutions}"
)
first = _repeat_first()
if self.strict and any(size != first(size) for size in all_sizes.values()):
raise ConfigurationError(
f"NRRD file size mismatch in `{self.get_node_name()}`: {all_sizes}"
)
elif np.any(mask_shape > src_shape):
raise ConfigurationError(
"NRRD mask too big; it may select OOB source voxels:"
+ f" {mask_shape} > {src_shape}"
)
return mask_shape
def _validate_mask_condition(self):
if self.mask_value:
self._mask_cond = lambda data: data == self.mask_value
else:
self._mask_cond = lambda data: data != 0
[docs]
@config.node
class AllenStructure(NrrdVoxels, classmap_entry="allen"):
"""
Partition based on the Allen Institute for Brain Science mouse brain region ontology,
later referred as Allen Mouse Brain Region Hierarchy (AMBRH)
"""
struct_id: int = config.attr(
type=types.int(min=1), # ids are positive and 0 is the outside of the brain
required=types.mut_excl("struct_id", "struct_name", required=False),
)
"""
Id of the region to filter within the annotation volume according to the AMBRH.
If struct_id is set, then struct_name should not be set.
"""
struct_name: str = config.attr(
type=types.str(strip=True, lower=True),
required=types.mut_excl("struct_id", "struct_name", required=False),
key=True,
)
"""
Name or acronym of the region to filter within the annotation volume according to the
AMBRH.
If struct_name is set, then struct_id should not be set.
"""
@classmethod
@functools.cache
def _dl_mask(cls):
node = NrrdDependencyNode()
node._file = _cached_file(
"https://download.alleninstitute.org/informatics-archive/current-release/mouse_ccf/annotation/ccf_2017/annotation_25.nrrd",
)
return node
@classmethod
@functools.cache
def _dl_structure_ontology(cls):
content = _cached_file(
"http://api.brain-map.org/api/v2/structure_graph_download/1.json"
).get_content()[0]
try:
return json.loads(content)["msg"]
except json.decoder.JSONDecodeError as json_error: # pragma: nocover
raise AllenApiError(
"Could not parse the Allen mouse brain region hierarchy, "
"most likely because the Allen API website is down. \n"
"Here is the content retrieved: \n"
f"{content}"
) from json_error
@functools.cached_property
def region_map(self):
"""
Return RegionMap instance to manipulate the Allen brain region hierarchy.
:rtype: voxcell.region_map.RegionMap
"""
return RegionMap.from_dict(AllenStructure._dl_structure_ontology()[0])
@functools.cached_property
def annotations(self):
"""
Return VoxelData instance of the Allen annotations.
:rtype: voxcell.voxel_data.VoxelData
"""
return self.mask_source.load_object()
[docs]
@classmethod
def get_structure_mask_condition(cls, find):
"""
Return a lambda that when applied to the mask data, returns a mask that delineates
the Allen structure.
:param find: Acronym, Name or ID of the Allen structure.
:type find: str | int
:returns: Masking lambda
:rtype: Callable[numpy.ndarray]
"""
mask = cls.get_structure_idset(find)
if len(mask) > 1:
return lambda data: np.isin(data, mask)
else:
mask0 = mask[0]
return lambda data: data == mask0
[docs]
@classmethod
def get_structure_mask(cls, find):
"""
Returns the mask data delineated by the Allen mouse brain
region.
:param find: Acronym, Name or ID of the Allen structure.
:type find: str | int
:returns: A boolean of the mask filtered based on the Allen structure.
:rtype: Callable[numpy.ndarray]
"""
mask_data = cls._dl_mask().load_object().raw
return cls.get_structure_mask_condition(find)(mask_data)
[docs]
@classmethod
def get_structure_idset(cls, find):
"""
Return the set of IDs that make up the requested Allen structure.
:param find: Acronym or ID of the Allen structure.
:type find: str | int
:returns: Set of IDs
:rtype: numpy.ndarray
:raises: NodeNotFoundError
"""
region_map = cls().region_map
if isinstance(find, str):
id_roi = list(
region_map.find(
find, attr="name", ignore_case=True, with_descendants=True
)
)
if len(id_roi) == 0:
id_roi = list(
region_map.find(
find, attr="acronym", ignore_case=True, with_descendants=True
)
)
elif isinstance(find, int | float):
id_roi = list(region_map.find(int(find), attr="id", with_descendants=True))
else:
raise TypeError(f"Argument must be a string or a number. {type(find)} given.")
if len(id_roi) == 0:
raise NodeNotFoundError("Could not find a node that satisfies constraints.")
return np.array(id_roi)
def _validate(self):
# If neither sources nor mask_source were provided,
# use allen ccfv3 annotation volume
if self.mask_source is None:
self.mask_source = self._dl_mask()
return super()._validate()
def _validate_mask_condition(self):
# We override the `NrrdVoxels`' `_validate_mask_condition` and use this
# function as a hook to find and set the mask condition to select every voxel that
# has an id that is part of the structure.
id = self.struct_id if self.struct_id is not None else self.struct_name
self._mask_cond = self.get_structure_mask_condition(id)
def _repeat_first():
_set = False
first = None
def repeater(val):
nonlocal _set, first
if not _set:
first, _set = val, True
return first
return repeater
__all__ = ["AllenStructure", "Layer", "NrrdVoxels", "Partition", "Rhomboid", "Voxels"]