Source code for noob.node.return_

"""
Special Return sink that tube runners use to return values from :meth:`.TubeRunner.process`
"""

from collections import defaultdict
from typing import Any

from pydantic import PrivateAttr

from noob.edge import Slot
from noob.event import MetaSignal
from noob.node.base import Node
from noob.node.spec import NodeSpecification
from noob.types import Epoch, EventMap


class Return(Node):
    """
    Special sink node that returns values from a tube runner's `process` method.

    Values handed to :meth:`.process` are buffered on the instance and handed
    back (and optionally cleared) by :meth:`.get`.
    """

    stateful: bool = False

    # Positional values accumulated across `process` calls; None until the first call.
    _args: tuple | None = None
    # Per-slot buffer: slot name -> list of (epoch, value) pairs.
    _kwargs: dict = PrivateAttr(default_factory=lambda: defaultdict(list))
    # (epoch, slot name) pairs already buffered, used to skip repeats.
    _seen_epochs: set[tuple[Epoch, str]] = set()

    # NOTE(review): `__events` is declared inside a class body, so Python's
    # private-name mangling applies to this parameter name - confirm that
    # callers supply it under the name the runner actually expects.
    def process(self, *args: Any, __events: EventMap, **kwargs: Any) -> MetaSignal:
        """
        Buffer incoming values so they can be retrieved later with :meth:`.get`.

        Positional values are appended to whatever positional values were
        already buffered. Each keyword value is stored together with the
        ``"epoch"`` entry of its event in ``__events``; a keyword value whose
        ``(epoch, name)`` pair was already buffered is skipped.

        Args:
            *args: positional values to buffer.
            __events: mapping indexed by keyword name whose entries expose an
                ``"epoch"`` item.
            **kwargs: named values to buffer.

        Returns:
            Always :attr:`.MetaSignal.NoEvent`.
        """
        self._args = args if self._args is None else self._args + args

        for name, value in kwargs.items():
            epoch = __events[name]["epoch"]
            marker = (epoch, name)
            if marker not in self._seen_epochs:
                self._kwargs[name].append((epoch, value))
                self._seen_epochs.add(marker)

        return MetaSignal.NoEvent

    def get(self, keep: bool) -> Any | None:
        """
        Return the values buffered by :meth:`.process`.

        Keyword buffers are collapsed first: a slot holding exactly one entry
        yields that bare value, any other slot yields a list of its values
        ordered by epoch.

        The shape of the result is then:

        * the first positional value alone, when positional values exist and
          the spec's ``depends`` is a plain string;
        * ``(args, kwargs)`` when both positional and keyword values exist;
        * the positional tuple, or the keyword dict, when only one exists;
        * ``None`` when nothing was buffered.

        Args:
            keep: when falsy, all buffered state is reset after the result has
                been computed; when truthy, the buffers are left in place.
        """
        # Collapsing happens before the try block, so a failure here leaves
        # the buffers untouched regardless of `keep`.
        collected = {
            name: (
                entries[0][1]
                if len(entries) == 1
                else [entry[1] for entry in sorted(entries, key=lambda entry: entry[0])]
            )
            for name, entries in self._kwargs.items()
        }

        try:
            # FIXME: what a nightmare - make all of these derive from the spec
            positional = self._args
            if not positional:
                return collected or None

            spec = self.spec
            if spec is not None and isinstance(spec.depends, str):
                return positional[0]
            if collected:
                return positional, collected
            return positional
        finally:
            if not keep:
                self._args = None
                self._kwargs = defaultdict(list)
                self._seen_epochs = set()

    @classmethod
    def get_slots(cls, spec: NodeSpecification | None = None) -> dict[str, Slot]:
        """
        Derive the node's slots from the ``depends`` entries of its specification.

        A string-valued ``depends`` produces no slots. Otherwise every
        non-string entry contributes one optional, ``Any``-typed slot named
        after that entry's first key; string entries are skipped.

        Args:
            spec: the node specification to read ``depends`` from.

        Raises:
            ValueError: if ``spec`` is ``None`` or its ``depends`` is empty.
        """
        depends = None if spec is None else spec.depends
        if not depends:
            raise ValueError("Return nodes must have a specification that defines what they return")
        if isinstance(depends, str):
            return {}

        slot_names = (list(dep.keys())[0] for dep in depends if not isinstance(dep, str))
        return {name: Slot(name=name, annotation=Any, required=False) for name in slot_names}