You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster_dbt.core.resources_v2

import os
import shutil
import subprocess
import sys
import uuid
from contextlib import suppress
from dataclasses import dataclass
from pathlib import Path
from typing import (
    Any,
    Dict,
    Iterator,
    List,
    Mapping,
    Optional,
    Union,
)

import dateutil.parser
import orjson
from dagster import (
    AssetObservation,
    AssetsDefinition,
    ConfigurableResource,
    OpExecutionContext,
    Output,
    _check as check,
    get_dagster_logger,
)
from dagster._annotations import public
from dagster._core.errors import DagsterInvalidPropertyError
from dbt.contracts.results import NodeStatus
from dbt.node_types import NodeType
from pydantic import Field
from typing_extensions import Literal

from ..asset_utils import get_manifest_and_translator_from_dbt_assets, output_name_fn
from ..dagster_dbt_translator import DagsterDbtTranslator
from ..errors import DagsterDbtCliRuntimeError
from ..utils import ASSET_RESOURCE_TYPES, get_dbt_resource_props_by_dbt_unique_id_from_manifest

# Module-level logger, obtained from Dagster's `get_dagster_logger` helper and shared by the
# classes and functions defined below.
logger = get_dagster_logger()


# File name of the dbt partial-parse cache. `DbtCliInvocation.run` looks for a file with this
# name in the project's `target` folder and, when present, copies it into the per-invocation
# target path before launching dbt.
PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"


@dataclass
class DbtCliEventMessage:
    """A single structured log event emitted by the dbt CLI.

    Args:
        raw_event (Dict[str, Any]): The decoded event payload. Its layout is described at
            https://docs.getdbt.com/reference/events-logging#structured-logging.
    """

    raw_event: Dict[str, Any]

    @classmethod
    def from_log(cls, log: str) -> "DbtCliEventMessage":
        """Build an event from one line of dbt CLI output.

        The line is decoded as JSON, following the structured logging format documented at
        https://docs.getdbt.com/reference/events-logging#structured-logging. Anything that is
        not valid JSON makes the decoder raise.
        """
        return cls(raw_event=orjson.loads(log))

    def __str__(self) -> str:
        # The human-readable text of the event lives under `info.msg`.
        info = self.raw_event["info"]
        return info["msg"]

    @public
    def to_default_asset_events(
        self,
        manifest: Mapping[str, Any],
        dagster_dbt_translator: DagsterDbtTranslator = DagsterDbtTranslator(),
    ) -> Iterator[Union[Output, AssetObservation]]:
        """Translate this dbt CLI event into the Dagster events it corresponds to.

        Args:
            manifest (Mapping[str, Any]): The dbt manifest blob.
            dagster_dbt_translator (DagsterDbtTranslator): Optionally, a custom translator for
                linking dbt nodes to Dagster assets.

        Returns:
            Iterator[Union[Output, AssetObservation]]: The corresponding Dagster events.
                - An Output for a successful refable node (e.g. models, seeds, snapshots).
                - One AssetObservation per upstream node for a finished test.
                - Nothing for events that carry no node information, or that match neither
                  of the cases above.
        """
        node_info: Dict[str, Any] = self.raw_event["data"].get("node_info")
        if not node_info:
            # Events that are not tied to a dbt node produce no Dagster events.
            return

        unique_id: str = node_info["unique_id"]
        resource_type: str = node_info["resource_type"]
        status: str = node_info["node_status"]

        succeeded = status == NodeStatus.Success
        finished = bool(node_info.get("node_finished_at"))

        if resource_type in NodeType.refable() and succeeded:
            # Report how long the node took, based on the timestamps dbt attached to the event.
            began = dateutil.parser.isoparse(node_info["node_started_at"])
            ended = dateutil.parser.isoparse(node_info["node_finished_at"])
            elapsed_seconds = (ended - began).total_seconds()

            yield Output(
                value=None,
                output_name=output_name_fn(node_info),
                metadata={
                    "unique_id": unique_id,
                    "Execution Duration": elapsed_seconds,
                },
            )
            return

        if not (resource_type == NodeType.Test and finished):
            return

        # A finished test is recorded as an observation on every node it depends on. A parent
        # is looked up among the manifest's nodes first, then among its sources.
        for parent_unique_id in manifest["parent_map"][unique_id]:
            parent_props: Dict[str, Any] = manifest["nodes"].get(
                parent_unique_id
            ) or manifest["sources"].get(parent_unique_id)
            parent_asset_key = dagster_dbt_translator.get_asset_key(parent_props)

            yield AssetObservation(
                asset_key=parent_asset_key,
                metadata={
                    "unique_id": unique_id,
                    "status": status,
                },
            )
