Skip to content

rockfish

import rockfish as rf

rockfish.Connection

Connection is a factory class that produces implementations of AbstractConnection.

Functions

local() -> rockfish.local.Connection classmethod

Create a new local Connection.

A local connection executes locally on the machine and not on the Rockfish API.

remote(*args, **kwargs) -> rockfish.remote.Connection classmethod

remote(base_url: str, api_key: str, /, *, project: Optional[str] = None, organization: Optional[str] = None, product: Optional[str] = None, timeout=None) -> rockfish.remote.Connection
remote(api_key: str, /, *, api_url: Optional[str] = None, project: Optional[StrOrProject] = None, organization: Optional[StrOrOrganization] = None, product: Optional[str] = None, timeout=None, features: FeatureFlag = FeatureFlag.default()) -> rockfish.remote.Connection

Create a new remote Connection.

The connection should be used as a context manager when possible in order to close and free up resources:

async with rf.Connection.remote("MY_API_KEY") as conn:
    ...

As context managers cannot span multiple cells in a notebook, you can create an instance and close it manually:

conn = rf.Connection.remote("MY_API_KEY")
...
await conn.close()

Use the api_url, project, and organization keyword arguments to use a non-default value.

from_config(profile='default', *, organization: Optional[StrOrOrganization] = None, project: Optional[StrOrProject] = None, features: FeatureFlag = FeatureFlag.default()) -> rockfish.remote.Connection classmethod

Create a new remote Connection using a Rockfish config file.

The Rockfish config file contains one or more profiles that can be selected from by name to use the api_key, api_url, and optionally a project and organization.

A Rockfish config file can be created in the user's home directory at ~/.config/rockfish/config.toml. If it does not exist, it can be created manually.

It is recommended to set the permissions of this file to allow only yourself to read it:

chmod 600 ~/.config/rockfish/config.toml

The config uses the TOML format.

Example:

[profile.default]
api_url = "https://api.rockfish.ai"
api_key = "your api key..."

[profile.ocp]
api_url = "https://api.rockfish.ai"
api_key = "your api key..."
organization = "7sEeE4FFWhps2mt0uy1iln"

from_env(features: FeatureFlag = FeatureFlag.default()) -> rockfish.remote.Connection classmethod

Create a new Connection based on environment variables.

The following environment variables can be set:

  • ROCKFISH_API_KEY (required)
  • ROCKFISH_API_URL
  • ROCKFISH_PROJECT_ID
  • ROCKFISH_ORGANIZATION_ID

rockfish.AbstractConnection

AbstractConnection is the base class for local and remote connections.

Instances can be created using Connection.

Functions

close() abstractmethod async

Close the Connection, freeing all resources.

>>> workflow = await conn.close()

active_organization() -> Organization abstractmethod async

Get the active Organization.

>>> await conn.active_organization()

organizations() -> Stream[Organization] abstractmethod

Get a Stream of Organizations.

>>> async for org in conn.organizations()
...     print(org)

active_project() -> Project abstractmethod async

Get the active Project.

>>> await conn.active_project()

projects(*, name: Optional[str] = None) -> Stream[Project] abstractmethod

Get a Stream of Projects.

>>> async for project in conn.projects()
...     print(project)

Parameters:

Name Type Description Default
name Optional[str]

Return only projects with this name.

None

create_project(name: str) -> Project abstractmethod async

Create a new Project.

>>> await conn.create_project("myproject")

change_project(project: StrOrProject) -> None abstractmethod

Change the active project to project, all further calls on the Connection will be with this project.

Using project-id
>>> conn.change_project("42uy7DKYiwYx4cWJt5Xo2R")
Using project name
>>> project = await conn.projects(name="myproject").nth(0)
>>> conn.change_project(project)

users() -> Stream[User] abstractmethod

Get a Stream of Users in your organization.

>>> async for user in conn.users()
...     print(user)

get_user(user_id: str) -> User abstractmethod async

Get a User from a user_id.

active_user() -> User abstractmethod async

Get your current active User.

tokens() -> Stream[Token] abstractmethod

Get a Stream of your Tokens.

>>> async for token in conn.tokens()
...     print(token)

