You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

dagster-dbt integration reference#

Using dbt Cloud? Check out the dbt Cloud with Dagster guide!

This reference provides a high-level look at working with dbt models through Dagster's software-defined assets framework using the dagster-dbt integration library.

For a step-by-step implementation walkthrough, refer to the Using dbt with Dagster software-defined assets tutorial.


dbt models and Dagster software-defined assets#

Dagster’s software-defined assets (SDAs) bear several similarities to dbt models. A software-defined asset contains an asset key, a set of upstream asset keys, and an operation that is responsible for computing the asset from its upstream dependencies. Models defined in a dbt project can be interpreted as Dagster SDAs:

  • The asset key for a dbt model is (by default) the name of the model.
  • The upstream dependencies of a dbt model are defined with ref or source calls within the model's definition.
  • The computation required to compute the asset from its upstream dependencies is the SQL within the model's definition.

These similarities make it natural to interact with dbt models as SDAs. Let’s take a look at a dbt model and an SDA, in code:

Comparison of a dbt model and Dagster asset in code

Here's what's happening in this example:

  • The first code block is a dbt model
    • As dbt models are named using file names, this model is named orders
    • The data for this model comes from a dependency named raw_orders
  • The second code block is a Dagster asset
    • The asset key corresponds to the name of the dbt model, orders
    • raw_orders is provided as an argument to the asset, defining it as a dependency

Loading dbt models from a dbt project#

The dagster-dbt library offers two methods of loading dbt models from a project into Dagster:

Loading models using load_assets_from_dbt_project#

Check out part two of the dbt + Dagster tutorial to see this concept in context.

For smaller dbt projects where compilation time is not a concern, the simplest way to load your dbt assets into Dagster is the following:

from dagster_dbt import load_assets_from_dbt_project

dbt_assets = load_assets_from_dbt_project(project_dir="path/to/dbt/project")

The load_assets_from_dbt_project function:

  1. Compiles your dbt project,
  2. Parses the metadata that dbt provides, and
  3. Generates a set of software-defined assets that reflect the models in the project. These assets will share the same underlying operation, which will invoke dbt to run the models represented by the loaded assets.

Loading models using load_assets_from_dbt_manifest#

For larger dbt projects, the overhead involved with recompiling the entire project may be a concern. In these cases, you can load dbt models from an existing dbt manifest.json file using the load_assets_from_dbt_manifest function:

import json

from dagster_dbt import load_assets_from_dbt_manifest

with open("path/to/dbt/manifest.json") as f:
    manifest_json = json.load(f)

dbt_assets = load_assets_from_dbt_manifest(manifest_json)

If you make any changes to your dbt project that change the structure of the project (such as changing the dependencies of a model or adding a new one), you'll need to regenerate your manifest file for those changes to be reflected in Dagster.


Using the DbtCliResource for dbt CLI commands#

Check out part three of the dbt + Dagster tutorial to see this concept in context.

Assets loaded from dbt require a dbt resource, which is responsible for firing off dbt CLI commands. The dagster-dbt integration provides the DbtCliResource for this purpose. This resource can be configured with CLI flags that are passed into every dbt invocation.

The most important flag to set is the project_dir flag, which points Dagster at the directory of your dbt project. You can also use the target flag to point at an explicit dbt target. For a full list of configuration options, refer to the DbtCliResource API docs.

You can configure this resource and add it to your dbt assets by doing the following:

import os

from dagster_dbt import DbtCliResource, load_assets_from_dbt_project

from dagster import Definitions

DBT_PROJECT_PATH = "path/to/dbt_project"
DBT_TARGET = "hive" if os.getenv("EXECUTION_ENV") == "prod" else "duckdb"

defs = Definitions(
    assets=load_assets_from_dbt_project(DBT_PROJECT_PATH),
    resources={
        "dbt": DbtCliResource(project_dir=DBT_PROJECT_PATH, target=DBT_TARGET),
    },
)

Scheduling dbt jobs#

Once you have your dbt assets, you can define a job to materialize a selection of these assets on a schedule.

Scheduling jobs that contain only dbt assets#

In this example, we use the build_schedule_from_dbt_selection function to create a job, daily_dbt_models, as well as a schedule which will execute this job once a day. We define the set of models we'd like to execute using dbt's selection syntax, in this case selecting only the models with the tag daily.