@dataclass
class DbtCliInvocation:
    """The representation of an invoked dbt command.

    Args:
        process (subprocess.Popen): The process running the dbt command.
        manifest (Mapping[str, Any]): The dbt manifest blob.
        dagster_dbt_translator (DagsterDbtTranslator): The translator used to link dbt nodes to
            Dagster assets when converting CLI events.
        project_dir (Path): The path to the dbt project.
        target_path (Path): The path to the dbt target folder.
        raise_on_error (bool): Whether to raise an exception if the dbt command fails.
    """

    process: subprocess.Popen
    manifest: Mapping[str, Any]
    dagster_dbt_translator: DagsterDbtTranslator
    project_dir: Path
    target_path: Path
    raise_on_error: bool

    @classmethod
    def run(
        cls,
        args: List[str],
        env: Dict[str, str],
        manifest: Mapping[str, Any],
        dagster_dbt_translator: DagsterDbtTranslator,
        project_dir: Path,
        target_path: Path,
        raise_on_error: bool,
    ) -> "DbtCliInvocation":
        """Launch the dbt CLI as a subprocess and wrap it in a `DbtCliInvocation`.

        Args:
            args (List[str]): The full command line to execute.
            env (Dict[str, str]): The environment for the subprocess.
            manifest (Mapping[str, Any]): The dbt manifest blob.
            dagster_dbt_translator (DagsterDbtTranslator): The translator to link dbt nodes to
                Dagster assets.
            project_dir (Path): The dbt project directory; used as the working directory.
            target_path (Path): The target folder for this invocation.
            raise_on_error (bool): Whether a failing dbt command should raise.

        Returns:
            DbtCliInvocation: The invocation wrapping the started (not yet awaited) process.
        """
        # Attempt to take advantage of partial parsing. If there is a `partial_parse.msgpack`
        # in the project's target folder, then copy it to the dynamic target path.
        #
        # This effectively allows us to skip the parsing of the manifest, which can be expensive.
        # See https://docs.getdbt.com/reference/programmatic-invocations#reusing-objects for more
        # details.
        partial_parse_file_path = project_dir.joinpath("target", PARTIAL_PARSE_FILE_NAME)
        partial_parse_destination_target_path = target_path.joinpath(PARTIAL_PARSE_FILE_NAME)

        if partial_parse_file_path.exists():
            logger.info(
                f"Copying `{partial_parse_file_path}` to `{partial_parse_destination_target_path}`"
                " to take advantage of partial parsing."
            )

            partial_parse_destination_target_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(partial_parse_file_path, partial_parse_destination_target_path)

        # Create a subprocess that runs the dbt CLI command. stderr is merged into stdout so
        # that a single pipe carries all of the process output.
        logger.info(f"Running dbt command: `{' '.join(args)}`.")
        process = subprocess.Popen(
            args=args,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=env,
            cwd=project_dir,
        )

        return cls(
            process=process,
            manifest=manifest,
            dagster_dbt_translator=dagster_dbt_translator,
            project_dir=project_dir,
            target_path=target_path,
            raise_on_error=raise_on_error,
        )

    @public
    def wait(self) -> "DbtCliInvocation":
        """Wait for the dbt CLI process to complete.

        The process output is consumed (and re-emitted to stdout) while waiting.

        Returns:
            DbtCliInvocation: The current representation of the dbt CLI invocation.

        Raises:
            DagsterDbtCliRuntimeError: If the process fails and `raise_on_error` is set.

        Examples:
            .. code-block:: python

                import json
                from pathlib import Path

                from dagster_dbt import DbtCliResource

                manifest = json.loads(Path("path/to/manifest.json").read_text())
                dbt = DbtCliResource(project_dir="/path/to/dbt/project")

                dbt_cli_invocation = dbt.cli(["run"], manifest=manifest).wait()
        """
        # Drain the event stream without keeping every event in memory.
        for _ in self.stream_raw_events():
            pass

        return self

    @public
    def is_successful(self) -> bool:
        """Return whether the dbt CLI process completed successfully.

        This blocks until the process has exited.

        Note:
            The process is started with its stdout attached to a pipe. According to the
            Python `subprocess` documentation, `Popen.wait()` can deadlock when the child
            writes enough output to fill the OS pipe buffer and nothing reads it. Consume the
            output first (e.g. via `wait()`, `stream()` or `stream_raw_events()`) when the dbt
            command may be verbose.

        Returns:
            bool: True, if the dbt CLI process returns with a zero exit code, and False
                otherwise.

        Examples:
            .. code-block:: python

                import json
                from pathlib import Path

                from dagster_dbt import DbtCliResource

                manifest = json.loads(Path("path/to/manifest.json").read_text())
                dbt = DbtCliResource(project_dir="/path/to/dbt/project")

                dbt_cli_invocation = dbt.cli(["run"], manifest=manifest, raise_on_error=False)

                if dbt_cli_invocation.is_successful():
                    ...
        """
        return self.process.wait() == 0

    @public
    def stream(self) -> Iterator[Union[Output, AssetObservation]]:
        """Stream the events from the dbt CLI process and convert them to Dagster events.

        Returns:
            Iterator[Union[Output, AssetObservation]]: A set of corresponding Dagster events.
                - Output for refables (e.g. models, seeds, snapshots.)
                - AssetObservations for test results.

        Examples:
            .. code-block:: python

                from pathlib import Path
                from dagster_dbt import DbtCliResource, dbt_assets

                @dbt_assets(manifest=Path("target", "manifest.json"))
                def my_dbt_assets(context, dbt: DbtCliResource):
                    yield from dbt.cli(["run"], context=context).stream()
        """
        for event in self.stream_raw_events():
            yield from event.to_default_asset_events(
                manifest=self.manifest, dagster_dbt_translator=self.dagster_dbt_translator
            )

    @public
    def stream_raw_events(self) -> Iterator[DbtCliEventMessage]:
        """Stream the events from the dbt CLI process.

        Every line of process output is re-emitted to stdout: as the event message when the
        line is a parseable structured event, verbatim otherwise. Only parseable events are
        yielded. Once the output is exhausted, the exit status of the process is checked.

        Returns:
            Iterator[DbtCliEventMessage]: An iterator of events from the dbt CLI process.

        Raises:
            DagsterDbtCliRuntimeError: If the process fails and `raise_on_error` is set.
        """
        for raw_line in self.process.stdout or []:
            log: str = raw_line.decode().strip()

            # Only the parsing/rendering of the line is guarded. The `yield` deliberately
            # lives outside of the `try`: a handler wrapped around it would also intercept
            # exceptions delivered at the yield point (`GeneratorExit` on close, or anything
            # passed to `throw()`) and mistake them for an unparseable log line.
            try:
                event = DbtCliEventMessage.from_log(log=log)
                message = str(event)
            except Exception:
                # Best effort: if we can't parse the log, then just emit it as a raw log.
                sys.stdout.write(log + "\n")
                sys.stdout.flush()
                continue

            # Re-emit the logs from dbt CLI process into stdout.
            sys.stdout.write(message + "\n")
            sys.stdout.flush()

            yield event

        # Ensure that the dbt CLI process has completed.
        self._raise_on_error()

    @public
    def get_artifact(
        self,
        artifact: Union[
            Literal["manifest.json"],
            Literal["catalog.json"],
            Literal["run_results.json"],
            Literal["sources.json"],
        ],
    ) -> Dict[str, Any]:
        """Retrieve a dbt artifact from the target path.

        See https://docs.getdbt.com/reference/artifacts/dbt-artifacts for more information.

        Args:
            artifact (Union[Literal["manifest.json"], Literal["catalog.json"], Literal["run_results.json"], Literal["sources.json"]]): The name of the artifact to retrieve.

        Returns:
            Dict[str, Any]: The artifact as a dictionary.

        Examples:
            .. code-block:: python

                import json
                from pathlib import Path

                from dagster_dbt import DbtCliResource

                manifest = json.loads(Path("path/to/manifest.json").read_text())
                dbt = DbtCliResource(project_dir="/path/to/dbt/project")

                dbt_cli_invocation = dbt.cli(["run"], manifest=manifest).wait()

                # Retrieve the run_results.json artifact.
                run_results = dbt_cli_invocation.get_artifact("run_results.json")
        """
        artifact_path = self.target_path.joinpath(artifact)

        return orjson.loads(artifact_path.read_bytes())

    def _raise_on_error(self) -> None:
        """Ensure that the dbt CLI process has completed.

        If the process has not successfully completed, then optionally raise an error.

        Raises:
            DagsterDbtCliRuntimeError: If the process exited with a non-zero code and
                `raise_on_error` is set.
        """
        if not self.is_successful() and self.raise_on_error:
            raise DagsterDbtCliRuntimeError(
                description=(
                    f"The dbt CLI process failed with exit code {self.process.returncode}. Check"
                    " the compute logs for the full information about the error."
                )
            )
