
Overview

Importing Datasets into the Rockfish Data Platform

The Rockfish Data Platform allows you to import datasets in two primary ways, enabling flexibility depending on your use case:

  1. One-by-One Import: This method is ideal for manual dataset imports. You can upload individual datasets through the Rockfish SDK.

  2. Continuous or Periodic Ingest: This method is suitable for regularly updated datasets. You can configure your data pipeline to push new data into the platform on a recurring schedule, and the system processes it automatically.

Supported Data Formats:

The Rockfish SDK supports importing datasets in the following formats:

  1. CSV (.csv)
  2. Parquet (.parquet)
  3. JSON (.json and .jsonl)
  4. Pandas DataFrame

Importing Individual Datasets

You can easily import datasets in different formats using the Rockfish SDK. Below are some examples:

Install Rockfish SDK

Before importing, make sure the Rockfish SDK is installed, then import it:

import rockfish as rf

Importing a CSV File

dataset = rf.Dataset.from_csv("dataset name", "path/to/file.csv")

Importing a Parquet File

dataset = rf.Dataset.from_parquet("dataset name", "path/to/file.parquet")

Importing a JSON or JSONL File

Rockfish supports both .json and .jsonl (newline-delimited JSON) formats:

dataset = rf.Dataset.from_json("dataset name", "path/to/file.json")

Flatten Table (Optional)

If your imported CSV, Parquet, or JSON file contains nested JSON objects, you can use the Rockfish Flatten action to convert them into a flat table format for easier processing. After synthetic data generation, you can reverse this process with the Unflatten action.

To flatten the dataset:

rockfish.actions.Flatten()

After generating synthetic data, you can unflatten it back into the original structure:

rockfish.actions.Unflatten()
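
As a rough sketch of how this might fit into a workflow (assuming rockfish.actions is imported as ra and that Flatten is added to a workflow path like the other actions in this guide; the dataset name and file are hypothetical):

import rockfish as rf
import rockfish.actions as ra

# Hypothetical dataset whose records contain nested JSON objects
dataset = rf.Dataset.from_json("events", "events.jsonl")

flatten = ra.Flatten()                          # flatten nested objects into columns
save = ra.DatasetSave({"name": "events-flat"})

builder = rf.WorkflowBuilder()
builder.add_path(dataset, flatten, save)        # flatten before further processing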

Importing Datasets Created from Pandas DataFrame

If you have a dataset in the form of a Pandas DataFrame (df), you can directly import the DataFrame into Rockfish:

dataset = rf.Dataset.from_pandas("dataset name", df)
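
For example (a minimal sketch; the DataFrame contents here are made up):

import pandas as pd
import rockfish as rf

# Build a small in-memory DataFrame
df = pd.DataFrame({
    "customer": ["a1", "a2", "a3"],
    "amount": [10.5, 42.0, 7.25],
})

dataset = rf.Dataset.from_pandas("transactions", df)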

Listing Datasets

Once a dataset is successfully loaded, it is stored under a dataset ID linked to your API key. You can list the datasets associated with your account using the following code:

# conn is an existing connection to the Rockfish platform, created with your API key
async for dataset in conn.list_datasets():
    print(f"{dataset!r}")

Re-Import Data

To re-import a dataset that you've previously loaded into the Rockfish platform, simply select the corresponding dataset ID and use the following code:

dataset = rf.Dataset.from_id(conn, 'my-dataset-id')

Continuous Ingest - Event-Driven Data

If you are using time-series data, you likely have a continuous source of new data, and keeping models up to date with your latest data is the best way to keep your synthetic data relevant. Time-series data often follows a seasonal pattern, and the boundaries between seasons are often good places to train new models. For example, some events happen on particular days of the week; in that case it is often beneficial to have a separate model for each day. You can use previous models with properties similar to your current batch to improve training, or you can store individually trained models separately and generate data either by blending the output of multiple models or by generating from the models that most closely match your desired output. See the documentation on generating synthetic data for more detail. This section covers how to set up a configuration once and then train models as your data comes in.
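
For instance, one way to prepare per-day batches is to split the raw data by weekday before creating a dataset for each batch (a minimal sketch using pandas; the file and column names are hypothetical):

import pandas as pd
import rockfish as rf

# Hypothetical raw time-series data with a timestamp column
df = pd.read_csv("transactions.csv", parse_dates=["timestamp"])

# Split by day of week so each batch can train (or update) its own model
for day, day_df in df.groupby(df["timestamp"].dt.day_name()):
    dataset = rf.Dataset.from_pandas(f"transactions-{day.lower()}", day_df)
    # ...train a model for this day using your chosen workflow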

DataStreams

The Rockfish SDK allows you to add a DatastreamLoad action as part of a workflow. Once you have a configuration you are happy with, you can add this action to create a data queue into which you send your data for processing with the actions and settings of your selected workflow. Here is an example of setting up a workflow and then adding a DatastreamLoad action to it:

import rockfish as rf
import rockfish.actions as ra
from rockfish.labs.dataset_properties import DatasetPropertyExtractor
from rockfish.labs.steps import Recommender

dataset = rf.Dataset.from_csv("finance", "finance.csv")

dataset_properties = DatasetPropertyExtractor(
    dataset,
    session_key="customer",
    metadata_fields=["age", "gender"],
    additional_property_keys=["association_rules"]
).extract()

recommender_output = Recommender(dataset_properties).run()
rec_actions = recommender_output.actions
save = ra.DatasetSave({"name": "synthetic"})
builder = rf.WorkflowBuilder()
builder.add_path(dataset, *rec_actions, save)

# conn is an existing connection to the Rockfish platform
workflow = await builder.start(conn)

This gives you an initial model from the first dataset. If you are happy with the results, you can create a new workflow that uses these settings to set up an ingest endpoint, running the same set of actions and configurations on all new data posted to it.

Updating the previous example:

import rockfish as rf
import rockfish.actions as ra
from rockfish.labs.dataset_properties import DatasetPropertyExtractor
from rockfish.labs.steps import Recommender

dataset = rf.Dataset.from_csv("finance", "finance.csv")

# rec_actions come from the Recommender run in the previous example
rec_actions = recommender_output.actions
save = ra.DatasetSave({"name": "synthetic"})

# The datastream replaces the initial dataset as the start of the workflow path
datastream = ra.DatastreamLoad()
builder = rf.WorkflowBuilder()
builder.add_path(datastream, *rec_actions, save)

workflow = await builder.start(conn)
print(f"Workflow: {workflow.id()}")

# Send a new batch of data to the running workflow
new_data = rf.Dataset.from_csv("new_finance_data", "new_finance.csv")

await workflow.write_datastream(datastream, new_data)

This creates workflow actions and configurations based on the initial dataset, but instead of ingesting and processing that data directly, it sets up a datastream that runs until explicitly stopped. You can call the write_datastream function any time a new batch of data is ready for training.
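
For example, if new batches arrive as files on a regular schedule, each one can be fed to the running workflow as it becomes available (a rough sketch; the batch paths and daily cadence are hypothetical):

import asyncio
import rockfish as rf

async def ingest_batches(workflow, datastream, batch_paths):
    # Feed each new batch to the running workflow's datastream
    for path in batch_paths:
        batch = rf.Dataset.from_csv("finance-batch", path)
        await workflow.write_datastream(datastream, batch)
        await asyncio.sleep(24 * 60 * 60)  # wait for the next daily batch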

For more details on the import functions in the Rockfish SDK, please refer to rockfish.Dataset.