from dagster_dbt import build_schedule_from_dbt_selection, dbt_assets

@dbt_assets(manifest=manifest)
def my_dbt_assets():
    ...

daily_dbt_assets_schedule = build_schedule_from_dbt_selection(
    [my_dbt_assets],
    job_name="daily_dbt_models",
    cron_schedule="@daily",
    dbt_select="tag:daily",
)

Scheduling jobs that contain dbt assets and non-dbt assets#

In many cases, it's useful to be able to schedule dbt assets alongside non-dbt assets. In this example, we build an AssetSelection of dbt assets using build_dbt_asset_selection, then select all assets (dbt-related or not) which are downstream of these dbt models. From there, we create a job that targets that selection of assets and schedule it to run daily.

from dagster import define_asset_job, ScheduleDefinition
from dagster_dbt import build_dbt_asset_selection, dbt_assets

@dbt_assets(manifest=manifest)
def my_dbt_assets():
    ...

# selects all models tagged with "daily", and all their downstream asset dependencies
daily_selection = build_dbt_asset_selection(
    [my_dbt_assets], dbt_select="tag:daily"
).downstream()

daily_dbt_assets_and_downstream_schedule = ScheduleDefinition(
    job=define_asset_job("daily_assets", selection=daily_selection),
    cron_schedule="@daily",
)

Refer to the Schedule documentation for more info on running jobs on a schedule.


Understanding asset definition attributes#

In Dagster, each asset definition has attributes. Dagster automatically generates these attributes for each software-defined asset loaded from the dbt project. These attributes can optionally be overridden by the user.

Customizing asset keys#

For dbt models, seeds, and snapshots, the default asset key will be the configured schema for that node, concatenated with the name of the node.

dbt node typeSchemaModel nameResulting asset key
model, seed, snapshotNoneMODEL_NAMEMODEL_NAME
SCHEMAMODEL_NAMESCHEMA/MODEL_NAME
Nonemy_modelsome_model
marketingmy_modelmarketing/my_model

For dbt sources, the default asset key will be the name of the source concatenated with the name of the source table.

dbt node typeSource nameTable nameResulting asset key
sourceSOURCE_NAMETABLE_NAMESOURCE_NAME/TABLE_NAME
jaffle_shopordersjaffle_shop/orders

There are two ways to customize the asset keys generated by Dagster for dbt assets:

  1. Defining meta config on your dbt node, or
  2. Overriding Dagster's asset key generation by implementing a custom DagsterDbtTranslator.

To override an asset key generated by Dagster for a dbt node, you can define a meta key on your dbt node's .yml file. The following example overrides the asset key for a source and table as snowflake/jaffle_shop/orders:

sources:
  - name: jaffle_shop
    tables:
      - name: orders
        meta:
          dagster:
            asset_key: ["snowflake", "jaffle_shop", "orders"]

Alternatively, to override the asset key generation for all dbt nodes in your dbt project, you can create a custom DagsterDbtTranslator and implement DagsterDbtTranslator.get_asset_key. The following example adds a snowflake prefix to the default generated asset key:

from pathlib import Path
from dagster import AssetKey, OpExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping

manifest_path = Path("path/to/dbt_project/target/manifest.json")

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        return super().get_asset_key(dbt_resource_props).with_prefix("snowflake")