class DbtCliResource(ConfigurableResource):
    """A resource used to execute dbt CLI commands.

    Attributes:
        project_dir (str): The path to the dbt project directory. This directory should contain a
            `dbt_project.yml`. See https://docs.getdbt.com/reference/dbt_project.yml for more
            information.
        global_config_flags (List[str]): A list of global flags configuration to pass to the dbt
            CLI invocation. See https://docs.getdbt.com/reference/global-configs for a full list of
            configuration.
        profiles_dir (Optional[str]): The path to the directory containing your dbt `profiles.yml`.
            See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more
            information.
        profile (Optional[str]): The profile from your dbt `profiles.yml` to use for execution. See
            https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more
            information.
        target (Optional[str]): The target from your dbt `profiles.yml` to use for execution. See
            https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more
            information.

    Examples:
        .. code-block:: python

            from dagster_dbt import DbtCliResource

            dbt = DbtCliResource(
                project_dir="/path/to/dbt/project",
                global_config_flags=["--no-use-colors"],
                profile="jaffle_shop",
                target="dev",
            )
    """

    project_dir: str = Field(
        ...,
        description=(
            "The path to your dbt project directory. This directory should contain a"
            " `dbt_project.yml`. See https://docs.getdbt.com/reference/dbt_project.yml for more"
            " information."
        ),
    )
    global_config_flags: List[str] = Field(
        default=[],
        description=(
            "A list of global flags configuration to pass to the dbt CLI invocation. See"
            " https://docs.getdbt.com/reference/global-configs for a full list of configuration."
        ),
    )
    profiles_dir: Optional[str] = Field(
        default=None,
        description=(
            "The path to the directory containing your dbt `profiles.yml`. See"
            " https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more"
            " information."
        ),
    )
    profile: Optional[str] = Field(
        default=None,
        description=(
            "The profile from your dbt `profiles.yml` to use for execution. See"
            " https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more"
            " information."
        ),
    )
    target: Optional[str] = Field(
        default=None,
        description=(
            "The target from your dbt `profiles.yml` to use for execution. See"
            " https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more"
            " information."
        ),
    )

    def _get_unique_target_path(self, *, context: Optional[OpExecutionContext]) -> str:
        """Build a target path, relative to the project, that is unique to one invocation.

        Args:
            context (Optional[OpExecutionContext]): The execution context, if any. When given,
                the op name and a prefix of the run id are embedded in the path.

        Returns:
            str: A path of the form `target/<random>` or `target/<op>-<run>-<random>`.
        """
        random_suffix = str(uuid.uuid4())[:7]

        if not context:
            return f"target/{random_suffix}"

        return f"target/{context.op.name}-{context.run_id[:7]}-{random_suffix}"

    @public
    def cli(
        self,
        args: List[str],
        *,
        raise_on_error: bool = True,
        manifest: Optional[Mapping[str, Any]] = None,
        dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
        context: Optional[OpExecutionContext] = None,
    ) -> DbtCliInvocation:
        """Execute a dbt command.

        Args:
            args (List[str]): The dbt CLI command to execute.
            raise_on_error (bool): Whether to raise an exception if the dbt CLI command fails.
            manifest (Optional[Mapping[str, Any]]): The dbt manifest blob. If an execution context
                from within `@dbt_assets` is provided to the context argument, then the manifest
                provided to `@dbt_assets` will be used.
            dagster_dbt_translator (Optional[DagsterDbtTranslator]): The translator to link dbt
                nodes to Dagster assets. If an execution context from within `@dbt_assets` is
                provided to the context argument, then the dagster_dbt_translator provided to
                `@dbt_assets` will be used.
            context (Optional[OpExecutionContext]): The execution context from within
                `@dbt_assets`.

        Returns:
            DbtCliInvocation: A invocation instance that can be used to retrieve the output of the
                dbt CLI command.

        Examples:
            .. code-block:: python

                from pathlib import Path
                from dagster_dbt import DbtCliResource, dbt_assets

                @dbt_assets(manifest=Path("target", "manifest.json"))
                def my_dbt_assets(context, dbt: DbtCliResource):
                    yield from dbt.cli(["run"], context=context).stream()
        """
        target_path = self._get_unique_target_path(context=context)

        # Start from the current environment and layer the dbt-specific settings on top.
        env = dict(os.environ)
        env.update(
            {
                # Run dbt with unbuffered output.
                "PYTHONUNBUFFERED": "1",
                # The DBT_LOG_FORMAT environment variable must be set to `json`. We use this
                # environment variable to ensure that the dbt CLI outputs structured logs.
                "DBT_LOG_FORMAT": "json",
                # The DBT_TARGET_PATH environment variable is set to a unique value for each
                # dbt invocation so that artifact paths are separated.
                # See https://discourse.getdbt.com/t/multiple-run-results-json-and-manifest-json-files/7555
                # for more information.
                "DBT_TARGET_PATH": target_path,
            }
        )
        if self.profiles_dir:
            # The DBT_PROFILES_DIR environment variable is set to the path containing the dbt
            # profiles.yml file.
            # See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles#advanced-customizing-a-profile-directory
            # for more information.
            env["DBT_PROFILES_DIR"] = self.profiles_dir

        # Reading `assets_def` raises when the context does not belong to an assets definition;
        # that case is treated the same as having no assets definition at all.
        assets_def: Optional[AssetsDefinition] = None
        if context:
            try:
                assets_def = context.assets_def
            except DagsterInvalidPropertyError:
                assets_def = None

        selection_args: List[str] = []
        if not context or assets_def is None:
            # Not running inside `@dbt_assets`: the caller has to supply the manifest, and the
            # translator falls back to the default one.
            if manifest is None:
                check.failed(
                    "Must provide a value for the manifest argument if not executing as part of"
                    " @dbt_assets"
                )

            dagster_dbt_translator = dagster_dbt_translator or DagsterDbtTranslator()
        else:
            # Running inside `@dbt_assets`: the manifest and translator come from the assets
            # definition, and the selection is derived from the op tags and selected outputs.
            manifest, dagster_dbt_translator = get_manifest_and_translator_from_dbt_assets(
                [assets_def]
            )
            selection_args = get_subset_selection_for_context(
                context=context,
                manifest=manifest,
                select=context.op.tags.get("dagster-dbt/select"),
                exclude=context.op.tags.get("dagster-dbt/exclude"),
            )

        # TODO: verify that args does not have any selection flags if the context and manifest
        # are passed to this function.
        profile_args: List[str] = []
        if self.profile:
            profile_args = ["--profile", self.profile]
        if self.target:
            profile_args += ["--target", self.target]

        command = ["dbt"] + self.global_config_flags + args + profile_args + selection_args
        resolved_project_dir = Path(self.project_dir).resolve(strict=True)

        return DbtCliInvocation.run(
            args=command,
            env=env,
            manifest=manifest,
            dagster_dbt_translator=dagster_dbt_translator,
            project_dir=resolved_project_dir,
            target_path=resolved_project_dir.joinpath(target_path),
            raise_on_error=raise_on_error,
        )