create_token() -> Token abstractmethod async

Create a new Token for your user.

>>> await conn.create_token()

delete_token(token: Token) -> None abstractmethod async

Delete a Token.

>>> await conn.delete_token(token)

workflows(*, status: Optional[WorkflowStatus] = None, labels: Optional[LabelDict] = None, after: Optional[StrOrDatetime] = None, before: Optional[StrOrDatetime] = None, order: Order = Order.ASCENDING, limit: Optional[int] = 10) -> Stream[AbstractWorkflow] abstractmethod

Get a Stream of Workflows.

List all workflows
>>> async for workflow in conn.workflow():
...     print(workflow)
Complex listing
>>> async for workflow in conn.workflow(
        status=rf.Status.COMPLETED,
        labels={"project": "icarus"},
        after=datetime.now(timezone.utc) - timedelta(days=20),
        order=Order.DESCENDING,
        limit=20,
    ):
...     print(workflow)

Parameters:

Name Type Description Default
status Optional[WorkflowStatus]

Only return items with this status.

None
labels Optional[LabelDict]

Only return items with these labels.

None
after Optional[StrOrDatetime]

Only return items created after this time.

None
before Optional[StrOrDatetime]

Only return items created before this time.

None
order Order

List items in this order by create time.

ASCENDING
limit Optional[int]

Limit results to this many items.

10

get_workflow(workflow_id: str) -> AbstractWorkflow abstractmethod async

Get a Workflow by ID.

>>> workflow = await conn.get_workflow("5X2M615Ot0SzoAh3mN5sto")

datasets(labels: Optional[LabelDict] = None, after: Optional[StrOrDatetime] = None, before: Optional[StrOrDatetime] = None) -> Stream[RemoteDataset] abstractmethod

Get a Stream of datasets.

>>> async for dataset in conn.datasets():
        print(dataset)

query_datasets(query: str) -> LocalDataset abstractmethod async

Create a new LocalDataset from a query.

>>> await conn.query_datasets('SELECT * FROM "2C4BUEiZTUJaTALGdim2oX";')

models(labels: Optional[LabelDict] = None, after: Optional[StrOrDatetime] = None, before: Optional[StrOrDatetime] = None) -> Stream[Model] abstractmethod

Get a Stream of Models.

>>> async for models in conn.models():
        print(models)

get_model(model_id: str) -> Model abstractmethod async

Get a Model by ID.

>>> model = await conn.get_model("6zMYbFK2p3YwSEUR2oaBc8")

rockfish.WorkflowBuilder

Functions

add_dataset(dataset: AbstractDataset, *, alias: Optional[str] = None)

Add a dataset to the Workflow.

add_model(model: Model, *, alias: Optional[str] = None)

Add a model to the Workflow.

add_action(action: Action, *, alias: Optional[str] = None, parents: Optional[list] = None)

Add an action to the Workflow.

add(action: Union[Action, Model, AbstractDataset], *, alias: Optional[str] = None, parents: Optional[list] = None)

Add an action, model or dataset to the Workflow.

add_path(*actions: Union[Action, Model, AbstractDataset], alias: Optional[str] = None, parents: Optional[list] = None)

Add a path of action, model or dataset to the Workflow.

The items are connected from left to right, parent to child.

worker_group(group)

Require the workflow to run on a non-system worker group.

start(conn: Optional[AbstractConnection] = None) -> AbstractWorkflow async

Start the workflow running on the conn.

rockfish.AbstractWorkflow

AbstractWorkflow represents a remote or local Workflow and can be used to control the workflow and collect status information.

New Workflows can be created using the WorkflowBuilder.

Functions

id() -> str abstractmethod

Returns the workflow ID, an opaque string that uniquely identifies the workflow.

>>> workflow.id()
2ZxiQe4SDTNYYSjd87Wshp

status() -> str abstractmethod async

Return a string describing the Workflow status.

stop() abstractmethod async

Request the workflow to stop.

>>> await workflow.stop()

wait(raise_on_failure: bool = False) -> None abstractmethod async

Wait until the workflow is completed.

>>> await workflow.wait()

