RudderStack-Dagster Integration

Orchestrate your Profiles and Reverse ETL jobs programmatically with RudderStack’s Dagster integration.

Dagster is a popular orchestrator designed for data pipeline and management.

RudderStack’s Dagster library lets you integrate your Profiles runs and Reverse ETL syncs programmatically, enabling automated scheduling and orchestration of the jobs.

Prerequisites

info
For production use cases, RudderStack recommends using a service access token instead of personal access token.

Install Dagster library

Install the dagster-rudderstack package to use Dagster with RudderStack:

pip install dagster-rudderstack

Set up resources

You can define resources like your RudderStack connection details (for example, access token) in Dagster - this establishes the context for running the operations.

A sample code defining a resource for Reverse ETL and Profiles is shown:

#resources.py
from dagster_rudderstack.resources.rudderstack import RudderStackRETLResource, RudderStackProfilesResource

rudderstack_retl_resource = RudderStackRETLResource(
            access_token="<service_access_token>")

rudderstack_profiles_resource = RudderStackProfilesResource(
            access_token="<service_access_token>")

The RudderStackRETLResource and RudderStackProfilesResource parameters are described below:

ParameterDescriptionDefault value
access_token
Required
The service access token mentioned in the Prerequisites section.-
rs_cloud_urlThe RudderStack API URL depending on your region.

  • Standard (US): https://api.rudderstack.com
  • EU: https://api.eu.rudderstack.com
https://api.rudderstack.com
request_max_retriesMaximum number of times requests to the RudderStack API should be retried before failing.3
request_retry_delayTime (in seconds) to wait between each request retry.1
request_timeoutTime (in seconds) after which the requests to RudderStack are declared timed out.30
poll_timeoutTime (in seconds) after which the polling for a triggered job is declared timed out.None (keeps polling till the job completes or fails)

Define jobs

Ops are RudderStack-specific operations that let you define a Dagster job for your Profiles or Reverse ETL syncs that run on a defined schedule. You can also define a job with a dependency on Profiles runs followed by Reverse ETL syncs.

warning

Make sure to set the schedule type to Manual in the RudderStack dashboard for both Reverse ETL syncs and Profiles jobs.

This way, the sync jobs can be triggered only via Dagster.

Schedule Reverse ETL syncs

Once you have defined the Reverse ETL resource, use the below sample code to schedule Reverse ETL sync jobs:

# jobs.py
from dagster import job, ScheduleDefinition, ScheduleDefinition
from dagster_rudderstack.ops.retl import rudderstack_sync_op, RudderStackRETLOpConfig
from .resources import rudderstack_retl_resource

@job(
    resource_defs={
        "retl_resource": rudderstack_retl_resource
    }
)
def rs_retl_sync():
        rudderstack_sync_op()

rudderstack_sync_schedule = ScheduleDefinition(
    job=rs_retl_sync_job,
    cron_schedule="0 * * * *",  # Runs every hour.
    run_config={"ops": {"rudderstack_sync_op": RudderStackRETLOpConfig(connection_id="<retl_connection_id>")}},
    default_status=DefaultScheduleStatus.RUNNING
)

Make sure to provide the Reverse ETL connection ID for the sync job in RudderStackRETLOpConfig.

Schedule Profiles runs

Once you have defined the Profiles resource, use the below sample code to schedule Profiles runs:

# jobs.py
from dagster import job, ScheduleDefinition, ScheduleDefinition
from dagster_rudderstack.ops.profiles import rudderstack_profiles_op, RudderStackProfilesOpConfig
from .resources import rudderstack_profiles_resource

@job(
    resource_defs={
        "profiles_resource": rudderstack_profiles_resource
    }
)
def rs_profiles_run():
        rudderstack_profiles_op()

rudderstack_profile_schedule = ScheduleDefinition(
    job=rs_profiles_run,
    cron_schedule="0 0 * * *",  # Runs every day at midnight.
    run_config={"ops": {"rudderstack_profiles_op": RudderStackProfilesOpConfig(profile_id="<profiles_project_id")}},
    default_status=DefaultScheduleStatus.RUNNING
)

Make sure to provide the Profiles project ID in RudderStackProfilesOpConfig.

Define job sequence

success
This section is helpful in cases where you want to run your Profiles project first and then trigger one or multiple Reverse ETL syncs to update the downstream tools.

The following code highlights how you can create a DAG of multiple Reverse ETL syncs that are triggered after a successful Profiles run.

# jobs.py

from dagster import job, ScheduleDefinition, ScheduleDefinition
from dagster_rudderstack.ops.retl import rudderstack_sync_op, RudderStackRETLOpConfig
from dagster_rudderstack.ops.profiles import rudderstack_profiles_op, RudderStackProfilesOpConfig
from .resources import rudderstack_retl_resource, rudderstack_profiles_resource

@job(
    resource_defs={
        "profiles_resource": rudderstack_profiles_resource,
        "retl_resource": rudderstack_retl_resource
    }
)
def rs_profiles_then_retl_run():
    profiles_op = rudderstack_profiles_op()
    rudderstack_sync_op(start_after=profiles_op)

rudderstack_sync_schedule = ScheduleDefinition(
    job=rs_profiles_then_retl_run,
    cron_schedule="0 0 * * *",  # Runs every day at midnight.
    run_config=RunConfig(
                ops={
                    "rudderstack_profiles_op": RudderStackProfilesOpConfig(profile_id="<profiles_project_id>"),
                    "rudderstack_sync_op": RudderStackRETLOpConfig(connection_id="<retl_connection_id>"),
                }
        )    
    default_status=DefaultScheduleStatus.RUNNING
)

Make sure to provide:

info

If one of the operation fails, the job raises an exception without running the next operation.

However, you can configure the job as per your requirement to ignore any failure and run the subsequent operations by using try/catch exceptions.

FAQ

Where can I find the connection ID for my Reverse ETL connection?

The connection ID is a unique identifier for any Reverse ETL connection set up in RudderStack.

To obtain the connection ID, click the destination connected to your Reverse ETL source and go to the Settings tab.

connection ID for Reverse ETL

Where can I find my Profiles project ID?

To obtain the Profiles project ID, go to your project in the RudderStack dashboard and note it down from the URL:


Questions? Contact us by email or on Slack