rockfish
import rockfish as rf
rockfish.Connection
Connection is a factory class that produces implementations of AbstractConnection.
Functions
local() -> rockfish.local.Connection
classmethod
Create a new local Connection.
A local connection executes on the local machine rather than on the Rockfish API.
remote(*args, **kwargs) -> rockfish.remote.Connection
classmethod
remote(base_url: str, api_key: str, /, *, project: Optional[str] = None, organization: Optional[str] = None, product: Optional[str] = None, timeout=None) -> rockfish.remote.Connection
remote(api_key: str, /, *, api_url: Optional[str] = None, project: Optional[StrOrProject] = None, organization: Optional[StrOrOrganization] = None, product: Optional[str] = None, timeout=None, features: FeatureFlag = FeatureFlag.default()) -> rockfish.remote.Connection
Create a new remote Connection.
The connection should be used as a context manager when possible in order to close and free up resources:
async with rf.Connection.remote("MY_API_KEY") as conn:
    ...
As context managers cannot span multiple cells in a notebook, you can create an instance and close it manually:
conn = rf.Connection.remote("MY_API_KEY")
...
await conn.close()
Pass the api_url, project, and organization keyword arguments to override the default values.
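The two lifecycle patterns above can be sketched without network access. StandInConnection below is a hypothetical placeholder, not part of rockfish; it only mimics the documented behavior that a Connection works with `async with` and has an awaitable close().

```python
import asyncio


class StandInConnection:
    """Hypothetical placeholder for a Connection; not part of rockfish."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        # A real connection would release network resources here.
        self.closed = True

    async def __aenter__(self) -> "StandInConnection":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs even if the body raised, so the connection is always closed.
        await self.close()


async def context_manager_pattern() -> StandInConnection:
    async with StandInConnection() as conn:
        pass  # use conn here
    return conn


async def manual_pattern() -> StandInConnection:
    conn = StandInConnection()
    try:
        pass  # use conn here, for example across several notebook cells
    finally:
        await conn.close()
    return conn


managed = asyncio.run(context_manager_pattern())
manual = asyncio.run(manual_pattern())
print(managed.closed, manual.closed)
```

Both patterns end with the connection closed; the context manager form simply makes it harder to forget the close() call.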
from_config(profile='default', *, organization: Optional[StrOrOrganization] = None, project: Optional[StrOrProject] = None, features: FeatureFlag = FeatureFlag.default()) -> rockfish.remote.Connection
classmethod
Create a new remote Connection using a Rockfish config file.
The Rockfish config file contains one or more profiles. A profile is selected by name and supplies the api_key, api_url, and optionally a project and organization.
A Rockfish config file can be created in the user's home directory at ~/.config/rockfish/config.toml. If it does not exist, it can be created manually.
It is recommended to set the permissions of this file to allow only yourself to read it:
chmod 600 ~/.config/rockfish/config.toml
The config uses the TOML format.
Example:
[profile.default]
api_url = "https://api.rockfish.ai"
api_key = "your api key..."
[profile.ocp]
api_url = "https://api.rockfish.ai"
api_key = "your api key..."
organization = "7sEeE4FFWhps2mt0uy1iln"
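As a rough sketch of creating the file by hand, the following writes a single-profile config and restricts it to owner read/write, the Python equivalent of the chmod 600 step above. The write_config helper is hypothetical, a temporary directory stands in for the home directory, and POSIX file permissions are assumed.

```python
import os
import stat
import tempfile
from pathlib import Path

CONFIG_TEXT = '''\
[profile.default]
api_url = "https://api.rockfish.ai"
api_key = "your api key..."
'''


def write_config(path: Path, text: str) -> None:
    """Hypothetical helper: write a config file only the owner can access."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Create the file with mode 0o600 from the start so the key is never
    # briefly readable by other users.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as handle:
        handle.write(text)
    # Same effect as `chmod 600`, in case the file already existed with
    # looser permissions.
    os.chmod(path, 0o600)


# A temporary directory stands in for the home directory in this sketch.
with tempfile.TemporaryDirectory() as home:
    config_path = Path(home) / ".config" / "rockfish" / "config.toml"
    write_config(config_path, CONFIG_TEXT)
    mode = stat.S_IMODE(config_path.stat().st_mode)
    written = config_path.read_text()

print(oct(mode))
```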
from_env(features: FeatureFlag = FeatureFlag.default()) -> rockfish.remote.Connection
classmethod
Create a new Connection based on environment variables.
The following environment variables can be set:
ROCKFISH_API_KEY (required)
ROCKFISH_API_URL
ROCKFISH_PROJECT_ID
ROCKFISH_ORGANIZATION_ID
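Before calling from_env, it can help to check that the required variable is present. The sketch below uses a hypothetical helper, read_rockfish_env, that is not part of rockfish; it only inspects a mapping for the variable names listed above.

```python
from typing import Dict, Mapping, Optional

REQUIRED = "ROCKFISH_API_KEY"
OPTIONAL = ("ROCKFISH_API_URL", "ROCKFISH_PROJECT_ID", "ROCKFISH_ORGANIZATION_ID")


def read_rockfish_env(environ: Mapping[str, str]) -> Dict[str, Optional[str]]:
    """Hypothetical helper: collect the Rockfish variables from a mapping."""
    if not environ.get(REQUIRED):
        raise KeyError(f"{REQUIRED} must be set")
    settings: Dict[str, Optional[str]] = {REQUIRED: environ[REQUIRED]}
    for name in OPTIONAL:
        # Optional variables are reported as None when unset.
        settings[name] = environ.get(name)
    return settings


example = read_rockfish_env(
    {"ROCKFISH_API_KEY": "my-key", "ROCKFISH_PROJECT_ID": "42uy7DKYiwYx4cWJt5Xo2R"}
)
print(example["ROCKFISH_API_URL"])
```

In real use, os.environ would be passed in place of the literal dictionary.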
rockfish.AbstractConnection
AbstractConnection is the base class for local and remote connections.
Instances can be created using Connection.
Functions
close()
abstractmethod
async
Close the Connection, freeing all resources.
>>> await conn.close()
active_organization() -> Organization
abstractmethod
async
Get the active Organization.
>>> await conn.active_organization()
organizations() -> Stream[Organization]
abstractmethod
Get a Stream of Organizations.
>>> async for org in conn.organizations():
...     print(org)
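Methods that return a Stream are consumed with `async for`, as in the example above. To gather the items into a list instead of printing them, an async comprehension is one option. The sketch below uses a hypothetical async generator, stand_in_stream, in place of a real Stream, and assumes only that the Stream supports `async for`.

```python
import asyncio
from typing import AsyncIterator, List


async def stand_in_stream() -> AsyncIterator[str]:
    """Hypothetical stand-in for a Stream; yields plain strings."""
    for name in ("org-a", "org-b", "org-c"):
        await asyncio.sleep(0)  # yield control to the event loop, as real I/O would
        yield name


async def collect() -> List[str]:
    # An async comprehension drains any object that supports `async for`.
    return [item async for item in stand_in_stream()]


collected = asyncio.run(collect())
print(collected)
```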
active_project() -> Project
abstractmethod
async
Get the active Project.
>>> await conn.active_project()
projects(*, name: Optional[str] = None) -> Stream[Project]
abstractmethod
create_project(name: str) -> Project
abstractmethod
async
Create a new Project.
>>> await conn.create_project("myproject")
change_project(project: StrOrProject) -> None
abstractmethod
Change the active project to project. All further calls on the Connection will use this project.
>>> conn.change_project("42uy7DKYiwYx4cWJt5Xo2R")
>>> project = await conn.projects(name="myproject").nth(0)
>>> conn.change_project(project)
users() -> Stream[User]
abstractmethod
Get a Stream of Users in your organization.
>>> async for user in conn.users():
...     print(user)
get_user(user_id: str) -> User
abstractmethod
async
Get a User from a user_id.
active_user() -> User
abstractmethod
async
Get your current active User.
tokens() -> Stream[Token]
abstractmethod
Get a Stream of your Tokens.
>>> async for token in conn.tokens():
...     print(token)
create_token() -> Token
abstractmethod
async
Create a new Token for your user.
>>> await conn.create_token()
delete_token(token: Token) -> None
abstractmethod
async
Delete a Token.
>>> await conn.delete_token(token)
workflows(*, status: Optional[WorkflowStatus] = None, labels: Optional[LabelDict] = None, after: Optional[StrOrDatetime] = None, before: Optional[StrOrDatetime] = None, order: Order = Order.ASCENDING, limit: Optional[int] = 10) -> Stream[AbstractWorkflow]
abstractmethod
Get a Stream of Workflows.
>>> async for workflow in conn.workflows():
...     print(workflow)
>>> async for workflow in conn.workflows(
...     status=rf.Status.COMPLETED,
...     labels={"project": "icarus"},
...     after=datetime.now(timezone.utc) - timedelta(days=20),
...     order=Order.DESCENDING,
...     limit=20,
... ):
...     print(workflow)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
status | Optional[WorkflowStatus] | Only return items with this status. | None |
labels | Optional[LabelDict] | Only return items with these labels. | None |
after | Optional[StrOrDatetime] | Only return items created after this time. | None |
before | Optional[StrOrDatetime] | Only return items created before this time. | None |
order | Order | List items in this order by create time. | ASCENDING |
limit | Optional[int] | Limit results to this many items. | 10 |
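The after and before filters take a time, so a small helper can build a consistent window. The sketch below is illustrative only: recent_window is a hypothetical helper, and requiring a timezone-aware datetime is an assumption that follows the use of timezone.utc in the example above.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional


def recent_window(days: int, *, now: Optional[datetime] = None) -> Dict[str, Any]:
    """Hypothetical helper: after/before filters covering the last `days` days."""
    end = now or datetime.now(timezone.utc)
    if end.tzinfo is None:
        # Assumption: naive datetimes are rejected to avoid ambiguous times.
        raise ValueError("use a timezone-aware datetime")
    return {"after": end - timedelta(days=days), "before": end}


# A fixed `now` keeps the example reproducible.
filters = recent_window(20, now=datetime(2024, 5, 21, tzinfo=timezone.utc))
filters["labels"] = {"project": "icarus"}
filters["limit"] = 20
print(filters["after"].isoformat())  # 2024-05-01T00:00:00+00:00
```

The resulting dictionary uses the keyword names from the workflows() signature, so it could be unpacked as conn.workflows(**filters).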
get_workflow(workflow_id: str) -> AbstractWorkflow
abstractmethod
async
Get a Workflow by ID.
>>> workflow = await conn.get_workflow("5X2M615Ot0SzoAh3mN5sto")
datasets(labels: Optional[LabelDict] = None, after: Optional[StrOrDatetime] = None, before: Optional[StrOrDatetime] = None) -> Stream[RemoteDataset]
abstractmethod
Get a Stream of datasets.
>>> async for dataset in conn.datasets():
...     print(dataset)
query_datasets(query: str) -> LocalDataset
abstractmethod
async
Create a new LocalDataset from a query.
>>> await conn.query_datasets('SELECT * FROM "2C4BUEiZTUJaTALGdim2oX";')
models(labels: Optional[LabelDict] = None, after: Optional[StrOrDatetime] = None, before: Optional[StrOrDatetime] = None) -> Stream[Model]
abstractmethod
Get a Stream of Models.
>>> async for model in conn.models():
...     print(model)
get_model(model_id: str) -> Model
abstractmethod
async
Get a Model by ID.
>>> model = await conn.get_model("6zMYbFK2p3YwSEUR2oaBc8")
rockfish.WorkflowBuilder
Functions
add_dataset(dataset: AbstractDataset, *, alias: Optional[str] = None)
Add a dataset to the Workflow.
add_model(model: Model, *, alias: Optional[str] = None)
Add a model to the Workflow.
add_action(action: Action, *, alias: Optional[str] = None, parents: Optional[list] = None)
Add an action to the Workflow.
add(action: Union[Action, Model, AbstractDataset], *, alias: Optional[str] = None, parents: Optional[list] = None)
Add an action, model or dataset to the Workflow.
add_path(*actions: Union[Action, Model, AbstractDataset], alias: Optional[str] = None, parents: Optional[list] = None)
Add a path of actions, models, or datasets to the Workflow.
The items are connected from left to right, parent to child.
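To illustrate what "left to right, parent to child" means, the sketch below expands a path into (parent, child) pairs. It uses plain strings and a hypothetical path_edges helper; it shows the connection rule described above, not how WorkflowBuilder is implemented.

```python
from typing import List, Sequence, Tuple


def path_edges(items: Sequence[str]) -> List[Tuple[str, str]]:
    """Pair each item with the item to its right as (parent, child)."""
    return list(zip(items, items[1:]))


edges = path_edges(["dataset", "train", "generate", "save"])
print(edges)
# [('dataset', 'train'), ('train', 'generate'), ('generate', 'save')]
```

A path of n items therefore produces n - 1 connections, and a single item produces none.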
worker_group(group)
Require the workflow to run on a non-system worker group.
start(conn: Optional[AbstractConnection] = None) -> AbstractWorkflow
async
Start the workflow running on the conn.
rockfish.AbstractWorkflow
AbstractWorkflow represents a remote or local Workflow and can be used to control the workflow and collect status information.
New Workflows can be created using the WorkflowBuilder.
Functions
id() -> str
abstractmethod
Returns the workflow ID, an opaque string that uniquely identifies the workflow.
>>> workflow.id()
'2ZxiQe4SDTNYYSjd87Wshp'
status() -> str
abstractmethod
async
Return a string describing the Workflow status.
stop()
abstractmethod
async
Request the workflow to stop.
>>> await workflow.stop()
wait(raise_on_failure: bool = False) -> None
abstractmethod
async
Wait until the workflow is completed.
>>> await workflow.wait()
events(*actions: ActionID, event_types: Optional[list[EventType]] = None) -> Stream[WrappedEvent]
abstractmethod
Return a Stream of the workflow raw events.
>>> async for event in workflow.events():
...     print(event)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
actions | ActionID | Return events from only these actions. | () |
states(*actions: ActionID) -> Stream[StateEvent]
abstractmethod
Return a Stream of the workflow StateEvent.
>>> async for state in workflow.states():
...     print(state)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
actions | ActionID | Return events from only these actions. | () |
logs(*actions: ActionID, level: LogLevel = LogLevel.INFO) -> Stream[LogEvent]
abstractmethod
links(*actions: ActionID) -> Stream[LinkEvent]
abstractmethod
Return a Stream of the workflow LinkEvent.
>>> async for link in workflow.links():
...     print(link)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
actions | ActionID | Return events from only these actions. | () |
progress(*actions: ActionID) -> Stream[ProgressEvent]
abstractmethod
Return a Stream of the workflow ProgressEvent.
>>> async for progress in workflow.progress():
...     print(progress)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
actions | ActionID | Return events from only these actions. | () |
datasets(*actions: ActionID) -> Stream[RemoteDataset]
abstractmethod
Return a Stream of the workflow RemoteDataset.
>>> async for dataset in workflow.datasets():
...     print(dataset)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
actions | ActionID | Return events from only these actions. | () |
model_id(*names) -> str
abstractmethod
async
Return the first model_id.
Deprecated: Use AbstractWorkflow.models().
models(*names) -> Stream[Model]
abstractmethod
Return a Stream of the workflow Model.
>>> async for model in workflow.models():
...     print(model)
rockfish.Dataset
A Dataset represents your data.
Datasets can be a LocalDataset, existing on the current system, or they can be a RemoteDataset on the Rockfish API.
Functions
from_id(conn, dataset_id) -> RemoteDataset
async
staticmethod
Create a RemoteDataset from the Dataset ID.
from_csv(name, path, table_metadata: Optional[TableMetadata] = None) -> LocalDataset
staticmethod
Create a LocalDataset from the contents of a CSV file.
from_parquet(name, path, table_metadata: Optional[TableMetadata] = None) -> LocalDataset
staticmethod
Create a LocalDataset from the contents of a Parquet file.
from_pandas(name, df, table_metadata: Optional[TableMetadata] = None) -> LocalDataset
staticmethod
Create a LocalDataset from the contents of a Pandas dataframe.
from_table(name, table: pa.Table, table_metadata: Optional[TableMetadata] = None) -> LocalDataset
staticmethod
Create a LocalDataset from an Arrow table.
from_json(name: str, path: str, table_metadata: Optional[TableMetadata] = None) -> LocalDataset
staticmethod
Create a LocalDataset from the contents of a JSON file.
LocalDataset
LocalDataset represents a Dataset with contents on the local system.
Functions
__init__(name: str, table: pa.Table, table_metadata: Optional[TableMetadata] = None)
Create a LocalDataset with name and the pyarrow.Table table as the contents.
from_csv(name: str, path: Union[os.PathLike, BinaryIO], table_metadata=None) -> Self
classmethod
from_json(name: str, json_file_path: str, table_metadata: Optional[TableMetadata] = None) -> Self
classmethod
Create a LocalDataset named name from the contents of the JSON file at json_file_path. Supported formats are .json and .jsonl (newline-delimited JSON).
Example of a valid JSON file:
[
{"name": "Alice", "age": 25, "address": {"city": "New York", "state": "NY"}},
{"name": "Bob", "age": 30, "address": {"city": "San Francisco", "state": "CA"}}
]
Example of a valid JSONL file:
{"name": "Alice", "age": 25, "address": {"city": "New York", "state": "NY"}}
{"name": "Bob", "age": 30, "address": {"city": "San Francisco", "state": "CA"}}
>>> import rockfish as rf
>>> rf.Dataset.from_json('my_dataset', 'data.json')
LocalDataset('my_dataset')
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the Dataset. | required |
json_file_path | str | Path of the JSON file. | required |
Raises:
Type | Description |
---|---|
ValueError | If the file is not a valid JSON or JSONL file. |
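The difference between the two supported layouts can be seen by writing the same records both ways with the standard json module. This sketch does not call rockfish; the file names and temporary directory are arbitrary choices for illustration.

```python
import json
import tempfile
from pathlib import Path

records = [
    {"name": "Alice", "age": 25, "address": {"city": "New York", "state": "NY"}},
    {"name": "Bob", "age": 30, "address": {"city": "San Francisco", "state": "CA"}},
]

with tempfile.TemporaryDirectory() as tmp:
    json_path = Path(tmp) / "data.json"
    jsonl_path = Path(tmp) / "data.jsonl"

    # .json: a single array holding every record.
    json_path.write_text(json.dumps(records))

    # .jsonl: one complete JSON object per line, with no enclosing array.
    jsonl_path.write_text("\n".join(json.dumps(r) for r in records) + "\n")

    from_json = json.loads(json_path.read_text())
    from_jsonl = [
        json.loads(line) for line in jsonl_path.read_text().splitlines() if line
    ]

print(from_json == from_jsonl == records)
```

Either file could then be given to from_json as in the example above, for instance rf.Dataset.from_json('my_dataset', 'data.json').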
sql(query: str, *, conn=None) -> Self
async
Return a LocalDataset containing the results of an SQL query against this Dataset.
>>> ds = await dataset.sql("select foo from my_table")
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query | str | An SQL query against a table named | required |
conn | | The | None |
sync_sql(query) -> Self
Return a LocalDataset containing the results of an SQL query against this Dataset.
>>> ds = dataset.sync_sql("select foo from my_table")
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query | | An SQL query against a table named | required |
with_table_metadata(table_metadata: TableMetadata) -> Self
Return a new LocalDataset with the provided TableMetadata. This metadata can be accessed from the Actions within a Workflow.
table_metadata() -> TableMetadata
Return the TableMetadata.
select(fields: list[str]) -> Self
Return a LocalDataset containing only the specified fields.
rename_field(old: str, new: str) -> Self
Return a LocalDataset with the field old renamed to new.
drop_fields(fields: list[str]) -> Self
Return a LocalDataset without the specified fields.
head(n: int) -> Self
Return a LocalDataset containing only the first n records.
to_local(conn=None) -> LocalDataset
async
Convert to a LocalDataset.
to_remote(conn)
async
Convert to a RemoteDataset.
to_pandas()
Return a pandas dataframe corresponding to this Dataset.
RemoteDataset
RemoteDataset references a remote Dataset.
In order to view the data you must convert it to a local Dataset.
Functions
from_id(conn, id) -> Self
async
classmethod
Create a remote Dataset that references the id.
Raises:
Type | Description |
---|---|
KeyError | If the remote Dataset does not exist. |
name() -> str
Return the name of the Dataset.
delete(conn) -> None
async
Delete the remote Dataset from the server.
sql(query, *, conn) -> LocalDataset
async
Return a LocalDataset containing the results of an SQL query against this Dataset.
to_local(conn=None) -> LocalDataset
async
Return a local Dataset containing the full remote Dataset.
rockfish.Token
Access Token for a User.
This is sometimes referred to as an API key.
Attributes:
Name | Type | Description |
---|---|---|
id | str | |
token | SecretStr | |
Functions
reveal() -> str
Return the value of the secret token.
rockfish.Project
Project in the Rockfish API.
Attributes:
Name | Type | Description |
---|---|---|
id | str | |
name | str | |
default | bool | |
Functions
users() -> Stream[User]
Stream of Users in the Project.
>>> async for user in project.users():
...     print(user)
add_user(user: StrOrUser) -> None
async
Add a user to the Project.
>>> user = await organization.users().filter(
... lambda u: u.email == "bob@example.org").nth(0)
>>> await project.add_user(user)
remove_user(user: StrOrUser) -> None
async
Remove a user from the Project.
>>> user = await project.users().filter(
... lambda u: u.email == "bob@example.org").nth(0)
>>> await project.remove_user(user)
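The filter(...).nth(0) idiom in the examples above selects the first user that satisfies a predicate. The sketch below expresses the same idea with a plain `async for` loop. StandInUser, stand_in_users, and first_match are hypothetical names that are not part of rockfish, and returning None when nothing matches is this sketch's choice; the source does not state what nth(0) does in that case.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Callable, Optional


@dataclass
class StandInUser:
    """Hypothetical stand-in for a User; only the email field is modeled."""

    email: str


async def stand_in_users() -> AsyncIterator[StandInUser]:
    for email in ("alice@example.org", "bob@example.org", "carol@example.org"):
        yield StandInUser(email)


async def first_match(
    stream: AsyncIterator[StandInUser],
    predicate: Callable[[StandInUser], bool],
) -> Optional[StandInUser]:
    # Stop at the first item that satisfies the predicate.
    async for item in stream:
        if predicate(item):
            return item
    return None


found = asyncio.run(
    first_match(stand_in_users(), lambda u: u.email == "bob@example.org")
)
missing = asyncio.run(
    first_match(stand_in_users(), lambda u: u.email == "dave@example.org")
)
print(found, missing)
```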
rockfish.Organization
Organization in the Rockfish API.
Attributes:
Name | Type | Description |
---|---|---|
id | str | |
name | str | |
Functions
users() -> Stream[User]
Stream of Users in the Organization.
>>> async for user in organization.users():
...     print(user)
add_user(user: StrOrUser) -> None
async
Add a user to the Organization.
This method requires that you have the Admin role for the Organization.
>>> await org.add_user("3FJ5Tulq5mUXMadbk63sHO")
remove_user(user: StrOrUser) -> None
async
Remove a user from the Organization.
This method requires that you have the Admin role for the Organization.
>>> user = await org.users().filter(lambda u: u.email == "bob@example.org").nth(0)
>>> await org.remove_user(user)