events(*actions: ActionID, event_types: Optional[list[EventType]] = None) -> Stream[WrappedEvent] abstractmethod

Return a Stream of the workflow raw events.

>>> async for event in workflow.events():
...     print(event)

Parameters:

Name Type Description Default
actions ActionID

Return events from only these actions.

()

states(*actions: ActionID) -> Stream[StateEvent] abstractmethod

Return a Stream of the workflow StateEvent.

>>> async for state in workflow.states():
...     print(state)

Parameters:

Name Type Description Default
actions ActionID

Return events from only these actions.

()

logs(*actions: ActionID, level: LogLevel = LogLevel.INFO) -> Stream[LogEvent] abstractmethod

Return a Stream of the workflow LogEvent.

>>> async for log in workflow.logs():
...     print(log)

Parameters:

Name Type Description Default
actions ActionID

Return events from only these actions.

()
level LogLevel

Return logs of this level or higher.

INFO

Return a Stream of the workflow LinkEvent.

>>> async for links in workflow.links():
...     print(links)

Parameters:

Name Type Description Default
actions ActionID

Return events from only these actions.

()

progress(*actions: ActionID) -> Stream[ProgressEvent] abstractmethod

Return a Stream of the workflow ProgressEvent.

>>> async for progress in workflow.progress():
...     print(progress)

Parameters:

Name Type Description Default
actions ActionID

Return events from only these actions.

()

datasets(*actions: ActionID) -> Stream[RemoteDataset] abstractmethod

Return a Stream of the workflow RemoteDataset.

>>> async for dataset in workflow.datasets():
...     print(dataset)

Parameters:

Name Type Description Default
actions ActionID

Return events from only these actions.

()

model_id(*names) -> str abstractmethod async

Return the first model_id.

Deprecated: Use AbstractWorkflow.models().

models(*names) -> Stream[Model] abstractmethod

Return a Stream of the workflow Model.

>>> async for model in workflow.models():
...     print(model)

rockfish.Dataset

A Dataset represents your data.

Datasets can be a LocalDataset, existing on the current system, or they can be a RemoteDataset on the Rockfish API.

Functions

from_id(conn, dataset_id) -> RemoteDataset async staticmethod

Create a RemoteDataset from the Dataset ID.

from_csv(name, path, table_metadata: Optional[TableMetadata] = None) -> LocalDataset staticmethod

Create a LocalDataset from the contents of a CSV file.

from_parquet(name, path, table_metadata: Optional[TableMetadata] = None) -> LocalDataset staticmethod

Create a LocalDataset from the contents of a Parquet file.

from_pandas(name, df, table_metadata: Optional[TableMetadata] = None) -> LocalDataset staticmethod

Create a LocalDataset from the contents of a Pandas dataframe.

from_table(name, table: pa.Table, table_metadata: Optional[TableMetadata] = None) -> LocalDataset staticmethod

Create a LocalDataset from an Arrow table.

from_json(name: str, path: str, table_metadata: Optional[TableMetadata] = None) -> LocalDataset staticmethod

Create a LocalDataset from the contents of a JSON file.

LocalDataset

LocalDataset represents a Dataset with contents on the local system.

Functions

__init__(name: str, table: pa.Table, table_metadata: Optional[TableMetadata] = None)

Create a LocalDataset with name and the pyarrow.Table table as the contents.

from_csv(name: str, path: Union[os.PathLike, BinaryIO], table_metadata=None) -> Self classmethod

Create a LocalDataset with name and the contents contained in the csv encoded file at path.

Parameters:

Name Type Description Default
name str

Name of the Dataset.

required
path Union[PathLike, BinaryIO]

Path of the csv file.

required

from_json(name: str, json_file_path: str, table_metadata: Optional[TableMetadata] = None) -> Self classmethod

Create a LocalDataset from the contents of a JSON file, named name and read from json_file_path. Currently supported JSON formats: .json and .jsonl [newline-delimited JSON].

e.g. valid JSON file:

