Generic Rockfish Integration

Integrating Rockfish

To ensure that the synthetic data you use captures the latest data trends, Rockfish makes it easy to operationalize the Rockfish Platform within your MLOps pipeline. With Rockfish Modules, you can set up continuous or periodic ingestion of the latest data and train models that capture its statistical properties.

Key scenarios where this is beneficial:

  • Preserving Data: Maintain access to data that would otherwise be unavailable due to retention policies.
  • Evolving Data Patterns: Continuously ingest data to keep models updated as patterns shift over time.
  • Real-Time Anomaly Detection: Continuously train models for real-time detection of emerging anomalies in fields like network security, system monitoring, or IoT environments.
  • Demand Forecasting in Retail and Logistics: Continuously retrain models to accurately predict fluctuating demand patterns, ensuring effective supply chain and inventory management.

This tutorial will guide you through setting up an end-to-end workflow using the Rockfish Generative Data Platform.


Prerequisites

  • API Key: Make sure you have your API key ready. If not, request it at support@rockfish.ai.
  • Install Rockfish SDK: You can either use the Rockfish Managed Cloud or install the SDK locally. To install locally:

Setup

Install using pip. Make sure you have the latest version of pip installed.

pip install -U 'rockfish[labs]' -f 'https://docs142.rockfish.ai/packages/index.html'

Verification

To check that Rockfish was installed properly, run this code:

import rockfish as rf
print(rf.product_version)

Imports

To follow along with the tutorial below, import the following:

import rockfish as rf
import rockfish.actions as ra
import rockfish.labs as rl
from rockfish.labs.dataset_properties import DatasetPropertyExtractor
from rockfish.labs.steps import ModelSelection, Recommender

Connecting to the Rockfish Platform

The Rockfish Platform lets you perform model training and generation. Using your API key, connect to the platform:

conn = rf.Connection.remote("<API_URL>", "<API_KEY>")

All Rockfish workflows will use this connection object. Refer to the documentation on connections for more ways to connect to the Rockfish platform.
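
Hard-coding the key in a script or notebook makes it easy to leak. The following is a minimal sketch, assuming you keep the URL and key in environment variables. The variable names ROCKFISH_API_URL and ROCKFISH_API_KEY and the helper load_credentials are hypothetical choices for this example, not something the SDK reads on its own.

```python
import os


def load_credentials(env=None):
    """Return (api_url, api_key) read from environment variables.

    The variable names are illustrative choices, not SDK conventions.
    """
    env = os.environ if env is None else env
    required = ("ROCKFISH_API_URL", "ROCKFISH_API_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return env["ROCKFISH_API_URL"], env["ROCKFISH_API_KEY"]
```

The returned pair can then be passed to rf.Connection.remote in place of the literal placeholders.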


Step 1: Onboard Data

First, import your sample dataset. You can also use Rockfish connectors to load the dataset from a cloud storage provider.

Once the dataset is imported, Rockfish's onboarding module automatically identifies and describes the data properties through an extractor called DatasetPropertyExtractor. This extractor generates a DatasetProperties object that can be leveraged by other Rockfish components such as the Recommender.

dataset = rf.Dataset.from_csv("<DATASET_NAME>", "<PATH_TO_FILE>")

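# Placeholders: the field that identifies a session and the metadata fields in your dataset
session_key = "<SESSION_KEY>"
metadata_fields = ["<METADATA_FIELD>"]

dataset_properties = DatasetPropertyExtractor(
    dataset=dataset,
    session_key=session_key,
    metadata_fields=metadata_fields,
).extract()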

recommender_output = Recommender(
    dataset_properties=dataset_properties,
    steps=[ModelSelection()]
).run()

print(recommender_output.report)
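
To build intuition for what property extraction involves, here is a deliberately simplified sketch that labels CSV columns as continuous or categorical using only the standard library. It is not how DatasetPropertyExtractor works internally; the function infer_column_kinds, the heuristic, and the sample columns are all assumptions made for illustration.

```python
import csv
import io


def _is_float(text):
    """Return True if the string parses as a float."""
    try:
        float(text)
        return True
    except ValueError:
        return False


def infer_column_kinds(csv_text):
    """Label each column 'continuous' if every non-empty value parses as a
    float, otherwise 'categorical'. Illustrative heuristic only."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    kinds = {}
    for name in (rows[0].keys() if rows else []):
        values = [r[name] for r in rows if r[name] not in ("", None)]
        is_numeric = bool(values) and all(_is_float(v) for v in values)
        kinds[name] = "continuous" if is_numeric else "categorical"
    return kinds


# Made-up sample data with three columns
sample = "flow_id,protocol,bytes\n1,tcp,120.5\n1,tcp,98\n2,udp,40\n"
print(infer_column_kinds(sample))
```

Note that this naive heuristic labels the numeric flow_id column as continuous even though it is an identifier, which is one reason fields such as the session key are passed explicitly in the call above.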

The Recommender returns a list of Rockfish actions that should be part of your workflow. These actions will pre-process your dataset, train a Rockfish model, and generate synthetic data from the model. Validate that the synthetic data created by this workflow passes your fidelity/privacy requirements. You can configure the actions to meet your requirements.

# Build and run the workflow, then fetch the synthetic data.
# Top-level `await` requires a notebook or another async context.
save = ra.DatasetSave(name="Rockfish_Onboard")
builder = rf.WorkflowBuilder()
builder.add_path(dataset, *recommender_output.actions, save)
workflow = await builder.start(conn)
syn = await (await workflow.datasets().last()).to_local(conn)

# perform data quality checks to validate and tune the workflow: 
rl.vis.custom_plot(
    datasets=[dataset, syn],
    query="<SQL_QUERY>",
    plot_func=rl.vis.plot_bar,
    field="<FIELD_NAME>",
)
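
Alongside a visual comparison, a numeric check can make a fidelity requirement explicit. The sketch below computes the total variation distance between the category frequencies of one field in the real and synthetic data, given as plain Python lists. It is a generic statistic, not a Rockfish API; the helper names and the 0.1 threshold are assumptions chosen for this example.

```python
from collections import Counter


def total_variation_distance(real_values, synthetic_values):
    """Total variation distance between two empirical categorical
    distributions: 0.0 means identical frequencies, 1.0 means disjoint."""
    if not real_values or not synthetic_values:
        raise ValueError("both samples must be non-empty")
    real = Counter(real_values)
    syn = Counter(synthetic_values)
    n_real, n_syn = len(real_values), len(synthetic_values)
    categories = set(real) | set(syn)
    return 0.5 * sum(abs(real[c] / n_real - syn[c] / n_syn) for c in categories)


def passes_fidelity(real_values, synthetic_values, max_distance=0.1):
    """Hypothetical acceptance gate; the default threshold is arbitrary."""
    return total_variation_distance(real_values, synthetic_values) <= max_distance
```

A gate like this could run after each workflow change, so that a configuration is only kept when the synthetic field distribution stays within the tolerance you chose.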

The code for this example can be found in the Onboarding Example.

Once you have onboarded your dataset, you can use the workflow for the next step. For evaluation of synthetic sample data quality, refer to Evaluation of Data.


Step 2: Train

To ensure your models are always trained on the latest data, the Rockfish platform supports continuous ingestion of datasets. This ensures that the generated synthetic data matches evolving trends and patterns, which is critical for the key use cases mentioned above.

To set up continuous ingestion, add a DatastreamLoad action to the beginning of your workflow, in place of the sample dataset used during onboarding. This action creates a data queue, so you can pass streaming data through the workflow you set up during onboarding.

You can also use Rockfish connectors to stream data from a cloud storage provider.
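
Conceptually, the stream behaves like a queue in front of the workflow: a producer pushes batches and the workflow consumes them one at a time. The toy model below shows that pattern with asyncio.Queue from the standard library. Nothing here calls the Rockfish SDK, and train_on_batch is a hypothetical stand-in for a training run.

```python
import asyncio


async def train_on_batch(batch):
    """Hypothetical stand-in for a training run; returns a fake model name."""
    await asyncio.sleep(0)  # yield control, as a real training call would
    return f"model_for_{batch}"


async def consume(queue, models):
    """Train one model per batch until a None sentinel arrives."""
    while True:
        batch = await queue.get()
        if batch is None:
            break
        models.append(await train_on_batch(batch))


async def run_stream(batches):
    """Push batches through a queue and collect one model name per batch."""
    queue = asyncio.Queue()
    models = []
    consumer = asyncio.create_task(consume(queue, models))
    for batch in batches:
        await queue.put(batch)  # analogous to writing a batch to the stream
    await queue.put(None)  # signal that no more batches are coming
    await consumer
    return models


print(asyncio.run(run_stream(["batch_0", "batch_1"])))
```

In a notebook, where an event loop is already running, call await run_stream(...) directly instead of asyncio.run.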

# Start the workflow with a datastream as its input
datastream = ra.DatastreamLoad()
save = ra.DatasetSave(name="Rockfish_Train")
builder = rf.WorkflowBuilder()
builder.add_path(datastream, *recommender_output.actions, save)
workflow = await builder.start(conn)

After starting the workflow, call write_datastream whenever a new batch of data is ready for training.

# write new datasets to the stream
dataset_paths = ["<NEW_DATASET_PATH>"]
for i, path in enumerate(dataset_paths):
    dataset = rf.Dataset.from_csv(f"train_{i}", path)
    await workflow.write_datastream(datastream, dataset)

Each time a batch is written to the stream, a model is trained on it. By default, each batch produces a new model.
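
For periodic ingestion, something has to decide which files count as a new batch. One possible approach, sketched below under the assumption that batches arrive as CSV files in a single directory, is to remember which paths have already been sent. The helper find_new_batches and the directory layout are hypothetical and not part of the SDK.

```python
from pathlib import Path


def find_new_batches(directory, seen):
    """Return CSV paths in `directory` that are not yet in `seen`, sorted by
    name, and record them in `seen` so the next call skips them."""
    new_paths = sorted(p for p in Path(directory).glob("*.csv") if p not in seen)
    seen.update(new_paths)
    return new_paths
```

Calling this on a schedule, with the same seen set each time, yields only files that have not been processed yet. Each returned path could then be loaded with rf.Dataset.from_csv and written to the stream as in the loop above.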

The code for this example can be found in the Training Example.


Step 3: Generate

Once the models are trained, labeled, and available in the Model Store, you can query specific models at any time to generate the data your use case requires.

Refer to Model Store to learn more about the Model Store, labels, and using labels to query models and generate synthetic data.
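
The idea of selecting models by label can be illustrated without the platform. The sketch below filters a small in-memory registry by label values. It is not the Model Store API; the function select_models, the registry layout, and the model IDs and label names are all made up for illustration.

```python
def select_models(models, **wanted_labels):
    """Return the models whose labels contain every requested key/value pair."""
    return [
        m for m in models
        if all(m["labels"].get(k) == v for k, v in wanted_labels.items())
    ]


# Hypothetical registry entries; IDs and label names are illustrative only.
registry = [
    {"id": "model-1", "labels": {"site": "east", "day": "2024-01-01"}},
    {"id": "model-2", "labels": {"site": "west", "day": "2024-01-01"}},
    {"id": "model-3", "labels": {"site": "east", "day": "2024-01-02"}},
]

print([m["id"] for m in select_models(registry, site="east")])
```

Labeling each trained model with attributes such as the batch date or data source is what makes this kind of targeted generation possible later.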

For evaluation of synthetic data quality, refer to Evaluation of Data.

The code for this example can be found in the Generation Example.

Evaluation

To ensure that the generated synthetic data accurately represents the statistical properties of the original data (synthetic data fidelity), Rockfish provides evaluation mechanisms via the Synthetic Data Accessor (SDA).

To learn more, refer to Synthetic Data Accessor (SDA). We also provide SQL queries that you can use to define custom metrics.

This step can and should be run at any time to check that data quality is up to par. In particular, run it after onboarding to validate the quality of the sample synthetic data, and after generation to validate the quality of the generated synthetic data.

A simple example of using the custom plot function to visualize synthetic data:
rl.vis.custom_plot(
    datasets=[dataset, syn],
    query="<SQL_QUERY>",
    plot_func=rl.vis.plot_bar,
    field="<FIELD_NAME>",
)
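
A SQL-defined metric boils down to running the same query against the real and synthetic data and comparing the results. The sketch below shows that idea with the standard library's sqlite3 module as a stand-in engine. It does not use Rockfish's query support; the table name data, the column names, the helper run_metric, and the sample rows are assumptions made for this example.

```python
import sqlite3


def run_metric(rows, query):
    """Load (category, value) rows into an in-memory table named `data`
    and return the query result as a dict keyed by the first column."""
    con = sqlite3.connect(":memory:")
    try:
        con.execute("CREATE TABLE data (category TEXT, value REAL)")
        con.executemany("INSERT INTO data VALUES (?, ?)", rows)
        return dict(con.execute(query).fetchall())
    finally:
        con.close()


QUERY = "SELECT category, AVG(value) FROM data GROUP BY category"

# Made-up rows standing in for one field of the real and synthetic datasets
real = [("tcp", 100.0), ("tcp", 140.0), ("udp", 40.0)]
synthetic = [("tcp", 110.0), ("tcp", 120.0), ("udp", 50.0)]

real_metric = run_metric(real, QUERY)
syn_metric = run_metric(synthetic, QUERY)

# Absolute gap in the per-category mean, for categories present in both
gaps = {k: abs(real_metric[k] - syn_metric[k]) for k in real_metric if k in syn_metric}
print(gaps)
```

Keeping the metric as a single query string makes it straightforward to reuse the same definition for every new batch of synthetic data.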

Next Steps

Install the Rockfish CLI and explore the use-case tutorials for hands-on experience using Rockfish to generate synthetic data.