RudderStack-Airflow Integration

Orchestrate your Profiles and Reverse ETL jobs programmatically with RudderStack’s Airflow provider.

Apache Airflow is an open source platform to schedule, manage, and monitor workflows for data engineering pipelines.

RudderStack provides an Airflow operator for triggering your Profiles runs and Reverse ETL syncs programmatically via Airflow.

See the GitHub Repository for more information on the codebase with sample DAGs.

Prerequisites

Note: For production use cases, RudderStack recommends using a service access token instead of a personal access token.

Run Airflow

Initialize all dependencies by running Apache Airflow via the following command:

airflow standalone

Warning: The Airflow standalone server is not meant for production use. Use an alternate method to install and run Airflow in a production environment.

Install Airflow operator

Install the RudderStack Airflow Provider by running the following command:

pip install rudderstack-airflow-provider

Create Airflow connection

To create a new Airflow connection, follow these steps:

  1. In your Airflow dashboard, go to Admin > Connections.
  2. Add a new connection by configuring the following settings:

Setting | Description
--- | ---
Connection ID | Specify a unique connection name. By default, RudderstackRETLOperator and RudderstackProfilesOperator use the connection name rudderstack_default. Note: If you created the connection with a different name, pass that name to the operators through the connection_id parameter.
Connection Type | Set this to HTTP.
Host | Set the value depending on your region: https://api.rudderstack.com for Standard (US), or https://api.eu.rudderstack.com for EU.
Password | Enter your workspace-level Service Access Token.
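
Besides the dashboard, Airflow can also read connections from environment variables named AIRFLOW_CONN_<CONN_ID>, with the value given as JSON (supported in Airflow 2.3 and later). The sketch below builds such a variable from the settings described above. The helper build_connection_env and the region keys "us" and "eu" are illustrative names, not part of the RudderStack provider. It also assumes the provider reads the host and password fields the same way it does for a connection created in the dashboard.

```python
import json

# Region hosts as listed in the connection settings.
RUDDERSTACK_HOSTS = {
    "us": "https://api.rudderstack.com",
    "eu": "https://api.eu.rudderstack.com",
}


def build_connection_env(token, region="us", conn_id="rudderstack_default"):
    """Return (env_var_name, json_value) for an Airflow HTTP connection.

    Hypothetical helper for illustration; not part of the provider.
    """
    payload = {
        "conn_type": "http",
        "host": RUDDERSTACK_HOSTS[region],
        "password": token,
    }
    # Airflow looks up connections under AIRFLOW_CONN_<CONN_ID in upper case>.
    return f"AIRFLOW_CONN_{conn_id.upper()}", json.dumps(payload)


name, value = build_connection_env("<service_access_token>", region="eu")
print(f"export {name}='{value}'")
```

Keeping the token in an environment variable or a secrets backend avoids storing it in the Airflow metadata database, which may suit production deployments better.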

RudderStack operators

Use the below operators to create DAGs and schedule them via Airflow.

Warning: Make sure to set the schedule type to Manual in the RudderStack dashboard for both Reverse ETL syncs and Profiles jobs. This way, the jobs can be triggered only via Airflow.

RudderstackRETLOperator

Use RudderstackRETLOperator to schedule and trigger your Reverse ETL syncs via Airflow.

A simple DAG for triggering syncs for a Reverse ETL source is shown below. See this example for the complete code.

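from datetime import datetime, timedelta

from airflow import DAG
from rudder_airflow_provider.operators.rudderstack import RudderstackRETLOperator

# Minimal default arguments applied to every task in the DAG.
default_args = {"owner": "airflow", "depends_on_past": False}

with DAG(
    "rudderstack-retl-sample",
    default_args=default_args,
    description="A simple tutorial DAG for Reverse ETL",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["rs-retl"],
) as dag:
    # retl_connection_id and sync_type are template fields.
    rs_operator = RudderstackRETLOperator(
        retl_connection_id="<retl_connection_id>",
        task_id="<airflow_task_id>",
        connection_id="<connection_id>",
    )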

The RudderstackRETLOperator parameters are described below:

Parameter | Description | Type | Default value
--- | --- | --- | ---
retl_connection_id (Required) | Your Reverse ETL connection ID from the RudderStack dashboard. | String (templatable) | -
connection_id (Required) | Connection ID used while setting up the Airflow connection. | String | rudderstack_default
task_id (Required) | Unique, meaningful ID for the Airflow task. See the Airflow documentation for more information on this parameter. | String | -
poll_interval | Time (in seconds) between status checks for the triggered job. | Float | 10
poll_timeout | Time (in seconds) after which polling for a triggered job is declared timed out. | Float | None, that is, the provider keeps polling until the job completes or fails.
sync_type | Sync type. Acceptable values are full and incremental. | String | None, as RudderStack determines the sync type.
request_max_retries | Maximum number of times a request to the RudderStack API is retried before failing. | Integer | 3
request_retry_delay | Time (in seconds) to wait between request retries. | Integer | 1
request_timeout | Time (in seconds) after which a request to RudderStack is declared timed out. | Integer | 30
wait_for_completion | Determines whether the task polls and waits until the sync completes. | Boolean | True

Note: RudderStack recommends retaining the default values for the non-mandatory parameters.
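
To make the interaction between poll_interval and poll_timeout concrete, here is a minimal sketch of a polling loop with the semantics described above. It is not the provider's implementation: the function poll_until_done, the status strings, and the injectable sleep and clock arguments are assumptions made for illustration.

```python
import time


def poll_until_done(get_status, poll_interval=10.0, poll_timeout=None,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll get_status() until it returns a terminal state.

    Illustrative only: the status strings and this function are assumptions,
    not the provider's code.
    """
    started = clock()
    while True:
        status = get_status()
        if status in ("succeeded", "failed"):
            return status
        # poll_timeout=None means: keep polling until the job finishes.
        if poll_timeout is not None and clock() - started >= poll_timeout:
            raise TimeoutError(f"job still '{status}' after {poll_timeout}s")
        sleep(poll_interval)


# Usage with a fake job that finishes on the third status check.
states = iter(["running", "running", "succeeded"])
print(poll_until_done(lambda: next(states), poll_interval=0.01))
```

With the default poll_timeout of None, a job that never reaches a terminal state keeps the Airflow task running indefinitely, so setting a timeout can be worthwhile for long syncs.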

RudderstackProfilesOperator

Use RudderstackProfilesOperator to trigger a Profiles run.

A simple DAG for triggering a Profiles project run is shown below:

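from datetime import datetime, timedelta

from airflow import DAG
from rudder_airflow_provider.operators.rudderstack import RudderstackProfilesOperator

# Minimal default arguments applied to every task in the DAG.
default_args = {"owner": "airflow", "depends_on_past": False}

with DAG(
    "rudderstack-profiles-sample",
    default_args=default_args,
    description="A simple tutorial DAG for Profiles run.",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["rs-profiles"],
) as dag:
    # profile_id is a template field.
    rs_operator = RudderstackProfilesOperator(
        profile_id="<profiles_id>",
        task_id="<airflow_task_id>",
        connection_id="<connection_id>",
    )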

The RudderstackProfilesOperator parameters are described below:

Parameter | Description | Type | Default value
--- | --- | --- | ---
profile_id (Required) | Your Profiles project ID from the RudderStack dashboard. | String (templatable) | -
connection_id (Required) | Connection ID used while setting up the Airflow connection. | String | rudderstack_default
task_id (Required) | Unique, meaningful ID for the Airflow task. See the Airflow documentation for more information on this parameter. | String | -
parameters | Additional parameters to pass to the Profiles run command, as supported by the API endpoint. | String | None
poll_interval | Time (in seconds) between status checks for the triggered job. | Float | 10
poll_timeout | Time (in seconds) after which polling for a triggered job is declared timed out. | Float | None, that is, the provider keeps polling until the job completes or fails.
request_max_retries | Maximum number of times a request to the RudderStack API is retried before failing. | Integer | 3
request_retry_delay | Time (in seconds) to wait between request retries. | Integer | 1
request_timeout | Time (in seconds) after which a request to RudderStack is declared timed out. | Integer | 30
wait_for_completion | Determines whether the task polls and waits until the Profiles run completes. | Boolean | True

Note: RudderStack recommends retaining the default values for the non-mandatory parameters.
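
The three request_* parameters described above control how individual API calls are retried. The following sketch shows one plausible reading of those semantics. It is not the provider's code: the function call_with_retries, the fake request, and the choice to treat request_max_retries as the number of retries after the first attempt are all assumptions.

```python
import time


def call_with_retries(send, request_max_retries=3, request_retry_delay=1,
                      request_timeout=30, sleep=time.sleep):
    """Call send(timeout=...) and retry on connection errors or timeouts.

    Illustrative sketch; assumes request_max_retries counts retries after
    the first attempt, which may differ from the provider's behavior.
    """
    attempts = request_max_retries + 1
    for attempt in range(1, attempts + 1):
        try:
            return send(timeout=request_timeout)
        except (ConnectionError, TimeoutError):
            if attempt == attempts:
                raise  # retries exhausted; surface the last error
            sleep(request_retry_delay)


# Usage with a fake request that fails twice, then succeeds.
calls = []


def flaky_send(timeout):
    calls.append(timeout)
    if len(calls) < 3:
        raise ConnectionError("temporary network error")
    return {"status": "ok"}


print(call_with_retries(flaky_send, request_retry_delay=0))
```

These request-level retries are separate from Airflow's own task-level retries and retry_delay arguments, which rerun the whole task.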

Run a DAG

Once you have defined a DAG and configured an Airflow connection, run the following commands to allow Airflow to pick up and run the DAG:

export AIRFLOW_HOME=</path/to/airflow_home>
mkdir -p "$AIRFLOW_HOME/dags"
cp rudderstack_dag.py "$AIRFLOW_HOME/dags"
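
Airflow cannot load a DAG file that contains a syntax error, so it can help to check the file before copying it. The Python sketch below mirrors the commands above and adds that check. The helper install_dag is a hypothetical name, and the paths in the usage comment are placeholders.

```python
import shutil
from pathlib import Path


def install_dag(dag_file, airflow_home):
    """Syntax-check dag_file, then copy it into <airflow_home>/dags.

    Hypothetical helper for illustration; not part of Airflow or the provider.
    """
    source = Path(dag_file)
    # compile() raises SyntaxError (including TabError) without running the DAG.
    compile(source.read_text(), str(source), "exec")
    dags_dir = Path(airflow_home) / "dags"
    dags_dir.mkdir(parents=True, exist_ok=True)
    # shutil.copy returns the path of the newly created file.
    return Path(shutil.copy(source, dags_dir))


# Example call (placeholder paths):
# install_dag("rudderstack_dag.py", "</path/to/airflow_home>")
```

The check only covers Python syntax. Import errors or invalid operator arguments still surface when Airflow parses the file.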

Make sure the Airflow scheduler is running in the background. Also, you must enable the DAG in the Airflow dashboard:

enabling Airflow DAG in dashboard

You can trigger a DAG by clicking on the play button on the right as seen above and selecting Trigger DAG.

Warning: Stopping the DAG does not cancel an ongoing sync.

Sample DAG

A sample DAG for a Profiles run followed by a Reverse ETL sync is shown:

from datetime import datetime, timedelta

from airflow import DAG

from rudder_airflow_provider.operators.rudderstack import (
    RudderstackRETLOperator,
    RudderstackProfilesOperator,
)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['alex@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}

with DAG('rudderstack-profiles-then-retl-sample',
    default_args=default_args,
    description='A simple tutorial DAG for Profiles run and then Reverse ETL sync.',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['rs']) as dag:
    # profile_id is a template field, read here from an Airflow Variable.
    profiles_task = RudderstackProfilesOperator(
        profile_id="{{ var.value.profile_id }}",
        task_id="<profiles_task_id>",
        connection_id="<connection_id>",
    )
    # retl_connection_id is a template field.
    # retries and retry_delay are standard Airflow task arguments.
    retl_sync_1_task = RudderstackRETLOperator(
        retl_connection_id="{{ var.value.retl_connection_id1 }}",
        task_id="<retl_sync_1_task_id>",
        connection_id="<connection_id>",
        wait_for_completion=True,
        retry_delay=timedelta(seconds=5),
        retries=1,
    )
    # Another Reverse ETL sync. Task IDs must be unique within a DAG.
    retl_sync_2_task = RudderstackRETLOperator(
        retl_connection_id="{{ var.value.retl_connection_id2 }}",
        task_id="<retl_sync_2_task_id>",
        connection_id="<connection_id>",
        wait_for_completion=True,
    )

    # Run profiles_task, then retl_sync_1_task and retl_sync_2_task in parallel.
    profiles_task >> [retl_sync_1_task, retl_sync_2_task]
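
The sample DAG above reads its IDs from Airflow Variables through the {{ var.value.<key> }} templates, so those variables must exist before the DAG runs. One way to define them is through environment variables named AIRFLOW_VAR_<KEY>. The sketch below generates those names for the three keys used in the sample. The helper airflow_variable_env is illustrative, and the values are placeholders.

```python
def airflow_variable_env(values):
    """Map Airflow Variable keys to AIRFLOW_VAR_<KEY> environment variables.

    Hypothetical helper for illustration; not part of Airflow or the provider.
    """
    return {f"AIRFLOW_VAR_{key.upper()}": value for key, value in values.items()}


env = airflow_variable_env({
    "profile_id": "<profiles_project_id>",
    "retl_connection_id1": "<retl_connection_id_1>",
    "retl_connection_id2": "<retl_connection_id_2>",
})
for name, value in env.items():
    print(f"export {name}='{value}'")
```

The same variables can also be created in the Airflow dashboard under Admin > Variables.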
	

FAQ

Where can I find the connection ID for my Reverse ETL connection?

The connection ID is a unique identifier for any Reverse ETL connection set up in RudderStack.

To obtain the connection ID, click the destination connected to your Reverse ETL source and go to the Settings tab.

connection ID for Reverse ETL

Where can I find my Profiles project ID?

To obtain the Profiles project ID, go to your project in the RudderStack dashboard and copy the ID from the page URL.

