import asyncio
import random
import string
from collections.abc import Generator, Iterator
from datetime import datetime
from itertools import count
from itertools import cycle as cycle_
from time import sleep
from typing import Annotated as A
from typing import Any
from faker import Faker
from pydantic import Field
from noob import Name, NodeSpecification, process_method
from noob.edge import Signal, Slot
from noob.event import MetaSignal
from noob.node import Node
from noob.types import Epoch, EventMap
[docs]
def count_source(
limit: int = 10000, start: int = 0
) -> Generator[A[int, Name("index")], None, None]:
counter = count(start=start)
if limit == 0:
while True:
yield next(counter)
else:
while (val := next(counter)) < limit:
yield val
[docs]
def letter_source() -> Generator[A[str, Name("letter")]]:
yield from cycle_(string.ascii_lowercase)
[docs]
def word_source() -> Generator[A[str, Name("word")]]:
fake = Faker()
while True:
word = fake.word()
if len(word) > 1:
yield word
[docs]
def multi_words_source(n: int) -> Generator[A[list[str], Name("multi_words")]]:
fake = Faker()
while True:
yield [fake.unique.word() for _ in range(n)]
[docs]
def sporadic_word(every: int = 3) -> Generator[A[str, Name("word")] | None, None, None]:
fake = Faker()
i = 0
while True:
i += 1
if i % every == 0:
yield fake.word()
else:
yield None
[docs]
def word_counts() -> Generator[tuple[A[str, Name("word")], A[list[int], Name("counts")]]]:
fake = Faker()
while True:
n_counts = random.randint(2, 5)
yield fake.unique.word(), [random.randint(1, 100) for _ in range(n_counts)]
[docs]
def multiply(left: int, right: int = 2) -> int:
"""
Return value purposely unnamed,
to be used as `{nodename}.value`
"""
return left * right
[docs]
def divide(numerator: int, denominator: int = 5) -> A[float, Name("ratio")]:
return numerator / denominator
[docs]
def concat(strings: list[str]) -> str:
return "".join(strings)
[docs]
def multi_concat(**kwargs: list[str]) -> str:
return "".join("".join(letter for letter in word) for word in kwargs.values())
[docs]
def exclaim(string: str, hype: int = 1) -> str:
return string + ("!" * hype)
[docs]
def repeat(string: str, times: int) -> str:
return string * times
[docs]
def dictify(key: str, items: list[Any]) -> dict[str, Any]:
return {key: items}
[docs]
def error(value: Any) -> None:
raise ValueError("This node just emits errors")
[docs]
class CountSource(Node):
limit: int = 1000
start: int = 0
[docs]
def process(self) -> Generator[A[int, Name("index")], None, None]:
counter = count(start=self.start)
while (val := next(counter)) < self.limit:
yield val
[docs]
class UnannotatedGenerator(Node):
limit: int = 1000
start: int = 0
[docs]
def process(self): # noqa: ANN201
counter = count(start=self.start)
while (val := next(counter)) < self.limit:
yield val
[docs]
class Multiply(Node):
[docs]
def process(self, left: int, right: int = 2) -> A[int, Name("product")]:
return multiply(left=left, right=right)
[docs]
class VolumeProcess:
def __init__(self, height: int = 2):
self.height = height
[docs]
def process(self, width: int, depth: int) -> A[int, Name("volume")]:
return self.height * multiply(left=width, right=depth)
[docs]
class Volume:
def __init__(self, height: int = 2):
self.height = height
[docs]
@process_method
def volume(self, width: int, depth: int) -> A[int, Name("volume")]:
return self.height * multiply(left=width, right=depth)
[docs]
class Now:
def __init__(self):
self.now = datetime.now()
[docs]
@process_method
def print(self, prefix: str = "Now: ") -> A[str, Name("timestamp")]:
return f"{prefix}{self.now.isoformat()}"
[docs]
class CountSourceDecor:
def __init__(self, start: int = 0) -> None:
self.gen = count(start=start)
[docs]
@process_method
def process(self) -> Generator[A[int, Name("count")], None, None]:
yield from self.gen
[docs]
def long_add(value: float, sleep_for: float = 0.25) -> float:
sleep(sleep_for)
return value + 1
[docs]
async def number_to_letter(number: int, offset: int = 0) -> str:
sleep_for = random.random() / 10
await asyncio.sleep(sleep_for)
return string.ascii_lowercase[(number + offset) % len(string.ascii_lowercase)]
[docs]
class NumberToLetterCls:
def __init__(self, offset: int = 0):
self.offset = offset
[docs]
async def process(self, number: int) -> str:
sleep_for = random.random() / 10
await asyncio.sleep(sleep_for)
return string.ascii_lowercase[(number + self.offset) % len(string.ascii_lowercase)]
[docs]
async def async_error(value: Any) -> None:
"""Just raise an error!"""
raise ValueError("This is the error that should be raised")
[docs]
class StatefulMultiply:
def __init__(self, start: int = 0) -> None:
self.start = start
self.current = self.start
[docs]
def process(self, left: float, right: float = 1) -> float:
value = left * right * self.current
self.current += 1
return value
[docs]
def fast_forward(generator: count, n: int = 1) -> tuple[A[int, Name("next")]]:
for _ in range(n):
val = next(generator)
return val
[docs]
def jump(generator: count, n: int = 1) -> tuple[A[count, Name("skirttt")], A[int, Name("next")]]:
value = 0
for _ in range(n):
value = next(generator)
return generator, value
[docs]
def rewind(generator: count, n: int = 1) -> A[count, Name("skrittt")]:
"""Purposely designed to mutate the input and return a new object"""
return count(next(generator) - n)
[docs]
def zip_iter(*args: Iterator) -> tuple[Any, ...]:
return tuple(next(a) for a in args)
[docs]
def increment(
iterator: Iterator[int], increment: int = 1
) -> tuple[A[Iterator[int], Name("iterator")], A[int, Name("value")]]:
for _ in range(increment):
value = next(iterator)
return iterator, value
[docs]
def passthrough(value: Any, *args: Any, **kwargs: Any) -> Any:
return value, args, kwargs
[docs]
class InitCounter(Node):
"""Count how many times we have been initialized and deinitalized"""
def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self._inits = 0
self._deinits = 0
[docs]
def process(self) -> tuple[A[int, Name("inits")], A[int, Name("deinits")]]:
# sleep to just not have this flood the networking modules.
sleep(0.01)
return self._inits, self._deinits
[docs]
def init(self) -> None:
self._inits += 1
[docs]
def deinit(self) -> None:
self._deinits += 1
[docs]
def inject_epoch(epoch: Epoch) -> Epoch:
return epoch
[docs]
def inject_eventmap(special_value: Any, events: EventMap) -> EventMap:
return events
[docs]
def just_wait(value: Any, wait_for: float = 0.5) -> Any:
sleep(wait_for)
return value
[docs]
def switch() -> (
Generator[tuple[A[str, Name("fruits")], A[str, Name("vegetables")], A[str, Name("minerals")]]]
):
"""Yield in a cycle from different signals, noeventing in the others"""
fruits = cycle_(["apple", "banana", "cherry"])
vegetables = cycle_(["daikon", "eggplant", "fiddlehead"])
minerals = cycle_(["galaxite", "halite", "iolite"])
while True:
yield next(fruits), MetaSignal.NoEvent, MetaSignal.NoEvent
yield MetaSignal.NoEvent, next(vegetables), MetaSignal.NoEvent
yield MetaSignal.NoEvent, MetaSignal.NoEvent, next(minerals)
[docs]
def this_or_that(
this: Any | None = None, that: Any | None = None, the_other: Any | None = None
) -> dict:
"""Optional inputs!"""
ret = {}
if this is not None:
ret["this"] = this
if that is not None:
ret["that"] = that
if the_other is not None:
ret["the_other"] = the_other
return ret
[docs]
class DynamicSignals(Node):
"""Node whose signals and slots are dynamic"""
signals_: list[str] = Field(default_factory=list)
slots_: list[str] = Field(default_factory=list)
[docs]
def process(self, **kwargs: Any) -> Any | None:
return tuple(v for v in kwargs.values())
[docs]
@classmethod
def get_signals(cls, spec: NodeSpecification | None = None) -> dict[str, Signal]:
if not spec or not spec.params:
return {}
return {sig: Signal(name=sig, annotation=Any) for sig in spec.params.get("signals_", [])}
[docs]
@classmethod
def get_slots(cls, spec: NodeSpecification | None = None) -> dict[str, Slot]:
if not spec or not spec.params:
return {}
return {
sig: Slot(name=sig, annotation=Any, required=True)
for sig in spec.params.get("slots_", [])
}
[docs]
def cycle(start: int = 0, stop: int = 100, step: int = 1) -> Generator[float]:
while True:
yield from range(start, stop, step)