Overview
Importing Datasets into the Rockfish Data Platform
The Rockfish Data Platform allows you to import datasets in two primary ways, enabling flexibility depending on your use case:
- One-by-One Import: This method is ideal for manual dataset imports. You can upload individual datasets through the Rockfish SDK.
- Continuous or Periodic Ingest: This method is suitable for regularly processed datasets. You can configure your data pipeline to push new data into the platform on a recurring schedule, and the platform processes it automatically.
Supported Data Formats:
The Rockfish SDK supports importing datasets in the following formats:
- CSV (.csv)
- Parquet (.parquet)
- JSON (.json and .jsonl)
- Pandas DataFrame
Importing Individual Datasets
You can easily import datasets in different formats using the Rockfish SDK. Below are some examples:
Install Rockfish SDK
Before importing, make sure the Rockfish SDK is installed, then import it:
import rockfish as rf
Importing a CSV File
dataset = rf.Dataset.from_csv("dataset_name", "path/to/file.csv")
Importing a Parquet File
dataset = rf.Dataset.from_parquet("dataset_name", "path/to/file.parquet")
Importing a JSON or JSONL File
Rockfish supports both .json and .jsonl (newline-delimited JSON) formats:
dataset = rf.Dataset.from_json("dataset_name", "path/to/file.json")
Flatten Table (Optional)
If your imported CSV, Parquet, or JSON file contains nested JSON objects, you can use the Rockfish Flatten action to convert them into a flat table format for easier processing. After synthetic data generation, you can reverse this process with the Unflatten action.
To create the flatten and unflatten actions (with rockfish.actions imported as ra):
import rockfish.actions as ra

flatten = ra.Flatten()
unflatten = ra.Unflatten()
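As a rough sketch of where the flatten action fits, it can be placed ahead of the other actions in a workflow path so that downstream steps see a flat table. The file name below is a placeholder, and the workflow pattern follows the add_path example shown later on this page:
import rockfish as rf
import rockfish.actions as ra

# Hypothetical nested JSONL file; Flatten is placed before the rest of the path.
dataset = rf.Dataset.from_json("nested_events", "events.jsonl")
flatten = ra.Flatten()
save = ra.DatasetSave({"name": "flattened"})

builder = rf.WorkflowBuilder()
# Training/generation actions (e.g. from the Recommender, shown later on this
# page) would sit between flatten and save.
builder.add_path(dataset, flatten, save)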
Importing Datasets Created from Pandas DataFrame
If you have a dataset in the form of a Pandas DataFrame (df), you can directly import the DataFrame into Rockfish:
dataset = rf.Dataset.from_pandas("dataset_name", df)
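For example, a minimal sketch (the column names and values are illustrative only):
import pandas as pd
import rockfish as rf

# Build a small example DataFrame and import it as a Rockfish dataset.
df = pd.DataFrame({
    "customer": ["a", "b"],
    "amount": [10.0, 25.5],
})
dataset = rf.Dataset.from_pandas("transactions", df)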
Listing Datasets
Once you successfully load data, it will be saved as a dataset ID linked with your API key. You can list the datasets associated with your account by using the following code:
async for dataset in conn.list_datasets():
    print(f"{dataset!r}")
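For a runnable sketch, the loop needs to run inside an async function. Here conn is assumed to be an authenticated Rockfish connection; the constructor shown (rf.Connection.from_config()) is an assumption and may differ for your SDK setup:
import asyncio
import rockfish as rf

async def main():
    # Assumed connection helper; replace with however you create your
    # authenticated Rockfish connection.
    conn = rf.Connection.from_config()
    async for dataset in conn.list_datasets():
        print(f"{dataset!r}")

asyncio.run(main())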
Re-Importing Data
To re-import a dataset that you've previously loaded into the Rockfish platform, simply select the corresponding dataset ID and use the following code:
dataset = rf.Dataset.from_id(conn, 'my-dataset-id')
Continuous Ingest - Event-Driven Data
If you are working with time-series data, you likely have a continuous source of new data. Keeping models up to date with your latest data is the best way to keep your synthetic data relevant. Time-series data often follows seasonal patterns, and the boundaries of those patterns are often good places to train new models. For example, some events happen on particular days of the week; in that case it is often beneficial to have a separate model for each day. You can use previous models with properties similar to your current batch to improve training, or you can store individually trained models separately and generate data either by blending the output of multiple models or by generating from the models that most closely match your desired output. See the documentation on generating synthetic data for more details. This section covers how to set up a configuration once and then train models as your data comes in.
DataStreams
The Rockfish SDK allows you to add a datastream action to a workflow. Once you have a configuration that you are happy with, you can add a DatastreamLoad action that creates a data queue; any data you send to the queue is processed with the actions and settings of your selected workflow. Here is an example of setting up a workflow and then adding a datastream action to it:
import rockfish as rf
import rockfish.actions as ra
from rockfish.labs.dataset_properties import DatasetPropertyExtractor
from rockfish.labs.steps import Recommender

# Load a sample of the data and extract its properties.
dataset = rf.Dataset.from_csv("finance", "finance.csv")
dataset_properties = DatasetPropertyExtractor(
    dataset,
    session_key="customer",
    metadata_fields=["age", "gender"],
    additional_property_keys=["association_rules"]
).extract()

# Let the Recommender suggest workflow actions for this dataset.
recommender_output = Recommender(dataset_properties).run()
rec_actions = recommender_output.actions

# Save the generated output as a dataset named "synthetic".
save = ra.DatasetSave({"name": "synthetic"})

builder = rf.WorkflowBuilder()
builder.add_path(dataset, *rec_actions, save)
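If you want to run this one-off version of the workflow right away, you can start it against your connection in the same way as the streaming example below:
workflow = await builder.start(conn)
print(f"Workflow: {workflow.id()}")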
Updating the previous example to read from a datastream instead of a fixed dataset:
import rockfish as rf
import rockfish.actions as ra

# Reuse the recommended actions from the previous example.
rec_actions = recommender_output.actions
save = ra.DatasetSave({"name": "synthetic"})

# The datastream action creates a queue that feeds data into the workflow.
datastream = ra.DatastreamLoad()

builder = rf.WorkflowBuilder()
builder.add_path(datastream, *rec_actions, save)
workflow = await builder.start(conn)
print(f"Workflow: {workflow.id()}")

# Write a new batch of data into the running workflow's datastream.
new_data = rf.Dataset.from_csv("new_finance_data", "new_finance.csv")
await workflow.write_datastream(datastream, new_data)
You can call the write_datastream function any time a new batch of data is ready for training.
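For example, a minimal sketch of pushing batches as they arrive (the batch file names are placeholders):
# Push each new batch into the running workflow's datastream.
for path in ["new_finance_jan.csv", "new_finance_feb.csv"]:
    batch = rf.Dataset.from_csv("finance_batch", path)
    await workflow.write_datastream(datastream, batch)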
For more details on the import functions in the Rockfish SDK, please refer to rockfish.Dataset.