You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster_dbt.ops

from typing import Any, Dict, List, Optional

from dagster import Config, In, Nothing, Out, Output, op
from pydantic import Field

from .types import DbtOutput
from .utils import generate_events, generate_materializations

_DEFAULT_OP_PROPS: Dict[str, Any] = dict(
    required_resource_keys={"dbt"},
    ins={"start_after": In(Nothing)},
    out=Out(DbtOutput, description="Parsed output from running the dbt command."),
    tags={"kind": "dbt"},
)


def _get_doc(op_name: str, dbt_command: str) -> str:
    return f"""
This op executes a ``dbt {dbt_command}`` command. It requires the use of a dbt resource, which can be
set to execute this command through the CLI (using the :py:class:`~dagster_dbt.dbt_cli_resource`).

Examples:

.. code-block:: python

    from dagster import job
    from dagster_dbt import {op_name}, dbt_cli_resource

    @job(resource_defs={{"dbt":dbt_cli_resource}})
    def my_dbt_cli_job():
        {op_name}()
    """


# NOTE: mypy fails to properly track the type of `_DEFAULT_OP_PROPS` items when they are
# double-splatted, so we type-ignore the below op declarations.


class DbtBuildOpConfig(Config):
    yield_asset_events: bool = Field(
        default=True,
        description=(
            "If True, materializations and asset observations corresponding to the results of "
            "the dbt operation will be yielded when the op executes. Default: True"
        ),
    )
    asset_key_prefix: List[str] = Field(
        default=["dbt"],
        description=(
            "If provided and yield_materializations is True, these components will be used to "
            "prefix the generated asset keys."
        ),
    )


@op(**_DEFAULT_OP_PROPS)
def dbt_build_op(context, config: DbtBuildOpConfig) -> Any:
    dbt_output = context.resources.dbt.build()
    if config.yield_asset_events and "results" in dbt_output.result:
        yield from generate_events(
            dbt_output,
            node_info_to_asset_key=lambda info: config.asset_key_prefix
            + info["unique_id"].split("."),
            manifest_json=context.resources.dbt.get_manifest_json(),
        )
    yield Output(dbt_output)


class DbtRunOpConfig(Config):
    yield_materializations: bool = Field(
        default=True,
        description=(
            "If True, materializations corresponding to the results of the dbt operation will "
            "be yielded when the op executes. Default: True"
        ),
    )
    asset_key_prefix: Optional[List[str]] = Field(
        default=["dbt"],
        description=(
            "If provided and yield_materializations is True, these components will be used to "
            "prefix the generated asset keys."
        ),
    )


[docs]@op(**_DEFAULT_OP_PROPS) def dbt_run_op(context, config: DbtRunOpConfig): dbt_output = context.resources.dbt.run() if config.yield_materializations and "results" in dbt_output.result: yield from generate_materializations(dbt_output, asset_key_prefix=config.asset_key_prefix) yield Output(dbt_output)
[docs]@op(**_DEFAULT_OP_PROPS) def dbt_compile_op(context): return context.resources.dbt.compile()
[docs]@op(**_DEFAULT_OP_PROPS) def dbt_ls_op(context): return context.resources.dbt.ls()
[docs]@op(**_DEFAULT_OP_PROPS) def dbt_test_op(context): return context.resources.dbt.test()
[docs]@op(**_DEFAULT_OP_PROPS) def dbt_snapshot_op(context): return context.resources.dbt.snapshot()
[docs]@op(**_DEFAULT_OP_PROPS) def dbt_seed_op(context): return context.resources.dbt.seed()
[docs]@op(**_DEFAULT_OP_PROPS) def dbt_docs_generate_op(context): return context.resources.dbt.generate_docs()
for dbt_op, cmd in [ (dbt_build_op, "build"), (dbt_run_op, "run"), (dbt_compile_op, "compile"), (dbt_ls_op, "ls"), (dbt_test_op, "test"), (dbt_snapshot_op, "snapshot"), (dbt_seed_op, "seed"), (dbt_docs_generate_op, "docs generate"), ]: dbt_op.__doc__ = _get_doc(dbt_op.name, cmd)