Partitioned assets and jobs#

A software-defined asset can represent a collection of partitions that can be tracked and materialized independently. In many ways, each partition functions like its own mini-asset, but they all share a common materialization function and dependencies. Typically, each partition will correspond to a separate file, or a slice of a table in a database.

A common use is for each partition to represent all the records in a data set that fall within a particular time window, e.g. hourly, daily or monthly. Alternatively, each partition can represent a region, a customer, an experiment - any dimension along which you want to be able to materialize and monitor independently. An asset can also be partitioned along multiple dimensions, e.g. by region and by hour.

A graph of assets with the same partitions implicitly forms a partitioned data pipeline, and you can launch a run that selects multiple assets and materializes the same partition in each asset.

Similarly, a partitioned job is a job where each run corresponds to a partition. It's common to construct a partitioned job that materializes a single partition across a set of partitioned assets every time it runs.

Having defined a partitioned asset or job, you can:

  • View runs by partition in the Dagster UI.
  • Define a schedule that fills in a partition each time it runs. For example, a job might run each day and process the data that arrived during the previous day.
  • Launch backfills, which are sets of runs that each process a different partition. For example, after making a code change, you might want to run your job on all partitions instead of just one of them.

Partitioned assets#

Relevant APIs#

| Name | Description |
| --- | --- |
| PartitionsDefinition | Superclass that defines the set of partitions that can be materialized for an asset. |
| HourlyPartitionsDefinition | A partitions definition with a partition for each hour. |
| DailyPartitionsDefinition | A partitions definition with a partition for each day. |
| WeeklyPartitionsDefinition | A partitions definition with a partition for each week. |
| MonthlyPartitionsDefinition | A partitions definition with a partition for each month. |
| StaticPartitionsDefinition | A partitions definition with a fixed set of partitions. |
| MultiPartitionsDefinition | A partitions definition with multiple dimensions. |
| MultiPartitionKey | A multi-dimensional partition key. |
| DynamicPartitionsDefinition | A partitions definition whose partitions can be dynamically added and removed. |
| OpExecutionContext.partition_key | The partition key for the current run, which can be accessed in the computation. |

A software-defined asset can be assigned a PartitionsDefinition, which determines the set of partitions that compose it. If the asset is stored in a filesystem or an object store, then each partition will typically correspond to a file or object. If the asset is stored in a database, then each partition will typically correspond to a range of values in a table that fall within a particular window.

Once an asset has a set of partitions, you can launch materializations of individual partitions and view the materialization history by partition in the UI.

Defining partitioned assets#

For example, below is a software-defined asset with a partition for each day since the first day of 2022. Materializing partition 2022-07-23 of this asset would result in fetching data from the URL coolweatherwebsite.com/weather_obs&date=2022-07-23 and storing it at the path weather_observations/2022-07-23.csv.

import urllib.request

from dagster import DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def my_daily_partitioned_asset(context) -> None:
    partition_date_str = context.asset_partition_key_for_output()

    url = f"coolweatherwebsite.com/weather_obs&date={partition_date_str}"
    target_location = f"weather_observations/{partition_date_str}.csv"

    urllib.request.urlretrieve(url, target_location)

Partitioned assets with partitioned I/O managers#

In the above code snippet, the body of the decorated function writes the data to a file itself, but you can also delegate that writing to an I/O manager. Dagster's built-in I/O managers know how to handle partitioned assets, and you can handle them in your own I/O managers by following the instructions in the I/O manager documentation.

Here's a software-defined asset that relies on an I/O manager to store its output:

import pandas as pd

from dagster import DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def my_daily_partitioned_asset(context) -> pd.DataFrame:
    partition_date_str = context.asset_partition_key_for_output()
    return pd.read_csv(f"coolweatherwebsite.com/weather_obs&date={partition_date_str}")

If you're using the default I/O manager, materializing partition 2022-07-23 of this asset would store the output DataFrame in a pickle file at a path like my_daily_partitioned_asset/2022-07-23.
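If you write your own I/O manager, the partition key for the current output or input is available on the context. Below is a minimal sketch of a partition-aware I/O manager that stores each partition as its own CSV file; the class name, base directory, and use of pandas DataFrames are illustrative assumptions rather than part of the example above:

import pandas as pd

from dagster import InputContext, IOManager, OutputContext


class PartitionedCSVIOManager(IOManager):
    def _path(self, context) -> str:
        # One file per asset partition, e.g. /tmp/storage/my_asset/2022-07-23.csv
        return f"/tmp/storage/{context.asset_key.path[-1]}/{context.asset_partition_key}.csv"

    def handle_output(self, context: OutputContext, obj: pd.DataFrame):
        # Write the DataFrame for the partition being materialized
        obj.to_csv(self._path(context), index=False)

    def load_input(self, context: InputContext) -> pd.DataFrame:
        # Read back the partition requested by a downstream asset
        return pd.read_csv(self._path(context))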

Multi-dimensionally partitioned assets#

The MultiPartitionsDefinition class accepts a mapping of dimension name to partitions definition, creating a partition for each unique combination of dimension partitions. For example, the asset below would contain a partition for each combination of color and date: red|2022-01-01, yellow|2022-01-01, blue|2022-01-01, red|2022-01-02 and so on.

from dagster import (
    DailyPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)


@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "date": DailyPartitionsDefinition(start_date="2022-01-01"),
            "color": StaticPartitionsDefinition(["red", "yellow", "blue"]),
        }
    )
)
def multi_partitions_asset(context):
    context.log.info(context.partition_key.keys_by_dimension)

Currently, Dagster only supports two dimensions in a MultiPartitionsDefinition.

Notice that the code snippet above fetches the partition key from the asset context. Multi-dimensional partition keys are returned as MultiPartitionKey objects, whose MultiPartitionKey.keys_by_dimension property returns the key for each dimension. A MultiPartitionKey can also be passed to partition key execution parameters:

from dagster import MultiPartitionKey, materialize

result = materialize(
    [multi_partitions_asset],
    partition_key=MultiPartitionKey({"date": "2022-01-01", "color": "red"}),
)

Dynamically partitioned assets#

Sometimes you don't know the set of partitions ahead of time when you're defining your assets. For example, maybe you want to add a new partition every time a new data file lands in a directory, or every time you want to experiment with a new set of hyperparameters. In these cases, you can use a DynamicPartitionsDefinition.

The DynamicPartitionsDefinition class accepts a name argument, representing the name of the partition set:

from dagster import DynamicPartitionsDefinition, asset

images_partitions_def = DynamicPartitionsDefinition(name="images")


@asset(partitions_def=images_partitions_def)
def images(context):
    ...

For a given dynamic partition set, partition keys can be added and removed. One common pattern is detecting the presence of a new partition through a sensor, adding the partition, and then triggering a run for that partition:

import os

from dagster import (
    AssetSelection,
    RunRequest,
    SensorResult,
    define_asset_job,
    sensor,
)

images_job = define_asset_job(
    "images_job", AssetSelection.keys("images"), partitions_def=images_partitions_def
)


@sensor(job=images_job)
def image_sensor(context):
    new_images = [
        img_filename
        for img_filename in os.listdir(os.getenv("MY_DIRECTORY"))
        if not context.instance.has_dynamic_partition(
            images_partitions_def.name, img_filename
        )
    ]

    return SensorResult(
        run_requests=[
            RunRequest(partition_key=img_filename) for img_filename in new_images
        ],
        dynamic_partitions_requests=[
            images_partitions_def.build_add_request(new_images)
        ],
    )
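
Partition keys can also be added or removed directly on the Dagster instance, for example from a script or test. A brief sketch (the image filename is hypothetical):

from dagster import DagsterInstance

instance = DagsterInstance.get()  # requires DAGSTER_HOME to be set

# Register a new partition key for the "images" dynamic partition set...
instance.add_dynamic_partitions(images_partitions_def.name, ["img_001.png"])

# ...and remove it again.
instance.delete_dynamic_partition(images_partitions_def.name, "img_001.png")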

The dynamic partitions example contains a full project that uses dynamic partitions.

Viewing asset partition status#

To view all partitions for an asset, open the Definition tab of the asset's details page. The bar in the Partitions section represents all of the partitions for the asset.

In the following image, the partitions bar is entirely gray. This is because none of the partitions have been materialized:

Materializing partitioned assets#

When you materialize a partitioned asset, you choose which partitions to materialize, and Dagster will launch a run for each partition.

Note: If you choose more than one partition, the Dagster daemon needs to be running to queue the multiple runs.
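
Partitions can also be materialized from Python with the materialize function. A minimal sketch, reusing my_daily_partitioned_asset from earlier:

from dagster import materialize

# Launches a single in-process run for the 2022-07-23 partition
result = materialize([my_daily_partitioned_asset], partition_key="2022-07-23")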

After you've materialized a partition, it will show up as green in the partitions bar:

To view materializations by partition, navigate to the Activity tab:

Partition dependencies#

When a partitioned asset depends on another partitioned asset, each partition in the downstream asset depends on one or more partitions in the upstream asset.

A few rules govern default partition-to-partition dependencies:

  • When the upstream asset and downstream asset have the same PartitionsDefinition, each partition in the downstream asset depends on the same partition in the upstream asset.

  • When the upstream asset and downstream asset are both time window-partitioned, each partition in the downstream asset depends on all partitions in the upstream asset that intersect its time window.

    For example, if an asset with a DailyPartitionsDefinition depends on an asset with an HourlyPartitionsDefinition, then partition 2022-04-12 of the daily asset would depend on 24 partitions of the hourly asset: 2022-04-12-00:00 through 2022-04-12-23:00, as sketched in the example after this list.
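
As a sketch of that second rule (the asset names here are illustrative):

from dagster import DailyPartitionsDefinition, HourlyPartitionsDefinition, asset


@asset(partitions_def=HourlyPartitionsDefinition(start_date="2022-04-12-00:00"))
def hourly_events():
    ...


# Partition 2022-04-12 of this asset depends on partitions
# 2022-04-12-00:00 through 2022-04-12-23:00 of hourly_events.
@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-04-12"))
def daily_event_summary(hourly_events):
    ...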

PartitionMappings#

Currently, partition mappings are only supported for managed-loading dependencies.

You can override the default partition dependency rules by providing a PartitionMapping when specifying a dependency on an asset. For example, here's how to specify that each partition of a daily-partitioned asset depends on the prior day's partition in an upstream asset:

from dagster import (
    AssetIn,
    DailyPartitionsDefinition,
    TimeWindowPartitionMapping,
    asset,
)

partitions_def = DailyPartitionsDefinition(start_date="2023-01-21")


@asset(partitions_def=partitions_def)
def events():
    ...


@asset(
    partitions_def=partitions_def,
    ins={
        "events": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(
                start_offset=-1, end_offset=-1
            ),
        )
    },
)
def yesterday_event_stats(events):
    ...

Refer to the API docs for a list of available PartitionMappings.


Partitioned asset jobs#

A partitioned asset job is a job that materializes a particular set of partitioned assets every time it runs.

from dagster import (
    AssetSelection,
    Definitions,
    HourlyPartitionsDefinition,
    asset,
    define_asset_job,
)

hourly_partitions_def = HourlyPartitionsDefinition(start_date="2022-05-31-00:00")


@asset(partitions_def=hourly_partitions_def)
def asset1():
    ...


@asset(partitions_def=hourly_partitions_def)
def asset2():
    ...


partitioned_asset_job = define_asset_job(
    name="asset_1_and_2_job",
    selection=AssetSelection.assets(asset1, asset2),
    partitions_def=hourly_partitions_def,
)


defs = Definitions(
    assets=[asset1, asset2],
    jobs=[partitioned_asset_job],
)

Partitioned non-asset jobs#

Relevant APIs#

| Name | Description |
| --- | --- |
| PartitionedConfig | Determines a set of partitions and how to generate run config for a partition. |
| @daily_partitioned_config | Decorator for constructing partitioned config where each partition is a date. |
| @hourly_partitioned_config | Decorator for constructing partitioned config where each partition is an hour of a date. |
| @weekly_partitioned_config | Decorator for constructing partitioned config where each partition is a week. |
| @monthly_partitioned_config | Decorator for constructing partitioned config where each partition is a month. |
| @static_partitioned_config | Decorator for constructing partitioned config for a static set of partition keys. |
| @dynamic_partitioned_config | Decorator for constructing partitioned config for a set of partition keys that can grow over time. |
| build_schedule_from_partitioned_job | A function that constructs a schedule whose interval matches the partitioning of a partitioned job. |

When defining a job that doesn't use software-defined assets, you can make it partitioned by supplying a PartitionedConfig object as its config.

Defining a job with time partitions#

The most common kind of partitioned job is a time-partitioned job - each partition is a time window, and each run for a partition processes data within that time window.

Non-partitioned job with date config#

Before we define a partitioned job, let's look at a non-partitioned job that computes some data for a given date:

from dagster import Config, job, op


class ProcessDateConfig(Config):
    date: str


@op
def process_data_for_date(context, config: ProcessDateConfig):
    date = config.date
    context.log.info(f"processing data for {date}")


@job
def do_stuff():
    process_data_for_date()

It takes a date string as config, which determines the date to compute data for. For example, to compute data for May 5th, 2020, you would execute the job with the following run config:

ops:
  process_data_for_date:
    config:
      date: "2020-05-05"

Date-partitioned job#

With the job above, it's possible to supply any value for the date param, which means that, if you wanted to launch a backfill, Dagster wouldn't know what values to run it on. You can instead build a partitioned job that operates on a defined set of dates.

First, you define the PartitionedConfig. In this case, because each partition is a date, you can use the @daily_partitioned_config decorator. It defines the full set of partitions (every date between the start date and the current date) as well as how to determine the run config for a given partition.

from dagster import daily_partitioned_config
from datetime import datetime


@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }

Then you can build a job that uses the PartitionedConfig by supplying it to the config argument when you construct the job:

@job(config=my_partitioned_config)
def do_stuff_partitioned():
    process_data_for_date()

Dagster UI partitions tab#

In the UI, you can view runs by partition in the Partitions tab of a Job page:

Partitions Tab

In the "Run Matrix", each column corresponds to one of the partitions in the job. The time listed corresponds to the start time of the partition. Each row corresponds to one of the steps in the job. You can click on an individual box to navigate to logs and run information for the step.

You can view and use partitions in the UI Launchpad tab for a job. In the top bar, you can select from the list of all available partitions. Within the config editor, the config for the selected partition will be populated.

In the screenshot below, we select the 2020-01-02 partition, and we can see that the run config for the partition has been populated in the editor.

Partitions in the Dagster UI Launchpad

In addition to the @daily_partitioned_config decorator, Dagster also provides @monthly_partitioned_config, @weekly_partitioned_config, and @hourly_partitioned_config. See the API docs for each of these decorators for more information on how partitions are built based on different start_date, minute_offset, hour_offset, and day_offset inputs.
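
For instance, here's a sketch of an hourly partitioned config whose partitions begin 15 minutes past each hour (the op name is carried over from the earlier example):

from datetime import datetime

from dagster import hourly_partitioned_config


# The first partition covers 2020-01-01-00:15 through 2020-01-01-01:15
@hourly_partitioned_config(start_date=datetime(2020, 1, 1), minute_offset=15)
def my_offset_hourly_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data_for_date": {
                "config": {"date": start.strftime("%Y-%m-%d-%H:%M")}
            }
        }
    }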

Defining a job with static partitions#

Not all jobs are partitioned by time. Here's a partitioned job where the partitions are continents:

from dagster import Config, job, op, static_partitioned_config

CONTINENTS = [
    "Africa",
    "Antarctica",
    "Asia",
    "Europe",
    "North America",
    "Oceania",
    "South America",
]


@static_partitioned_config(partition_keys=CONTINENTS)
def continent_config(partition_key: str):
    return {"ops": {"continent_op": {"config": {"continent_name": partition_key}}}}


class ContinentOpConfig(Config):
    continent_name: str


@op
def continent_op(context, config: ContinentOpConfig):
    context.log.info(config.continent_name)


@job(config=continent_config)
def continent_job():
    continent_op()

Creating schedules from partitioned jobs#

It's common that, when you have a partitioned job, you want to run it on a schedule. For example, if your job has a partition for each date, you likely want to run that job every day, on the partition for that day.

The build_schedule_from_partitioned_job function allows you to construct a schedule from a date-partitioned job. It creates a schedule whose interval matches the spacing of the partitions. To create a schedule for the do_stuff_partitioned job defined above, you could write:

from dagster import build_schedule_from_partitioned_job, job


@job(config=my_partitioned_config)
def do_stuff_partitioned():
    ...


do_stuff_partitioned_schedule = build_schedule_from_partitioned_job(
    do_stuff_partitioned,
)

Schedules can also be made from static partitioned jobs. If you wanted to make a schedule for the continent_job above that runs each partition, you could write:

from dagster import schedule, RunRequest


@schedule(cron_schedule="0 0 * * *", job=continent_job)
def continent_schedule():
    for c in CONTINENTS:
        yield RunRequest(run_key=c, partition_key=c)

Or, you can write a schedule that runs a subselection of the partitions:

@schedule(cron_schedule="0 0 * * *", job=continent_job)
def antarctica_schedule():
    return RunRequest(partition_key="Antarctica")

Refer to the Schedules documentation for more info about constructing both schedule types.


Testing#

Testing partitioned config#

Invoking a PartitionedConfig object will directly invoke the decorated function.

If you want to check whether the generated run config is valid for your job's config, you can use the validate_run_config function.

from dagster import validate_run_config, daily_partitioned_config
from datetime import datetime


@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }


def test_my_partitioned_config():
    # assert that the decorated function returns the expected output
    run_config = my_partitioned_config(datetime(2020, 1, 3), datetime(2020, 1, 4))
    assert run_config == {
        "ops": {"process_data_for_date": {"config": {"date": "2020-01-03"}}}
    }

    # assert that the output of the decorated function is valid configuration for the
    # do_stuff_partitioned job
    assert validate_run_config(do_stuff_partitioned, run_config)

If you want to test that your PartitionedConfig creates the partitions you expect, you can use the get_partition_keys or get_run_config_for_partition_key functions.

from datetime import datetime

from dagster import Config, daily_partitioned_config, job, op, validate_run_config


@daily_partitioned_config(start_date=datetime(2020, 1, 1), minute_offset=15)
def my_offset_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data": {
                "config": {
                    "start": start.strftime("%Y-%m-%d-%H:%M"),
                    "end": _end.strftime("%Y-%m-%d-%H:%M"),
                }
            }
        }
    }


class ProcessDataConfig(Config):
    start: str
    end: str


@op
def process_data(context, config: ProcessDataConfig):
    s = config.start
    e = config.end
    context.log.info(f"processing data for {s} - {e}")


@job(config=my_offset_partitioned_config)
def do_more_stuff_partitioned():
    process_data()


def test_my_offset_partitioned_config():
    # test that the partition keys are what you expect
    keys = my_offset_partitioned_config.get_partition_keys()
    assert keys[0] == "2020-01-01"
    assert keys[1] == "2020-01-02"

    # test that the run_config for a partition is valid for do_more_stuff_partitioned
    run_config = my_offset_partitioned_config.get_run_config_for_partition_key(keys[0])
    assert validate_run_config(do_more_stuff_partitioned, run_config)

    # test that the contents of run_config are what you expect
    assert run_config == {
        "ops": {
            "process_data": {
                "config": {"start": "2020-01-01-00:15", "end": "2020-01-02-00:15"}
            }
        }
    }

Testing partitioned jobs#

To run a partitioned job in-process on a particular partition, supply a value for the partition_key argument of JobDefinition.execute_in_process:

def test_do_stuff_partitioned():
    assert do_stuff_partitioned.execute_in_process(partition_key="2020-01-01").success

See it in action#

For more examples of partitions, check out our Hacker News example project.