def get_subset_selection_for_context(
    context: OpExecutionContext,
    manifest: Mapping[str, Any],
    select: Optional[str],
    exclude: Optional[str],
) -> List[str]:
    """Generate a dbt selection string to materialize the selected resources in a subsetted execution context.

    See https://docs.getdbt.com/reference/node-selection/syntax#how-does-selection-work.

    Args:
        context (OpExecutionContext): The execution context for the current execution step.
        manifest (Mapping[str, Any]): The dbt manifest blob, used to map the selected output
            names back to dbt resources.
        select (Optional[str]): A dbt selection string to select resources to materialize.
        exclude (Optional[str]): A dbt selection string to exclude resources from materializing.

    Returns:
        List[str]: dbt CLI arguments to materialize the selected resources in a subsetted
            execution context. If the current execution context is not performing a subsetted
            execution, return CLI arguments composed of the inputted selection and exclusion
            arguments.
    """
    default_dbt_selection: List[str] = []
    if select:
        default_dbt_selection += ["--select", select]
    if exclude:
        default_dbt_selection += ["--exclude", exclude]

    # TODO: this should be a property on the context if this is a permanent indicator for
    # determining whether the current execution context is performing a subsetted execution.
    is_subsetted_execution = len(context.selected_output_names) != len(
        context.assets_def.node_keys_by_output_name
    )
    if not is_subsetted_execution:
        logger.info(
            "A dbt subsetted execution is not being performed. Using the default dbt selection"
            f" arguments `{default_dbt_selection}`."
        )
        return default_dbt_selection

    # The lookup table walks the whole manifest, so it is only built once we know that a
    # subsetted execution actually needs it.
    dbt_resource_props_by_output_name = get_dbt_resource_props_by_output_name(manifest)

    # Explicitly select each dbt resource by its fully qualified name (FQN).
    # https://docs.getdbt.com/reference/node-selection/methods#the-file-or-fqn-method
    selected_dbt_resources = [
        f"fqn:{'.'.join(dbt_resource_props_by_output_name[output_name]['fqn'])}"
        for output_name in context.selected_output_names
    ]

    # Take the union of all the selected resources.
    # https://docs.getdbt.com/reference/node-selection/set-operators#unions
    union_selected_dbt_resources = ["--select"] + [" ".join(selected_dbt_resources)]

    logger.info(
        "A dbt subsetted execution is being performed. Overriding default dbt selection"
        f" arguments `{default_dbt_selection}` with arguments: `{union_selected_dbt_resources}`"
    )

    return union_selected_dbt_resources


def get_dbt_resource_props_by_output_name(
    manifest: Mapping[str, Any]
) -> Mapping[str, Mapping[str, Any]]:
    """Index the manifest's asset-like dbt resources by their Dagster output name.

    Args:
        manifest (Mapping[str, Any]): The dbt manifest blob.

    Returns:
        Mapping[str, Mapping[str, Any]]: The properties of every dbt resource whose
            `resource_type` is in `ASSET_RESOURCE_TYPES`, keyed by the name produced by
            `output_name_fn` for that resource.
    """
    node_info_by_dbt_unique_id = get_dbt_resource_props_by_dbt_unique_id_from_manifest(manifest)

    return {
        output_name_fn(node): node
        for node in node_info_by_dbt_unique_id.values()
        if node["resource_type"] in ASSET_RESOURCE_TYPES
    }