Automating your data pipelines

In this guide, we'll walk through how to run your data pipelines without manual intervention, i.e. how to automate them, and identify the Dagster tools that make that happen.

This guide assumes you have some familiarity with several Dagster concepts, including software-defined assets and jobs.

Dagster offers a few different ways of automating data pipelines, and choosing the right tool depends on your specific needs. Below, we walk through several common cases and which Dagster tool to use in each.

Things to think through

When automating your data pipelines, it's helpful to consider how and when your data needs to be refreshed. A few useful questions to ask:

  • How often does my data need to be refreshed? Does all data need to be refreshed at the same frequency?
  • How is my data split up? Do older records need updating?
  • Am I waiting for some data that needs to trigger downstream updates?
  • Do I want a run kicked off every time a new file is added, or do I want to update data in batches?

My data should be updated every morning

You want computation to be triggered at a particular time. For example, you want some data to be updated every day by a certain time to refresh a dashboard, or you want your data warehouse to have the most recent information for the team to work with. This is one of the most traditional cases when building a pipeline. Let's walk through how to do this with Dagster!

I want a job to start at a specific time

Dagster offers basic scheduling capabilities that let you specify how often and when a job should run: daily, weekly, hourly, or any schedule you can express as a cron string.

from dagster import AssetSelection, define_asset_job, ScheduleDefinition

# a job that materializes every asset in the "some_asset_group" group
asset_job = define_asset_job("asset_job", AssetSelection.groups("some_asset_group"))

# run the job every day at 9:00 AM
basic_schedule = ScheduleDefinition(job=asset_job, cron_schedule="0 9 * * *")

You can also use the @schedule decorator if you want to provide custom run config and tags. For example, to pass a parameter to the job at runtime, such as activity="party" on weekends and activity="grind" on weekdays, you can build the run config inside the schedule function.

from dagster import RunRequest, ScheduleEvaluationContext, schedule

# configurable_job is defined elsewhere and contains configurable_op,
# which accepts an "activity" config value


# evaluates the schedule every day at 9:00 AM
@schedule(job=configurable_job, cron_schedule="0 9 * * *")
def configurable_job_schedule(context: ScheduleEvaluationContext):
    # pick the activity based on the scheduled day of the week
    if context.scheduled_execution_time.weekday() < 5:
        activity_selection = "grind"
    else:
        activity_selection = "party"
    return RunRequest(
        run_config={
            "ops": {"configurable_op": {"config": {"activity": activity_selection}}}
        }
    )

I want to specify how fresh each asset needs to be and let Dagster figure out when to run them

As an alternative to thinking about the specific times you want jobs to run, Dagster offers a declarative approach to scheduling based on how up-to-date each of your data assets needs to be. With this approach, you set a FreshnessPolicy on each asset that needs to stay up to date and tell Dagster to use those freshness policies to figure out when to auto-materialize your assets.

In the following example:

  • By 9 am each morning, we want the finance_report to include data on all the transactions from the previous day, so that the finance team can review it.
  • We want the sales_report to always include all transactions that occurred at least two hours ago.
  • The transactions_cleaned table feeds both of these reports. We don't have any direct requirements for it, but it needs to be updated so that the reports can have up-to-date transaction data.

We accomplish this by setting a FreshnessPolicy on each report asset and a lazy AutoMaterializePolicy on all of the assets. When possible, Dagster will time runs of transactions_cleaned so that it doesn't need to be updated separately on behalf of each downstream report.

Note that this way of scheduling is currently marked experimental, and some of the APIs may change in the future.

from dagster import (
    AutoMaterializePolicy,
    Definitions,
    FreshnessPolicy,
    asset,
    load_assets_from_current_module,
)


@asset
def transactions_cleaned():
    ...


# by each 9:00 AM tick, finance_report must incorporate data that is up to
# 9 hours old (i.e. back to midnight), covering all of the previous day
@asset(
    freshness_policy=FreshnessPolicy(
        maximum_lag_minutes=60 * 9, cron_schedule="0 9 * * *"
    )
)
def finance_report(transactions_cleaned):
    ...


# sales_report must always incorporate all transactions at least two hours old
@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=120))
def sales_report(transactions_cleaned):
    ...


defs = Definitions(
    assets=load_assets_from_current_module(
        auto_materialize_policy=AutoMaterializePolicy.lazy(),
    )
)

My data should be updated every time something happens

I want my data to be updated when an event happens

Let's say you have sales pipeline data that you want updated every time a customer submits an RFP on your website. Dagster offers sensors that let you kick off jobs in response to an external change, such as a new file arriving in an S3 bucket, another asset being updated, or a system going down.

from dagster import RunRequest, SkipReason, sensor
from dagster_aws.s3.sensor import get_s3_keys


# log_file_job is defined elsewhere and processes a single S3 file
@sensor(job=log_file_job)
def my_s3_sensor(context):
    # the cursor stores the last S3 key this sensor has already seen
    since_key = context.cursor or None
    new_s3_keys = get_s3_keys("my_s3_rfp_bucket", since_key=since_key)
    if not new_s3_keys:
        return SkipReason("No new s3 files found for bucket my_s3_rfp_bucket.")
    last_key = new_s3_keys[-1]
    # request one run per new file, keyed by the S3 key to avoid duplicate runs
    run_requests = [RunRequest(run_key=s3_key, run_config={}) for s3_key in new_s3_keys]
    context.update_cursor(last_key)
    return run_requests

I want some assets to materialize every time other assets materialize

For example, asset2 is downstream of asset1, and you want asset2 to be materialized every time asset1 is materialized.

Dagster provides an eager AutoMaterializePolicy, which materializes an asset whenever its parent assets are materialized. Note that this feature is currently marked experimental, and some of the APIs may change in the future.

from dagster import AutoMaterializePolicy, asset


@asset
def asset1():
    ...


# materialize asset2 whenever its parent, asset1, is materialized
@asset(auto_materialize_policy=AutoMaterializePolicy.eager(), deps=[asset1])
def asset2():
    ...

To learn more about auto-materializing assets, visit the Concept page.


Some of my data should be updated, but not everything

Let's say you have a database with all of your credit card transactions. When a return occurs, the original purchase record from last week doesn't change; instead, a new transaction recording the return is added to the current day's data.

In this case, you want your data pipeline to include all the latest data, but it doesn’t make sense to continuously process data from two years or two days ago when you know it hasn’t changed. What you want is to process yesterday’s data and not waste compute on the rest of the data.

Dagster partitions do just that. A partition is a slice of your data; in this case, each partition represents a day of transactions. You can partition or ‘split’ your data based on whatever makes the most sense, by one or multiple dimensions.
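To make this concrete, here's a minimal sketch of a daily-partitioned asset. The transactions asset name and the start date are illustrative assumptions, not part of the guide's example:

from dagster import DailyPartitionsDefinition, asset

# one partition per calendar day of transactions (the start date is an assumption)
daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")


@asset(partitions_def=daily_partitions)
def transactions(context):
    # context.partition_key identifies which day's slice this run should
    # process, e.g. "2023-06-01", so only that day's data is loaded and written
    ...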

Partitions can be used with both schedules and sensors. You can use schedules to kick off a partitioned job that updates one slice of data at a time, and you can use auto-materialize policies based on partitioned materializations to trigger runs once a specific partition has been updated.
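As a sketch of the scheduling case, and assuming the daily_partitions and transactions definitions from the snippet above, you might build a schedule directly from a partitioned asset job (the job and schedule names here are assumptions):

from dagster import (
    AssetSelection,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

# a job that materializes only the partitioned transactions asset
transactions_job = define_asset_job(
    "transactions_job",
    selection=AssetSelection.assets(transactions),
    partitions_def=daily_partitions,
)

# a schedule that kicks off one run per partition, targeting each day's
# partition once that day's window has completed
transactions_schedule = build_schedule_from_partitioned_job(transactions_job)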


Dagster automation cheat sheet

Use this table as a quick reference when building out your data pipelines:

| Name | Good for automating data pipelines that need to be refreshed: | Asset support | Op/graph support |
| --- | --- | --- | --- |
| Basic (cron) scheduling | by kicking off updates at a predictable time interval | Yes | Yes |
| Sensors | when a specific event occurs | Yes | Yes |
| Eager auto-materialize policies | when assets are out of date with other assets | Yes | No |
| Lazy auto-materialize policies w/ freshness policies | when assets are beyond an acceptable lag time | Yes | No |