rockfish.streams

Stream

Stream is an AsyncGenerator with convenience methods for common operations.

Functions

closing() -> AbstractAsyncContextManager[Self]

Return an async context manager that ensures the asynchronous generator is closed.

async with stream.closing() as stream:
    async for item in stream:
        print(item)

Asynchronous generators should be closed once iteration is finished so that their cleanup code runs at the expected time. See contextlib.aclosing for details.

take(n) -> Stream[T]

Return an adapted iterator that will yield up to the next n items.

If there are fewer than n items left in the iterator, it will yield all of the remaining items.

>>> await stream.take(5).collect()
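The "up to n" behaviour can be sketched with plain async generators. The take(), collect(), and count() helpers below are hypothetical stand-ins written for illustration; they are not the library's implementation.

```python
import asyncio


async def count(limit):
    """Hypothetical source yielding 0, 1, ..., limit - 1."""
    for i in range(limit):
        yield i


async def take(source, n):
    """Yield at most n items from source (stand-in for Stream.take)."""
    if n <= 0:
        return
    taken = 0
    async for item in source:
        yield item
        taken += 1
        if taken >= n:
            break  # stop without pulling an extra item from source


async def collect(source):
    """Gather every item into a list (stand-in for Stream.collect)."""
    return [item async for item in source]


async def main():
    first = await collect(take(count(10), 5))  # source has enough items
    short = await collect(take(count(3), 5))   # source runs out early
    return first, short


first, short = asyncio.run(main())
print(first, short)
```

When the source holds fewer than n items, the adapter simply ends with the source instead of raising.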

filter(predicate: Callable[[T], bool]) -> Stream[T]

Return an adapted iterator that will yield only the items for which the provided predicate returns True.
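As a rough sketch of the filtering semantics, the example below applies a synchronous predicate to a plain async generator. The filter_items() and count() helpers are hypothetical and only approximate what the method is described as doing.

```python
import asyncio


async def count(limit):
    """Hypothetical source yielding 0, 1, ..., limit - 1."""
    for i in range(limit):
        yield i


async def filter_items(source, predicate):
    """Yield only items for which predicate(item) is truthy."""
    async for item in source:
        if predicate(item):
            yield item


async def main():
    is_even = lambda x: x % 2 == 0
    return [x async for x in filter_items(count(10), is_even)]


evens = asyncio.run(main())
print(evens)
```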

nth(n) -> T async

Consume iterator and return the nth item.

Indexing is zero-based: the first item is nth(0), the second is nth(1), and so on.

>>> await stream.nth(0)

Raises:

Type        Description
ValueError  when the nth item does not exist
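The zero-based indexing and the ValueError case can be sketched as follows. The nth() and letters() functions here are hypothetical stand-ins operating on a plain async generator, and the error message is an assumption.

```python
import asyncio


async def letters():
    """Hypothetical source yielding 'a', 'b', 'c'."""
    for ch in "abc":
        yield ch


async def nth(source, n):
    """Return the item at zero-based index n, or raise ValueError."""
    index = 0
    async for item in source:
        if index == n:
            return item
        index += 1
    # The source ended before reaching index n.
    raise ValueError(f"stream has no item at index {n}")


async def main():
    first = await nth(letters(), 0)
    third = await nth(letters(), 2)
    try:
        await nth(letters(), 3)  # only indices 0..2 exist
    except ValueError:
        missing_raised = True
    else:
        missing_raised = False
    return first, third, missing_raised


first, third, missing_raised = asyncio.run(main())
print(first, third, missing_raised)
```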

last() -> T async

Consume iterator and return the last item.

>>> await stream.last()

Raises:

Type        Description
ValueError  if the iterator is empty
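One way to express "last item, or ValueError when empty" is with a sentinel object, so that a legitimate None item is not mistaken for an empty stream. The last() and items() helpers below are hypothetical; the sentinel approach is one possible technique, not necessarily what the library does.

```python
import asyncio

_MISSING = object()  # sentinel meaning "no item seen yet"


async def items(values):
    """Hypothetical source yielding each element of values."""
    for value in values:
        yield value


async def last(source):
    """Consume source and return its final item, or raise ValueError."""
    result = _MISSING
    async for item in source:
        result = item
    if result is _MISSING:
        raise ValueError("stream is empty")
    return result


async def main():
    final = await last(items([1, 2, None]))  # None is a valid last item
    try:
        await last(items([]))
    except ValueError:
        empty_raised = True
    else:
        empty_raised = False
    return final, empty_raised


final, empty_raised = asyncio.run(main())
print(final, empty_raised)
```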

collect() -> list[T] async

Consume iterator and return a list of all items.

>>> await stream.collect()

concat(conn: AbstractConnection, session_key: Optional[str] = None) -> LocalDataset async

Download and concatenate all RemoteDatasets into a single LocalDataset.

If session_key is given, the field it names is treated as a unique session key, and its uniqueness is preserved when concatenating.

>>> await workflow.datasets().concat(conn)