import hashlib
import textwrap
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
FrozenSet,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
)
from dagster import (
AssetKey,
AssetsDefinition,
AssetSelection,
AutoMaterializePolicy,
DagsterInvariantViolationError,
FreshnessPolicy,
In,
MetadataValue,
Nothing,
Out,
RunConfig,
ScheduleDefinition,
TableColumn,
TableSchema,
_check as check,
define_asset_job,
)
from dagster._utils.merger import merge_dicts
from dagster._utils.warnings import deprecation_warning
from .utils import input_name_fn, output_name_fn
if TYPE_CHECKING:
from .dagster_dbt_translator import DagsterDbtTranslator, DbtManifestWrapper
MANIFEST_METADATA_KEY = "dagster_dbt/manifest"
DAGSTER_DBT_TRANSLATOR_METADATA_KEY = "dagster_dbt/dagster_dbt_translator"
def get_asset_key_for_model(dbt_assets: Sequence[AssetsDefinition], model_name: str) -> AssetKey:
"""Return the corresponding Dagster asset key for a dbt model.
Args:
        dbt_assets (Sequence[AssetsDefinition]): A sequence containing a single AssetsDefinition
            produced by load_assets_from_dbt_project, load_assets_from_dbt_manifest, or @dbt_assets.
model_name (str): The name of the dbt model.
Returns:
AssetKey: The corresponding Dagster asset key.
Examples:
.. code-block:: python
from dagster import asset
from dagster_dbt import dbt_assets, get_asset_key_for_model
@dbt_assets(manifest=...)
def all_dbt_assets():
...
@asset(deps={get_asset_key_for_model([all_dbt_assets], "customers")})
def cleaned_customers():
...
"""
check.sequence_param(dbt_assets, "dbt_assets", of_type=AssetsDefinition)
check.str_param(model_name, "model_name")
manifest, dagster_dbt_translator = get_manifest_and_translator_from_dbt_assets(dbt_assets)
matching_models = [
value
for value in manifest["nodes"].values()
if value["name"] == model_name and value["resource_type"] == "model"
]
if len(matching_models) == 0:
raise KeyError(f"Could not find a dbt model with name: {model_name}")
return dagster_dbt_translator.get_asset_key(next(iter(matching_models)))
def get_asset_keys_by_output_name_for_source(
dbt_assets: Sequence[AssetsDefinition], source_name: str
) -> Mapping[str, AssetKey]:
"""Returns the corresponding Dagster asset keys for all tables in a dbt source.
This is a convenience method that makes it easy to define a multi-asset that generates
all the tables for a given dbt source.
Args:
        dbt_assets (Sequence[AssetsDefinition]): A sequence containing a single AssetsDefinition
            produced by load_assets_from_dbt_project, load_assets_from_dbt_manifest, or @dbt_assets.
        source_name (str): The name of the dbt source.
Returns:
Mapping[str, AssetKey]: A mapping of the table name to corresponding Dagster asset key
for all tables in the given dbt source.
Examples:
.. code-block:: python
from dagster import AssetOut, multi_asset
from dagster_dbt import dbt_assets, get_asset_keys_by_output_name_for_source
@dbt_assets(manifest=...)
def all_dbt_assets():
...
@multi_asset(
outs={
name: AssetOut(key=asset_key)
for name, asset_key in get_asset_keys_by_output_name_for_source(
[all_dbt_assets], "raw_data"
).items()
},
)
def upstream_python_asset():
...
"""
check.sequence_param(dbt_assets, "dbt_assets", of_type=AssetsDefinition)
check.str_param(source_name, "source_name")
manifest, dagster_dbt_translator = get_manifest_and_translator_from_dbt_assets(dbt_assets)
matching_nodes = [
value for value in manifest["sources"].values() if value["source_name"] == source_name
]
if len(matching_nodes) == 0:
raise KeyError(f"Could not find a dbt source with name: {source_name}")
return {
output_name_fn(value): dagster_dbt_translator.get_asset_key(value)
for value in matching_nodes
}
def get_asset_key_for_source(dbt_assets: Sequence[AssetsDefinition], source_name: str) -> AssetKey:
"""Returns the corresponding Dagster asset key for a dbt source with a singular table.
Args:
        dbt_assets (Sequence[AssetsDefinition]): A sequence containing a single AssetsDefinition
            produced by load_assets_from_dbt_project, load_assets_from_dbt_manifest, or @dbt_assets.
        source_name (str): The name of the dbt source.
    Raises:
        KeyError: If the source has more than one table.
Returns:
AssetKey: The corresponding Dagster asset key.
Examples:
.. code-block:: python
from dagster import asset
from dagster_dbt import dbt_assets, get_asset_key_for_source
@dbt_assets(manifest=...)
def all_dbt_assets():
...
@asset(key=get_asset_key_for_source([all_dbt_assets], "my_source"))
def upstream_python_asset():
...
"""
asset_keys_by_output_name = get_asset_keys_by_output_name_for_source(dbt_assets, source_name)
if len(asset_keys_by_output_name) > 1:
raise KeyError(
f"Source {source_name} has more than one table:"
f" {asset_keys_by_output_name.values()}. Use"
" `get_asset_keys_by_output_name_for_source` instead to get all tables for a"
" source."
)
return list(asset_keys_by_output_name.values())[0]
def build_dbt_asset_selection(
dbt_assets: Sequence[AssetsDefinition],
dbt_select: str = "fqn:*",
dbt_exclude: Optional[str] = None,
) -> AssetSelection:
"""Build an asset selection for a dbt selection string.
See https://docs.getdbt.com/reference/node-selection/syntax#how-does-selection-work for
more information.
Args:
        dbt_assets (Sequence[AssetsDefinition]): A sequence containing a single AssetsDefinition
            produced by load_assets_from_dbt_project, load_assets_from_dbt_manifest, or @dbt_assets.
        dbt_select (str): A dbt selection string to specify a set of dbt resources.
dbt_exclude (Optional[str]): A dbt selection string to exclude a set of dbt resources.
Returns:
AssetSelection: An asset selection for the selected dbt nodes.
Examples:
.. code-block:: python
from dagster_dbt import dbt_assets, build_dbt_asset_selection
@dbt_assets(manifest=...)
def all_dbt_assets():
...
# Select the dbt assets that have the tag "foo".
            foo_selection = build_dbt_asset_selection([all_dbt_assets], dbt_select="tag:foo")
# Select the dbt assets that have the tag "foo" and all Dagster assets downstream
# of them (dbt-related or otherwise)
foo_and_downstream_selection = foo_selection.downstream()
"""
manifest, dagster_dbt_translator = get_manifest_and_translator_from_dbt_assets(dbt_assets)
from .dbt_manifest_asset_selection import DbtManifestAssetSelection
return DbtManifestAssetSelection(
manifest=manifest,
dagster_dbt_translator=dagster_dbt_translator,
select=dbt_select,
exclude=dbt_exclude,
)
def build_schedule_from_dbt_selection(
dbt_assets: Sequence[AssetsDefinition],
job_name: str,
cron_schedule: str,
dbt_select: str = "fqn:*",
dbt_exclude: Optional[str] = None,
tags: Optional[Mapping[str, str]] = None,
config: Optional[RunConfig] = None,
execution_timezone: Optional[str] = None,
) -> ScheduleDefinition:
"""Build a schedule to materialize a specified set of dbt resources from a dbt selection string.
See https://docs.getdbt.com/reference/node-selection/syntax#how-does-selection-work for
more information.
Args:
        dbt_assets (Sequence[AssetsDefinition]): A sequence containing a single AssetsDefinition
            produced by load_assets_from_dbt_project, load_assets_from_dbt_manifest, or @dbt_assets.
        job_name (str): The name of the job to materialize the dbt resources.
cron_schedule (str): The cron schedule to define the schedule.
dbt_select (str): A dbt selection string to specify a set of dbt resources.
dbt_exclude (Optional[str]): A dbt selection string to exclude a set of dbt resources.
tags (Optional[Mapping[str, str]]): A dictionary of tags (string key-value pairs) to attach
to the scheduled runs.
config (Optional[RunConfig]): The config that parameterizes the execution of this schedule.
execution_timezone (Optional[str]): Timezone in which the schedule should run.
Supported strings for timezones are the ones provided by the
            `IANA time zone database <https://www.iana.org/time-zones>`_ - e.g. "America/Los_Angeles".
Returns:
ScheduleDefinition: A definition to materialize the selected dbt resources on a cron schedule.
Examples:
.. code-block:: python
from dagster_dbt import dbt_assets, build_schedule_from_dbt_selection
@dbt_assets(manifest=...)
def all_dbt_assets():
...
daily_dbt_assets_schedule = build_schedule_from_dbt_selection(
[all_dbt_assets],
job_name="all_dbt_assets",
cron_schedule="0 0 * * *",
dbt_select="fqn:*",
)
"""
return ScheduleDefinition(
cron_schedule=cron_schedule,
job=define_asset_job(
name=job_name,
selection=build_dbt_asset_selection(
dbt_assets,
dbt_select=dbt_select,
dbt_exclude=dbt_exclude,
),
config=config,
tags=tags,
),
execution_timezone=execution_timezone,
)
def get_manifest_and_translator_from_dbt_assets(
dbt_assets: Sequence[AssetsDefinition],
) -> Tuple[Mapping[str, Any], "DagsterDbtTranslator"]:
check.invariant(len(dbt_assets) == 1, "Exactly one dbt AssetsDefinition is required")
dbt_assets_def = dbt_assets[0]
metadata_by_key = dbt_assets_def.metadata_by_key or {}
first_asset_key = next(iter(dbt_assets_def.keys))
first_metadata = metadata_by_key.get(first_asset_key, {})
manifest_wrapper: Optional["DbtManifestWrapper"] = first_metadata.get(MANIFEST_METADATA_KEY)
if manifest_wrapper is None:
raise DagsterInvariantViolationError(
f"Expected to find dbt manifest metadata on asset {first_asset_key.to_user_string()},"
" but did not. Did you pass in assets that weren't generated by"
" load_assets_from_dbt_project, load_assets_from_dbt_manifest, or @dbt_assets?"
)
dagster_dbt_translator = first_metadata.get(DAGSTER_DBT_TRANSLATOR_METADATA_KEY)
if dagster_dbt_translator is None:
raise DagsterInvariantViolationError(
f"Expected to find dbt translator metadata on asset {first_asset_key.to_user_string()},"
" but did not. Did you pass in assets that weren't generated by"
" load_assets_from_dbt_project, load_assets_from_dbt_manifest, or @dbt_assets?"
)
return manifest_wrapper.manifest, dagster_dbt_translator
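
# Illustrative sketch (not part of this module): a hypothetical helper showing how the
# manifest and translator stashed in the asset metadata above can be read back from a
# @dbt_assets-produced AssetsDefinition.
def _example_read_manifest_back(dbt_assets_def: AssetsDefinition) -> None:
    manifest, translator = get_manifest_and_translator_from_dbt_assets([dbt_assets_def])
    # `manifest` is the parsed manifest.json contents; `translator` maps dbt resource
    # props to Dagster concepts (asset keys, group names, metadata, ...).
    print(len(manifest["nodes"]), type(translator).__name__)
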
###################
# DEFAULT FUNCTIONS
###################
def default_asset_key_fn(dbt_resource_props: Mapping[str, Any]) -> AssetKey:
"""Get the asset key for a dbt node.
By default, if the dbt node has a Dagster asset key configured in its metadata, then that is
parsed and used.
Otherwise:
dbt sources: a dbt source's key is the union of its source name and its table name
dbt models: a dbt model's key is the union of its model name and any schema configured on
the model itself.
"""
dagster_metadata = dbt_resource_props.get("meta", {}).get("dagster", {})
asset_key_config = dagster_metadata.get("asset_key", [])
if asset_key_config:
return AssetKey(asset_key_config)
if dbt_resource_props["resource_type"] == "source":
components = [dbt_resource_props["source_name"], dbt_resource_props["name"]]
else:
configured_schema = dbt_resource_props["config"].get("schema")
if configured_schema is not None:
components = [configured_schema, dbt_resource_props["name"]]
else:
components = [dbt_resource_props["name"]]
return AssetKey(components)
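
# Illustrative sketch (not part of the library): minimal, made-up dbt node dicts showing
# how default_asset_key_fn resolves keys when no +meta.dagster.asset_key is configured.
def _example_default_asset_key_fn() -> None:
    model_props = {
        "resource_type": "model",
        "name": "customers",
        "config": {"schema": "analytics"},
        "meta": {},
    }
    # With a configured schema, the key is [schema, model name].
    assert default_asset_key_fn(model_props) == AssetKey(["analytics", "customers"])

    source_props = {
        "resource_type": "source",
        "source_name": "raw_data",
        "name": "users",
        "meta": {},
    }
    # For sources, the key is [source name, table name].
    assert default_asset_key_fn(source_props) == AssetKey(["raw_data", "users"])
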
def default_metadata_from_dbt_resource_props(
dbt_resource_props: Mapping[str, Any]
) -> Mapping[str, Any]:
metadata: Dict[str, Any] = {}
columns = dbt_resource_props.get("columns", {})
if len(columns) > 0:
metadata["table_schema"] = MetadataValue.table_schema(
TableSchema(
columns=[
TableColumn(
name=column_name,
type=column_info.get("data_type") or "?",
description=column_info.get("description"),
)
for column_name, column_info in columns.items()
]
)
)
return metadata
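
# Hedged example (not part of the library): the "columns" entry of a dbt node, as it
# appears in manifest.json, is converted into a TableSchema metadata value. The column
# names and types below are invented for illustration.
def _example_default_metadata() -> None:
    props = {
        "columns": {
            "id": {"data_type": "integer", "description": "Primary key"},
            "name": {"description": "Customer name"},  # no data_type -> shown as "?"
        }
    }
    metadata = default_metadata_from_dbt_resource_props(props)
    # The schema is attached under the "table_schema" key.
    assert "table_schema" in metadata
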
def default_group_from_dbt_resource_props(dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
"""Get the group name for a dbt node.
If a Dagster group is configured in the metadata for the node, use that.
Otherwise, if a dbt group is configured for the node, use that.
"""
dagster_metadata = dbt_resource_props.get("meta", {}).get("dagster", {})
dagster_group = dagster_metadata.get("group")
if dagster_group:
return dagster_group
dbt_group = dbt_resource_props.get("config", {}).get("group")
if dbt_group:
return dbt_group
return None
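
# Hedged example (not part of the library): group resolution precedence, using made-up
# group names. A Dagster group from +meta.dagster.group wins over a dbt group in config.
def _example_default_group() -> None:
    assert (
        default_group_from_dbt_resource_props(
            {"meta": {"dagster": {"group": "marketing"}}, "config": {"group": "finance"}}
        )
        == "marketing"
    )
    assert (
        default_group_from_dbt_resource_props({"meta": {}, "config": {"group": "finance"}})
        == "finance"
    )
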
def group_from_dbt_resource_props_fallback_to_directory(
dbt_resource_props: Mapping[str, Any]
) -> Optional[str]:
"""Get the group name for a dbt node.
    Has the same behavior as default_group_from_dbt_resource_props, except that, if no group can be
    determined from config or metadata, it falls back to using the subdirectory of the models
    directory that the source file is in.
Args:
dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource.
Examples:
.. code-block:: python
from dagster_dbt import group_from_dbt_resource_props_fallback_to_directory
dbt_assets = load_assets_from_dbt_manifest(
manifest=manifest,
node_info_to_group_fn=group_from_dbt_resource_props_fallback_to_directory,
)
"""
group_name = default_group_from_dbt_resource_props(dbt_resource_props)
if group_name is not None:
return group_name
fqn = dbt_resource_props.get("fqn", [])
# the first component is the package name, and the last component is the model name
if len(fqn) < 3:
return None
return fqn[1]
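
# Hedged example (not part of the library): with no group configured anywhere, the fqn
# ["my_package", "staging", "stg_customers"] (package / subdirectory / model) resolves
# to the "staging" subdirectory.
def _example_group_fallback() -> None:
    props = {"meta": {}, "config": {}, "fqn": ["my_package", "staging", "stg_customers"]}
    assert group_from_dbt_resource_props_fallback_to_directory(props) == "staging"
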
def default_freshness_policy_fn(dbt_resource_props: Mapping[str, Any]) -> Optional[FreshnessPolicy]:
dagster_metadata = dbt_resource_props.get("meta", {}).get("dagster", {})
freshness_policy_config = dagster_metadata.get("freshness_policy", {})
freshness_policy = _legacy_freshness_policy_fn(freshness_policy_config)
if freshness_policy:
return freshness_policy
legacy_freshness_policy_config = dbt_resource_props["config"].get(
"dagster_freshness_policy", {}
)
legacy_freshness_policy = _legacy_freshness_policy_fn(legacy_freshness_policy_config)
if legacy_freshness_policy:
deprecation_warning(
"dagster_freshness_policy",
"0.21.0",
"Instead, configure a Dagster freshness policy on a dbt model using"
" +meta.dagster.freshness_policy.",
)
return legacy_freshness_policy
def _legacy_freshness_policy_fn(
freshness_policy_config: Mapping[str, Any]
) -> Optional[FreshnessPolicy]:
if freshness_policy_config:
return FreshnessPolicy(
maximum_lag_minutes=float(freshness_policy_config["maximum_lag_minutes"]),
cron_schedule=freshness_policy_config.get("cron_schedule"),
cron_schedule_timezone=freshness_policy_config.get("cron_schedule_timezone"),
)
return None
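
# Hedged example (not part of the library): the +meta.dagster.freshness_policy shape
# that default_freshness_policy_fn reads from a dbt model's meta config. The numbers
# below are made up for illustration.
def _example_freshness_policy() -> None:
    props = {
        "meta": {
            "dagster": {
                "freshness_policy": {
                    "maximum_lag_minutes": 60,
                    "cron_schedule": "0 * * * *",
                }
            }
        },
        "config": {},
    }
    policy = default_freshness_policy_fn(props)
    assert policy is not None and policy.maximum_lag_minutes == 60.0
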
def default_auto_materialize_policy_fn(
dbt_resource_props: Mapping[str, Any]
) -> Optional[AutoMaterializePolicy]:
dagster_metadata = dbt_resource_props.get("meta", {}).get("dagster", {})
auto_materialize_policy_config = dagster_metadata.get("auto_materialize_policy", {})
auto_materialize_policy = _auto_materialize_policy_fn(auto_materialize_policy_config)
if auto_materialize_policy:
return auto_materialize_policy
legacy_auto_materialize_policy_config = dbt_resource_props["config"].get(
"dagster_auto_materialize_policy", {}
)
legacy_auto_materialize_policy = _auto_materialize_policy_fn(
legacy_auto_materialize_policy_config
)
if legacy_auto_materialize_policy:
deprecation_warning(
"dagster_auto_materialize_policy",
"0.21.0",
"Instead, configure a Dagster auto-materialize policy on a dbt model using"
" +meta.dagster.auto_materialize_policy.",
)
return legacy_auto_materialize_policy
def _auto_materialize_policy_fn(
auto_materialize_policy_config: Mapping[str, Any]
) -> Optional[AutoMaterializePolicy]:
if auto_materialize_policy_config.get("type") == "eager":
return AutoMaterializePolicy.eager()
elif auto_materialize_policy_config.get("type") == "lazy":
return AutoMaterializePolicy.lazy()
return None
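
# Hedged example (not part of the library): the +meta.dagster.auto_materialize_policy
# shape that default_auto_materialize_policy_fn reads; "eager" and "lazy" are the only
# recognized types.
def _example_auto_materialize_policy() -> None:
    props = {"meta": {"dagster": {"auto_materialize_policy": {"type": "eager"}}}, "config": {}}
    policy = default_auto_materialize_policy_fn(props)
    assert policy is not None  # an eager AutoMaterializePolicy
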
def default_description_fn(dbt_resource_props: Mapping[str, Any], display_raw_sql: bool = True):
code_block = textwrap.indent(
dbt_resource_props.get("raw_sql") or dbt_resource_props.get("raw_code", ""), " "
)
description_sections = [
dbt_resource_props["description"]
or f"dbt {dbt_resource_props['resource_type']} {dbt_resource_props['name']}",
]
if display_raw_sql:
description_sections.append(f"#### Raw SQL:\n```\n{code_block}\n```")
return "\n\n".join(filter(None, description_sections))
def default_code_version_fn(dbt_resource_props: Mapping[str, Any]) -> str:
return hashlib.sha1(
(dbt_resource_props.get("raw_sql") or dbt_resource_props.get("raw_code", "")).encode(
"utf-8"
)
).hexdigest()
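
# Hedged example (not part of the library): the code version is just the SHA-1 of the
# model's raw SQL, so edited models get a new version while untouched models keep theirs.
def _example_default_code_version() -> None:
    a = default_code_version_fn({"raw_code": "select 1"})
    b = default_code_version_fn({"raw_code": "select 2"})
    assert a != b and len(a) == 40  # 40-character hex digest
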
###################
# DEPENDENCIES
###################
def is_non_asset_node(dbt_resource_props: Mapping[str, Any]):
# some nodes exist inside the dbt graph but are not assets
resource_type = dbt_resource_props["resource_type"]
if resource_type == "metric":
return True
if (
resource_type == "model"
and dbt_resource_props.get("config", {}).get("materialized") == "ephemeral"
):
return True
return False
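
# Hedged example (not part of the library): metrics and ephemeral models are excluded
# from the asset graph, while ordinary table/view models are kept.
def _example_is_non_asset_node() -> None:
    assert is_non_asset_node({"resource_type": "metric"})
    assert is_non_asset_node({"resource_type": "model", "config": {"materialized": "ephemeral"}})
    assert not is_non_asset_node({"resource_type": "model", "config": {"materialized": "table"}})
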
def get_deps(
dbt_nodes: Mapping[str, Any],
selected_unique_ids: AbstractSet[str],
asset_resource_types: List[str],
) -> Mapping[str, FrozenSet[str]]:
def _valid_parent_node(dbt_resource_props):
# sources are valid parents, but not assets
return dbt_resource_props["resource_type"] in asset_resource_types + ["source"]
asset_deps: Dict[str, Set[str]] = {}
for unique_id in selected_unique_ids:
dbt_resource_props = dbt_nodes[unique_id]
node_resource_type = dbt_resource_props["resource_type"]
# skip non-assets, such as metrics, tests, and ephemeral models
if is_non_asset_node(dbt_resource_props) or node_resource_type not in asset_resource_types:
continue
asset_deps[unique_id] = set()
for parent_unique_id in dbt_resource_props.get("depends_on", {}).get("nodes", []):
parent_node_info = dbt_nodes[parent_unique_id]
# for metrics or ephemeral dbt models, BFS to find valid parents
if is_non_asset_node(parent_node_info):
visited = set()
replaced_parent_ids = set()
# make a copy to avoid mutating the actual dictionary
queue = list(parent_node_info.get("depends_on", {}).get("nodes", []))
while queue:
candidate_parent_id = queue.pop()
if candidate_parent_id in visited:
continue
visited.add(candidate_parent_id)
candidate_parent_info = dbt_nodes[candidate_parent_id]
if is_non_asset_node(candidate_parent_info):
queue.extend(candidate_parent_info.get("depends_on", {}).get("nodes", []))
elif _valid_parent_node(candidate_parent_info):
replaced_parent_ids.add(candidate_parent_id)
asset_deps[unique_id] |= replaced_parent_ids
# ignore nodes which are not assets / sources
elif _valid_parent_node(parent_node_info):
asset_deps[unique_id].add(parent_unique_id)
frozen_asset_deps = {
unique_id: frozenset(parent_ids) for unique_id, parent_ids in asset_deps.items()
}
return frozen_asset_deps
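
# Hedged example (not part of the library): a toy three-node graph (made-up unique ids)
# where an ephemeral model sits between a source and a table model. get_deps collapses
# the ephemeral node so the table model depends directly on the source.
def _example_get_deps() -> None:
    dbt_nodes = {
        "source.proj.raw.users": {"resource_type": "source", "depends_on": {"nodes": []}},
        "model.proj.stg_users": {
            "resource_type": "model",
            "config": {"materialized": "ephemeral"},
            "depends_on": {"nodes": ["source.proj.raw.users"]},
        },
        "model.proj.users": {
            "resource_type": "model",
            "config": {"materialized": "table"},
            "depends_on": {"nodes": ["model.proj.stg_users"]},
        },
    }
    deps = get_deps(
        dbt_nodes,
        selected_unique_ids={"model.proj.users"},
        asset_resource_types=["model"],
    )
    assert deps == {"model.proj.users": frozenset({"source.proj.raw.users"})}
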
def get_asset_deps(
dbt_nodes,
deps,
node_info_to_freshness_policy_fn,
node_info_to_auto_materialize_policy_fn,
io_manager_key,
manifest: Optional[Mapping[str, Any]],
dagster_dbt_translator: "DagsterDbtTranslator",
) -> Tuple[
Dict[AssetKey, Set[AssetKey]],
Dict[AssetKey, Tuple[str, In]],
Dict[AssetKey, Tuple[str, Out]],
Dict[AssetKey, str],
Dict[AssetKey, FreshnessPolicy],
Dict[AssetKey, AutoMaterializePolicy],
Dict[str, List[str]],
Dict[str, Dict[str, Any]],
]:
from .dagster_dbt_translator import DbtManifestWrapper
asset_deps: Dict[AssetKey, Set[AssetKey]] = {}
asset_ins: Dict[AssetKey, Tuple[str, In]] = {}
asset_outs: Dict[AssetKey, Tuple[str, Out]] = {}
# These dicts could be refactored as a single dict, mapping from output name to arbitrary
# metadata that we need to store for reference.
group_names_by_key: Dict[AssetKey, str] = {}
freshness_policies_by_key: Dict[AssetKey, FreshnessPolicy] = {}
auto_materialize_policies_by_key: Dict[AssetKey, AutoMaterializePolicy] = {}
fqns_by_output_name: Dict[str, List[str]] = {}
metadata_by_output_name: Dict[str, Dict[str, Any]] = {}
for unique_id, parent_unique_ids in deps.items():
dbt_resource_props = dbt_nodes[unique_id]
output_name = output_name_fn(dbt_resource_props)
fqns_by_output_name[output_name] = dbt_resource_props["fqn"]
metadata_by_output_name[output_name] = {
key: dbt_resource_props[key] for key in ["unique_id", "resource_type"]
}
asset_key = dagster_dbt_translator.get_asset_key(dbt_resource_props)
asset_deps[asset_key] = set()
metadata = merge_dicts(
dagster_dbt_translator.get_metadata(dbt_resource_props),
{
MANIFEST_METADATA_KEY: DbtManifestWrapper(manifest=manifest) if manifest else None,
DAGSTER_DBT_TRANSLATOR_METADATA_KEY: dagster_dbt_translator,
},
)
asset_outs[asset_key] = (
output_name,
Out(
io_manager_key=io_manager_key,
description=dagster_dbt_translator.get_description(dbt_resource_props),
metadata=metadata,
is_required=False,
dagster_type=Nothing,
code_version=default_code_version_fn(dbt_resource_props),
),
)
group_name = dagster_dbt_translator.get_group_name(dbt_resource_props)
if group_name is not None:
group_names_by_key[asset_key] = group_name
freshness_policy = node_info_to_freshness_policy_fn(dbt_resource_props)
if freshness_policy is not None:
freshness_policies_by_key[asset_key] = freshness_policy
auto_materialize_policy = node_info_to_auto_materialize_policy_fn(dbt_resource_props)
if auto_materialize_policy is not None:
auto_materialize_policies_by_key[asset_key] = auto_materialize_policy
for parent_unique_id in parent_unique_ids:
parent_node_info = dbt_nodes[parent_unique_id]
parent_asset_key = dagster_dbt_translator.get_asset_key(parent_node_info)
asset_deps[asset_key].add(parent_asset_key)
# if this parent is not one of the selected nodes, it's an input
if parent_unique_id not in deps:
input_name = input_name_fn(parent_node_info)
asset_ins[parent_asset_key] = (input_name, In(Nothing))
return (
asset_deps,
asset_ins,
asset_outs,
group_names_by_key,
freshness_policies_by_key,
auto_materialize_policies_by_key,
fqns_by_output_name,
metadata_by_output_name,
)
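
# Hedged sketch (not part of the library): how the tuple returned by get_asset_deps is
# typically unpacked. The names `dbt_nodes`, `deps`, `manifest`, and `translator` are
# assumed to exist and are illustrative only.
#
# (
#     asset_deps,                        # AssetKey -> set of upstream AssetKeys
#     asset_ins,                         # upstream AssetKey -> (input name, In(Nothing))
#     asset_outs,                        # AssetKey -> (output name, Out(...))
#     group_names_by_key,
#     freshness_policies_by_key,
#     auto_materialize_policies_by_key,
#     fqns_by_output_name,
#     metadata_by_output_name,
# ) = get_asset_deps(
#     dbt_nodes=dbt_nodes,
#     deps=deps,
#     node_info_to_freshness_policy_fn=default_freshness_policy_fn,
#     node_info_to_auto_materialize_policy_fn=default_auto_materialize_policy_fn,
#     io_manager_key=None,
#     manifest=manifest,
#     dagster_dbt_translator=translator,
# )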