[
    {"name": "Alice", "age": 25, "address": {"city: "New York", "state": "NY"}},
    {"name": "Bob", "age": 30, "address": {"city: "San Francisco", "state": "CA"}}
]

valid JSONL file:

{"name": "Alice", "age": 25, "address": {"city: "New York", "state": "NY"}}
{"name": "Bob", "age": 30, "address": {"city: "San Francisco", "state": "CA"}}

>>> import rockfish as rf
>>> rf.Dataset.from_json('my_dataset', 'data.json')
LocalDataset('my_dataset')

Parameters:

Name Type Description Default
name str

Name of the Dataset.

required
json_file_path str

Path of the JSON file.

required

Raises:

Type Description
ValueError

If the file is not a valid json or jsonl file.

sql(query: str, *, conn=None) -> Self async

Return a LocalDataset containing the results of an SQL query against this Dataset.

>>> ds = await dataset.sql("select foo from my_table")

Parameters:

Name Type Description Default
query str

An SQL query against a table named my_table.

required
conn

The conn parameter is unusedfor symmetry with RemoteDataset.

None

sync_sql(query) -> Self

Return a LocalDataset containing the results of an SQL query against this Dataset.

>>> ds = dataset.sync_sql("select foo from my_table")

Parameters:

Name Type Description Default
query

An SQL query against a table named my_table.

required

with_table_metadata(table_metadata: TableMetadata) -> Self

Return a new LocalDataset with the provided TableMetadata. This metadata can be accessed from the Actions within a Workflow.

table_metadata() -> TableMetadata

Return the TableMetadata.

select(fields: list[str]) -> Self

Return a LocalDataset containing only the specified fields.

rename_field(old: str, new: str) -> Self

Return a LocalDataset with the field old renamed to new.

drop_fields(fields: list[str]) -> Self

Return a LocalDataset without the fields.

head(n: int) -> Self

Return a LocalDataset containing only the n first records.

to_local(conn=None) -> LocalDataset async

Convert to a LocalDataset.

to_remote(conn) async

Convert to a RemoteDataset.

to_pandas()

Return a pandas dataframe corresponding to this Dataset.

RemoteDataset

Dataset references a remote Dataset.

In order to view the data you must convert it to a local Dataset.

Functions

from_id(conn, id) -> Self async classmethod

Create a remote Dataset that references the id.

Raises:

Type Description
KeyError

If the remote Dataset does not exist.

name() -> str

Return the name of the Dataset.

delete(conn) -> None async

Delete the remote Dataset from the server.

sql(query, *, conn) -> LocalDataset async

Return a LocalDataset containing the results of an SQL query against this Dataset.

to_local(conn=None) -> LocalDataset async

Return a local Dataset containing the full remote Dataset.


rockfish.User

User in the Rockfish API.

Attributes:

Name Type Description
id str
email str

rockfish.Token

Access Token for a User.

This is the sometimes referred to as an api-key.

Attributes:

Name Type Description
id str
token SecretStr

Functions

reveal() -> str

Return the value of the secret token.

rockfish.Project

Project in the Rockfish API.

Attributes:

Name Type Description
id str
name str
default bool

Functions

users() -> Stream[User]

Stream of Users in the Project.

>>> async for user in project.users()
...     print(user)

add_user(user: StrOrUser) -> None async

Add a user to the Project.

>>> user = await organization.users().filter(
...     lambda u: u.email == "bob@example.org").nth(0)
>>> await project.add_user(user)

remove_user(user: StrOrUser) -> None async

Remove a user from the Project.

>>> user = await project.users().filter(
...     lambda u: u.email == "bob@example.org").nth(0)
>>> await project.remove_user(user)

rockfish.Organization

Organization in the Rockfish API.

Attributes:

Name Type Description
id str
name str

Functions

users() -> Stream[User]

Stream of Users in the Organization.

>>> async for user in organization.users()
...     print(user)

add_user(user: StrOrUser) -> None async

Add a user to the Organization.

This method requires that you have the Admin role for the Organization.

>>> await org.add_user("3FJ5Tulq5mUXMadbk63sHO")

remove_user(user: StrOrUser) -> None async

Remove a user from the Organization.

This method requires that you have the Admin role for the Organization.

>>> user = await org.users().filter(lambda u: u.email == "bob@example.org").nth(0)
>>> await org.remove_user(user)