@dbt_assets(
    manifest=manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

Customizing group names#

For dbt models, seeds, and snapshots, the default Dagster group name will be the dbt group defined for that node.

dbt node typedbt group nameResulting Dagster group name
model, seed, snapshotGROUP_NAMEGROUP_NAME
NoneNone
financefinance

There are two ways to customize the asset keys generated by Dagster for dbt assets:

  1. Defining meta config on your dbt node, or
  2. Overriding Dagster's group name generation by implementing a custom DagsterDbtTranslator

To override the group name generated by Dagster for a dbt node, you can define a meta key in your dbt project file, on your dbt node's property file, or on the node's in-file config block. The following example overrides the Dagster group name for the following model as marketing:

models:
  - name: customers
    config:
      meta:
        dagster:
          group: marketing

Alternatively, to override the Dagster group name generation for all dbt nodes in your dbt project, you can create a custom DagsterDbtTranslator and implement DagsterDbtTranslator.get_group_name. The following example defines snowflake as the group name for all dbt nodes:

from pathlib import Path
from dagster import OpExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping, Optional

manifest_path = Path("path/to/dbt_project/target/manifest.json")

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_group_name(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[str]:
        return "snowflake"

@dbt_assets(
    manifest=manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

Customizing descriptions#

For dbt models, seeds, and snapshots, the default Dagster description will be the dbt node's description.

To override the Dagster description for all dbt nodes in your dbt project, you can create a custom DagsterDbtTranslator and implement DagsterDbtTranslator.get_description. The following example defines the raw SQL of the dbt node as the Dagster description:

import textwrap
from pathlib import Path
from dagster import OpExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping

manifest_path = Path("path/to/dbt_project/target/manifest.json")

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str:
        return textwrap.indent(dbt_resource_props.get("raw_sql", ""), "\t")

@dbt_assets(
    manifest=manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

Customizing metadata#

For dbt models, seeds, and snapshots, the default Dagster metadata will be the dbt node's table schema.

To override the Dagster metadata for all dbt nodes in your dbt project, you can create a custom DagsterDbtTranslator and implement DagsterDbtTranslator.get_metadata. The following example defines the metadata of the dbt node as the Dagster metadata, using MetadataValue:

from pathlib import Path
from dagster import MetadataValue, OpExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping

manifest_path = Path("path/to/dbt_project/target/manifest.json")

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_metadata(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Mapping[str, Any]:
        return {
            "dbt_metadata": MetadataValue.json(dbt_resource_props.get("meta", {}))
        }

@dbt_assets(
    manifest=manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

Customizing auto-materialize policies#

For dbt models, seeds, and snapshots, the default AutoMaterializePolicy will be None.

There are two ways to customize the auto-materialize policies generated by Dagster for dbt assets:

  1. Defining meta config on your dbt node, or
  2. Overriding Dagster's auto-materialize policy generation by implementing a custom DagsterDbtTranslator.

To add an AutoMaterializePolicy to a dbt node, you can define a meta key in your dbt project file, on your dbt node's property file, or on the node's in-file config block. This policy may be one of two types, eager or lazy. The following example provides an eager AutoMaterializePolicy for the following model:

models:
  - name: customers
    config:
      meta:
        dagster:
          auto_materialize_policy:
            type: eager

Alternatively, to override the Dagster auto-materialize policy generation for all dbt nodes in your dbt project, you can create a custom DagsterDbtTranslator and implement DagsterDbtTranslator.get_auto_materialize_policy. The following example defines AutoMaterializePolicy.eager as the auto-materialize policy for all dbt nodes:

from pathlib import Path
from dagster import OpExecutionContext, AutoMaterializePolicy
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping, Optional

manifest_path = Path("path/to/dbt_project/target/manifest.json")

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_auto_materialize_policy(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[AutoMaterializePolicy]:
        return AutoMaterializePolicy.eager()

@dbt_assets(
    manifest=manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

Customizing freshness policies#

For dbt models, seeds, and snapshots, the default FreshnessPolicy will be None.

There are two ways to customize the freshness policies generated by Dagster for dbt assets:

  1. Defining meta config on your dbt node, or
  2. Overriding Dagster's freshness policy generation by implementing a custom DagsterDbtTranslator.

To add a FreshnessPolicy to a dbt node, you can define a meta key in your dbt project file, on your dbt node's property file, or on the node's in-file config block. This config accepts identical arguments to the FreshnessPolicy class. The following example applies a FreshnessPolicy for the following model:

models:
  - name: customers
    config:
      meta:
        dagster:
          freshness_policy:
            maximum_lag_minutes: 10
            cron_schedule: 0 * * * *
            cron_schedule_timezone: US/Pacific

Alternatively, to override the Dagster freshness policy generation for all dbt nodes in your dbt project, you can create a custom DagsterDbtTranslator and implement DagsterDbtTranslator.get_freshness_policy. The following example defines a FreshnessPolicy with maximum_lag_minutes=60 as the freshness policy for all dbt nodes:

from pathlib import Path
from dagster import OpExecutionContext, FreshnessPolicy
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping, Optional

manifest_path = Path("path/to/dbt_project/target/manifest.json")

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_freshness_policy(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[FreshnessPolicy]:
        return FreshnessPolicy(maximum_lag_minutes=60)

@dbt_assets(
    manifest=manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

dbt models, code versions, and staleness#

Note that Dagster allows the optional specification of a code_version for each software-defined asset, which are used to track changes. The code_version for an asset arising from a dbt model is defined automatically as the hash of the SQL defining the DBT model. This allows the asset graph in the UI to indicate which dbt models have new SQL since they were last materialized.


Defining dependencies#

Upstream dependencies#

Defining an asset as an upstream dependency of a dbt model#

Dagster allows you to define existing assets as upstream dependencies of dbt models. For example, say you have the following asset with asset key upstream:

from dagster import asset

@asset
def upstream():
    ...

In order to define this asset as an upstream dependency for a dbt model, you'll need to first declare it as a data source in the sources.yml file. Here, you can explicitly provide your asset key to a source table:

sources:
  - name: dagster
    tables:
      - name: upstream
        meta:
          dagster:
            asset_key: ["upstream"]

Then, in the downstream model, you can select from this source data. This defines a dependency relationship between your upstream asset and dbt model:

select *
  from {{ source("dagster", "upstream") }}
 where foo=1

Defining a dbt source as a Dagster asset#

Dagster parses information about assets that are upstream of specific dbt models from the dbt project itself. Whenever a model is downstream of a dbt source, that source will be parsed as an upstream asset.

For example, if you defined a source in your sources.yml file like this:

sources:
  - name: jaffle_shop
    tables:
      - name: orders

and use it in a model:

select *
  from {{ source("jaffle_shop", "orders") }}
 where foo=1

Then this model has an upstream source with the jaffle_shop/orders asset key.

In order to manage this upstream asset with Dagster, you can define it by passing the key into an asset definition via get_asset_key_for_source:

from dagster import asset, OpExecutionContext
from dagster_dbt import DbtCliResource, get_asset_key_for_source, dbt_assets

@dbt_assets(manifest=MANIFEST_PATH)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    ...

@asset(key=get_asset_key_for_source([my_dbt_assets], "jaffle_shop"))
def orders():
    return ...

This allows you to change asset keys within your dbt project without having to update the corresponding Dagster definitions.

The get_asset_key_for_source method is used when a source has only one table. However, if a source contains multiple tables, like this example:

sources:
  - name: clients_data
    tables:
      - name: names
      - name: history

You can use define a @multi_asset with keys from get_asset_keys_by_output_name_for_source instead:

from dagster import multi_asset, AssetOut, Output
from dagster_dbt import get_asset_keys_by_output_name_for_source

@multi_asset(
    outs={
        name: AssetOut(key=asset_key)
        for name, asset_key in get_asset_keys_by_output_name_for_source(
            [my_dbt_assets], "jaffle_shop"
        ).items()
    }
)
def jaffle_shop(context):
    output_names = list(context.selected_output_names)
    yield Output(value=..., output_name=output_names[0])
    yield Output(value=..., output_name=output_names[1])

Downstream dependencies#

Dagster allows you to define assets that are downstream of specific dbt models via get_asset_key_for_model. The below example defines my_downstream_asset as a downstream dependency of my_dbt_model:

from dagster_dbt import get_asset_key_for_model
from dagster import asset

@asset(deps=[get_asset_key_for_model([my_dbt_assets], "my_dbt_model")])
def my_downstream_asset():
    ...

In the downstream asset, you may want direct access to the contents of the dbt model. To do so, you can customize the code within your @asset-decorated function to load upstream data.

Dagster alternatively allows you to delegate loading data to an I/O manager. For example, if you wanted to consume a dbt model with the asset key my_dbt_model as a Pandas dataframe, that would look something like the following:

from dagster_dbt import get_asset_key_for_model
from dagster import AssetIn, asset

@asset(
    ins={
        "my_dbt_model": AssetIn(
            input_manager_key="pandas_df_manager",
            key=get_asset_key_for_model([my_dbt_assets], "my_dbt_model"),
        )
    },
)
def my_downstream_asset(my_dbt_model):
    # my_dbt_model is a Pandas dataframe
    return my_dbt_model.where(foo="bar")