tube¶
- pydantic model TubeSpecification[source]¶
Configuration for the nodes within a tube.
Representation of the yaml-form of a tube. Converted to the runtime-form with
Tube.from_specification().Not much, if any validation is performed here on the whole tube except that the nodes have the correct fields, ignoring validity of e.g. type mismatches or dependencies on signals that don’t exist. Those require importing and introspecting the specified node classes, which should only happen when we try and instantiate the tube - this class is just a carrier for the yaml spec.
Instance creation can be customized with parameters passed to the model’s validation context (see: https://pydantic.dev/docs/validation/latest/concepts/validators/#validation-context ).
recursive: True- replace the tube ID in aTubeNodewith the full spec
- Config:
validate_default: bool = True
extra: str = forbid
- Fields:
- Validators:
dependencies_exist»all fieldsload_recursive_specs»all fields
- field assets: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], AssetSpecification] [Optional]¶
The specs of the assets that comprise the state of this tube
- Validated by:
- field extends: list[Path | PathLike[str] | Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='[\\w\\-\\/#]+')])]] | None = None¶
Other tubes that this tube extends.
All the node and other specifications from the other tubes are collapsed into this one such that later tubes in the list override earlier tubes (and the current tube overrides all the extended tubes).
Extensions are neither completely deep nor completely shallow. In general, items are merged as deeply as they can be, with some exceptions:
depends: for specs with matching IDs, more recent declarations fully replace earlier ones. This is to facilitate extending tubes being able to remove dependencies - rather than having a specific “remove” keyword, to remove a dependency, copy all but the one to be removed. To add a dependency, copy all the previous ones along with the one ones
return: only one return node can exist in a tube, so the id given to the most recent return is used and all other returns are removed.
In tubes that extend other tubes, fields that are typically required are treated as optional in the yaml spec in order to avoid needing to redefine evey property. For example, if a tube wants to just change a node’s params from an extended tube, the extending tube needs to only declare the node’s ID and params without needing to declare the type again.
- Validated by:
- field input: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], InputSpecification] [Optional]¶
Inputs provided at runtime
- Validated by:
- field nodes: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], NodeSpecification] [Optional]¶
The nodes that this tube configures
- Validated by:
- validator fill_node_ids » nodes, assets, input[source]¶
Roll down the id from the key in the nodes dictionary into the node config
- validator dependencies_exist » all fields[source]¶
Nodes referred to in dependencies should exist, they must be - other nodes in the spec - input (and the input signal must exist) - assets (and the asset must exist)
Note: we do not check the validity of signals here, as that would require us to import/inspect the code object referred to by the node, and we want specifications to be loadable even if the referred code is not present and installed in the environment.
- validator load_recursive_specs » all fields[source]¶
If
recursive: Truein the validation context, load the full tube specification referenced by a tube ID intype: tubenodes.
- property nodeinfo: dict[str, NodeInfo][source]¶
Nodeinfo for each node in the tube. Even though NodeInfo typically doesn’t vary between multiple instances of the same type, we still create one per node because they can, and making that configurable would be bloat compared to the ease of computing it.
- pydantic model Tube[source]¶
A graph of nodes transforming some input source(s) to some output sink(s)
The Tube model is a container for a set of nodes that are fully instantiated (e.g. have their “passed” and “fill” keys processed) and connected. It does not handle running the tube – that is handled by a TubeRunner.
- Config:
arbitrary_types_allowed: bool = True
- Fields:
- Validators:
_create_scheduler»schedulerassets_exhausted_after_storage»all fieldsinput_present»all fieldsno_cross_map_dependencies»all fields
- field edges: list[Edge] [Required]¶
Edges connecting slots within nodes.
The nodes within
Edge.source_nodeandEdge.target_nodemust be the same objects as those inTube.nodes(i.e.edges[0].source_node is nodes[node_id]).
- field input_collection: InputCollection [Optional]¶
Specifications declared by the tube to be supplied
- field nodes: dict[str, Node] [Required]¶
Dictionary mapping all nodes from their ID to the instantiated node.
- field spec: TubeSpecification | None = None¶
- classmethod from_specification(spec: TubeSpecification | Path | PathLike[str] | Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='[\\w\\-\\/#]+')])], input: dict | None = None) Self[source]¶
Instantiate a tube model from its configuration
- Parameters:
spec (
TubeSpecification) – the tube config to instantiate- Raises:
InputMissingError if requested tube-scoped input is not present. –
- validator only_one_return » nodes[source]¶
Only one return node is allowed.
Validate both here and in the spec because tubes can be created programmatically
- validator assets_exhausted_after_storage » all fields[source]¶
Runner-scoped assets that depend on node outputs can’t be directly depended on in topological generations during or after when they are stored in order to protect against unstructured mutation.
- in_edges(node: Node | str) list[Edge][source]¶
Edges going towards the given node (i.e. the node is the edge’s
target)
- validator input_present » all fields[source]¶
Validate the presence of required tube-scoped inputs.
Though tubes are usually instantiated from specs, this catches the case when they are constructed directly.
- model_post_init(context: Any, /) None¶
This function is meant to behave like a BaseModel method to initialize private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
self – The BaseModel instance.
context – The context.
- validator no_cross_map_dependencies » all fields[source]¶
A node can only be downstream of one linear chain of map operations.
Fine:
a -> map_b -> c
a -> map_b -> map_c -> d
Not fine:
a -> map_a -> c; b -> map_b -> c
- downstream_nodes(edges: list[Edge], node_id: str, exclude: set[str] | None = None) set[str][source]¶
The set of nodes that through some chain of dependencies depend on the given node_id
- Parameters:
exclude (
set[str]) – Nodes (and their successors) to exclude from downstream graph
- merge_tube_specs(left: Mapping | TubeSpecification, right: Mapping | TubeSpecification) Mapping[source]¶
Merge the yaml/json dict form of a tube spec pre-casting to
TubeSpecificationwhile resolvingextends.See
TubeSpecification.extendsfor details of merge behavior.