rockfish.streams
Stream
Stream is an AsyncGenerator with convenience methods for common operations.
Functions
closing() -> AbstractAsyncContextManager[Self]
Return an async context manager that ensures the asynchronous generator is closed.
async with stream.closing() as stream:
    async for item in stream:
        print(item)
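To illustrate what this guarantee means in practice, here is a minimal sketch that uses a plain async generator rather than a rockfish Stream. The `numbers` generator and the `closing` helper are hypothetical stand-ins; the helper is only assumed to mirror the behavior described above by calling `aclose()` on exit.

```python
import asyncio
from contextlib import asynccontextmanager

cleaned_up = []  # records that the generator's cleanup code ran


async def numbers():
    # Hypothetical stand-in for a stream backed by some resource.
    try:
        for i in range(10):
            yield i
    finally:
        cleaned_up.append(True)


@asynccontextmanager
async def closing(agen):
    # Assumed behavior: hand the generator back, then always close it.
    try:
        yield agen
    finally:
        await agen.aclose()


async def main():
    seen = []
    async with closing(numbers()) as stream:
        async for item in stream:
            seen.append(item)
            if item == 2:
                break  # leave early; the generator is still suspended here
    return seen


result = asyncio.run(main())
```

Breaking out of the loop alone would leave the generator suspended; leaving the `async with` block is what runs its `finally` clause promptly.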
take(n) -> Stream[T]
Return an adapted iterator that will yield up to the next n items.
If there are fewer than n items left in the iterator, it will yield all of the remaining items.
>>> await stream.take(5).collect()
filter(predicate: Callable[[T], bool]) -> Stream[T]
Return an adapted iterator that will yield only the items for which the provided predicate returns True.
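The adapters above can be chained before any item is consumed. The following is a minimal sketch of that pattern, assuming lazy evaluation; `MiniStream` and `count_up` are hypothetical stand-ins written for illustration and are not the rockfish implementation.

```python
import asyncio
from typing import AsyncIterator, Callable, Generic, List, TypeVar

T = TypeVar("T")


class MiniStream(Generic[T]):
    """Hypothetical stand-in that models take, filter, and collect."""

    def __init__(self, source: AsyncIterator[T]) -> None:
        self._source = source

    def take(self, n: int) -> "MiniStream[T]":
        async def gen():
            if n <= 0:
                return
            count = 0
            async for item in self._source:
                yield item
                count += 1
                if count >= n:
                    return  # stop without pulling an extra item

        return MiniStream(gen())

    def filter(self, predicate: Callable[[T], bool]) -> "MiniStream[T]":
        async def gen():
            async for item in self._source:
                if predicate(item):
                    yield item

        return MiniStream(gen())

    async def collect(self) -> List[T]:
        return [item async for item in self._source]


async def count_up(limit: int):
    for i in range(limit):
        yield i


async def main():
    evens = MiniStream(count_up(100)).filter(lambda x: x % 2 == 0)
    first_three = await evens.take(3).collect()
    # Fewer than n items available: everything that remains is returned.
    short = await MiniStream(count_up(2)).take(5).collect()
    return first_three, short


first_three, short = asyncio.run(main())
```

In this sketch nothing is pulled from the source until `collect()` is awaited, so `take(3)` stops the pipeline after three matching items.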
nth(n) -> T
async
Consume iterator and return the nth item.
This is an indexing operation, so the first item is nth(0), the second nth(1), etc.
>>> await stream.nth(0)
Raises:
Type | Description |
---|---|
ValueError | when the nth item does not exist |
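Because a missing index is reported with an exception, callers may want to guard the call. The sketch below models the documented contract with a hypothetical free function `nth` over a plain async generator (`letters` is likewise made up for illustration); it is not the library's code.

```python
import asyncio


async def nth(source, n):
    # Hypothetical stand-in: zero-based lookup that consumes the source.
    index = 0
    async for item in source:
        if index == n:
            return item
        index += 1
    raise ValueError(f"stream has no item at index {n}")


async def letters():
    for ch in "abc":
        yield ch


async def main():
    second = await nth(letters(), 1)
    try:
        await nth(letters(), 10)
    except ValueError:
        missing = True
    else:
        missing = False
    return second, missing


second, missing = asyncio.run(main())
```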
last() -> T
async
Consume iterator and return the last item.
>>> await stream.last()
Raises:
Type | Description |
---|---|
ValueError | if the iterator is empty |
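When an empty stream is an expected case, one option is to wrap the call and fall back to a default. This is a sketch under the documented contract only; `last`, `last_or_default`, and `countdown` are hypothetical helpers, not part of rockfish.

```python
import asyncio

_MISSING = object()  # sentinel so that None can be a legitimate item


async def last(source):
    # Hypothetical stand-in: consume everything, keep the final item.
    result = _MISSING
    async for item in source:
        result = item
    if result is _MISSING:
        raise ValueError("stream is empty")
    return result


async def last_or_default(source, default=None):
    try:
        return await last(source)
    except ValueError:
        return default


async def countdown(start):
    for i in range(start, 0, -1):
        yield i


async def main():
    final = await last(countdown(3))
    fallback = await last_or_default(countdown(0), default=-1)
    return final, fallback


final, fallback = asyncio.run(main())
```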
collect() -> list[T]
async
Consume iterator and return a list of all items.
>>> await stream.collect()
concat(conn: AbstractConnection, session_key: Optional[str] = None) -> LocalDataset
async
Download and concatenate all RemoteDatasets into a single LocalDataset.
If session_key is given, that field is treated as a unique session key, and its uniqueness is preserved when concatenating.
>>> await workflow.datasets().concat(conn)