You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster._core.definitions.unresolved_asset_job_definition

from collections import defaultdict
from datetime import datetime
from typing import TYPE_CHECKING, AbstractSet, Any, Mapping, NamedTuple, Optional, Sequence, Union

import dagster._check as check
from dagster._annotations import deprecated
from dagster._core.definitions import AssetKey
from dagster._core.definitions.run_request import RunRequest
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.instance import DynamicPartitionsStore

from .asset_layer import build_asset_selection_job
from .config import ConfigMapping
from .metadata import RawMetadataValue

if TYPE_CHECKING:
    from dagster._core.definitions import (
        AssetSelection,
        ExecutorDefinition,
        HookDefinition,
        JobDefinition,
        PartitionedConfig,
        PartitionsDefinition,
        ResourceDefinition,
    )
    from dagster._core.definitions.asset_graph import InternalAssetGraph
    from dagster._core.definitions.asset_selection import CoercibleToAssetSelection
    from dagster._core.definitions.run_config import RunConfig


class UnresolvedAssetJobDefinition(
    NamedTuple(
        "_UnresolvedAssetJobDefinition",
        [
            ("name", str),
            ("selection", "AssetSelection"),
            (
                "config",
                Optional[Union[ConfigMapping, Mapping[str, Any], "PartitionedConfig"]],
            ),
            ("description", Optional[str]),
            ("tags", Optional[Mapping[str, Any]]),
            ("metadata", Optional[Mapping[str, RawMetadataValue]]),
            ("partitions_def", Optional["PartitionsDefinition"]),
            ("executor_def", Optional["ExecutorDefinition"]),
            ("hooks", Optional[AbstractSet["HookDefinition"]]),
        ],
    )
):
    """An asset job whose asset selection has not yet been resolved against an asset graph.

    Instances are immutable records of the arguments a job was declared with. Calling
    :py:meth:`resolve` with an asset graph turns the record into a ``JobDefinition``.
    """

    def __new__(
        cls,
        name: str,
        selection: "AssetSelection",
        config: Optional[
            Union[ConfigMapping, Mapping[str, Any], "PartitionedConfig", "RunConfig"]
        ] = None,
        description: Optional[str] = None,
        tags: Optional[Mapping[str, Any]] = None,
        metadata: Optional[Mapping[str, RawMetadataValue]] = None,
        partitions_def: Optional["PartitionsDefinition"] = None,
        executor_def: Optional["ExecutorDefinition"] = None,
        hooks: Optional[AbstractSet["HookDefinition"]] = None,
    ):
        """Type-check every argument and build the underlying named tuple.

        ``config`` is passed through ``convert_config_input`` before being stored; all other
        arguments are validated with ``dagster._check`` and stored as given.
        """
        # Imported lazily: these names are only available under TYPE_CHECKING at module level.
        from dagster._core.definitions import (
            AssetSelection,
            ExecutorDefinition,
            HookDefinition,
            PartitionsDefinition,
        )
        from dagster._core.definitions.run_config import convert_config_input

        return super(UnresolvedAssetJobDefinition, cls).__new__(
            cls,
            name=check.str_param(name, "name"),
            selection=check.inst_param(selection, "selection", AssetSelection),
            config=convert_config_input(config),
            description=check.opt_str_param(description, "description"),
            tags=check.opt_mapping_param(tags, "tags"),
            metadata=check.opt_mapping_param(metadata, "metadata"),
            partitions_def=check.opt_inst_param(
                partitions_def, "partitions_def", PartitionsDefinition
            ),
            # The second argument is the parameter name reported in the check error, so it
            # must name the argument actually being validated.
            executor_def=check.opt_inst_param(executor_def, "executor_def", ExecutorDefinition),
            hooks=check.opt_nullable_set_param(hooks, "hooks", of_type=HookDefinition),
        )

    @deprecated(
        breaking_version="2.0.0",
        additional_warn_text="Directly instantiate `RunRequest(partition_key=...)` instead.",
    )
    def run_request_for_partition(
        self,
        partition_key: str,
        run_key: Optional[str] = None,
        tags: Optional[Mapping[str, str]] = None,
        asset_selection: Optional[Sequence[AssetKey]] = None,
        run_config: Optional[Mapping[str, Any]] = None,
        current_time: Optional[datetime] = None,
        dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
    ) -> RunRequest:
        """Creates a RunRequest object for a run that processes the given partition.

        Args:
            partition_key: The key of the partition to request a run for.
            run_key (Optional[str]): A string key to identify this launched run. For sensors, ensures that
                only one run is created per run key across all sensor evaluations.  For schedules,
                ensures that one run is created per tick, across failure recoveries. Passing in a `None`
                value means that a run will always be launched per evaluation.
            tags (Optional[Dict[str, str]]): A dictionary of tags (string key-value pairs) to attach
                to the launched run. Tags produced by the partitioned config for this partition
                key take precedence over tags with the same key passed here.
            asset_selection (Optional[Sequence[AssetKey]]): Asset keys forwarded unchanged to the
                returned RunRequest.
            run_config (Optional[Mapping[str, Any]]): Configuration for the run. If the job has
                a :py:class:`PartitionedConfig`, this value will replace the config
                provided by it.
            current_time (Optional[datetime]): Used to determine which time-partitions exist.
                Defaults to now.
            dynamic_partitions_store (Optional[DynamicPartitionsStore]): The DynamicPartitionsStore
                object that is responsible for fetching dynamic partitions. Required when the
                partitions definition is a DynamicPartitionsDefinition with a name defined. Users
                can pass the DagsterInstance fetched via `context.instance` to this argument.

        Returns:
            RunRequest: an object that requests a run to process the given partition.

        Raises:
            CheckError: If this job has no partitions definition, or if its partitions
                definition is a named DynamicPartitionsDefinition.
        """
        from dagster._core.definitions.partition import (
            DynamicPartitionsDefinition,
            PartitionedConfig,
        )

        if not self.partitions_def:
            check.failed("Called run_request_for_partition on a non-partitioned job")

        partitioned_config = PartitionedConfig.from_flexible_config(
            self.config, self.partitions_def
        )

        if (
            isinstance(self.partitions_def, DynamicPartitionsDefinition)
            and self.partitions_def.name
        ):
            # Do not support using run_request_for_partition with dynamic partitions,
            # since this requires querying the instance once per run request for the
            # existent dynamic partitions
            check.failed(
                "run_request_for_partition is not supported for dynamic partitions. Instead, use"
                " RunRequest(partition_key=...)"
            )

        self.partitions_def.validate_partition_key(
            partition_key,
            current_time=current_time,
            dynamic_partitions_store=dynamic_partitions_store,
        )

        # An explicitly supplied run_config (even an empty mapping) wins over the config
        # derived from the partition key.
        run_config = (
            run_config
            if run_config is not None
            else partitioned_config.get_run_config_for_partition_key(partition_key)
        )
        # Later entries override earlier ones: partition-derived tags take precedence.
        run_request_tags = {
            **(tags or {}),
            **partitioned_config.get_tags_for_partition_key(partition_key),
        }

        return RunRequest(
            job_name=self.name,
            run_key=run_key,
            run_config=run_config,
            tags=run_request_tags,
            asset_selection=asset_selection,
            partition_key=partition_key,
        )

    def resolve(
        self,
        asset_graph: "InternalAssetGraph",
        default_executor_def: Optional["ExecutorDefinition"] = None,
        resource_defs: Optional[Mapping[str, "ResourceDefinition"]] = None,
    ) -> "JobDefinition":
        """Resolve this UnresolvedAssetJobDefinition into a JobDefinition.

        Args:
            asset_graph (InternalAssetGraph): The graph the selection is resolved against; its
                assets and source assets are handed to the built job.
            default_executor_def (Optional[ExecutorDefinition]): Executor used when this
                definition has no ``executor_def`` of its own.
            resource_defs (Optional[Mapping[str, ResourceDefinition]]): Resource definitions
                forwarded to the built job.

        Returns:
            JobDefinition: The job built by ``build_asset_selection_job`` for the selected assets.

        Raises:
            DagsterInvalidDefinitionError: If the selected assets carry more than one distinct
                partitions definition, or if this definition's explicit ``partitions_def`` does
                not match the one found on the selected assets.
        """
        assets = asset_graph.assets
        source_assets = asset_graph.source_assets
        selected_asset_keys = self.selection.resolve(asset_graph)

        # Group the selected partitioned assets by their partitions definition; unpartitioned
        # assets are left out of the grouping.
        asset_keys_by_partitions_def = defaultdict(set)
        for asset_key in selected_asset_keys:
            partitions_def = asset_graph.get_partitions_def(asset_key)
            if partitions_def is not None:
                asset_keys_by_partitions_def[partitions_def].add(asset_key)

        if len(asset_keys_by_partitions_def) > 1:
            keys_by_partitions_def_str = "\n".join(
                f"{partitions_def}: {asset_keys}"
                for partitions_def, asset_keys in asset_keys_by_partitions_def.items()
            )
            raise DagsterInvalidDefinitionError(
                f"Multiple partitioned assets exist in assets job '{self.name}'. Selected assets"
                " must have the same partitions definitions, but the selected assets have"
                f" different partitions definitions: \n{keys_by_partitions_def_str}"
            )

        # At this point there is at most one group, so its key is the inferred definition.
        inferred_partitions_def = (
            next(iter(asset_keys_by_partitions_def.keys()))
            if asset_keys_by_partitions_def
            else None
        )
        if (
            inferred_partitions_def
            and self.partitions_def != inferred_partitions_def
            and self.partitions_def is not None
        ):
            raise DagsterInvalidDefinitionError(
                f"Job '{self.name}' received a partitions_def of {self.partitions_def}, but the"
                f" selected assets {next(iter(asset_keys_by_partitions_def.values()))} have a"
                f" non-matching partitions_def of {inferred_partitions_def}"
            )

        return build_asset_selection_job(
            name=self.name,
            assets=assets,
            config=self.config,
            source_assets=source_assets,
            description=self.description,
            tags=self.tags,
            metadata=self.metadata,
            asset_selection=selected_asset_keys,
            # An explicit partitions_def takes priority over the inferred one.
            partitions_def=self.partitions_def if self.partitions_def else inferred_partitions_def,
            executor_def=self.executor_def or default_executor_def,
            hooks=self.hooks,
            resource_defs=resource_defs,
        )


def define_asset_job(
    name: str,
    selection: Optional["CoercibleToAssetSelection"] = None,
    config: Optional[
        Union[ConfigMapping, Mapping[str, Any], "PartitionedConfig", "RunConfig"]
    ] = None,
    description: Optional[str] = None,
    tags: Optional[Mapping[str, Any]] = None,
    metadata: Optional[Mapping[str, RawMetadataValue]] = None,
    partitions_def: Optional["PartitionsDefinition"] = None,
    executor_def: Optional["ExecutorDefinition"] = None,
    hooks: Optional[AbstractSet["HookDefinition"]] = None,
) -> UnresolvedAssetJobDefinition:
    """Define a job that materializes a selection of assets or observes a selection of source assets.

    The returned object is only turned into a JobDefinition once it is placed in a code
    location.

    Args:
        name (str):
            The name for the job.
        selection (Union[str, Sequence[str], Sequence[AssetKey], Sequence[Union[AssetsDefinition, SourceAsset]], AssetSelection]):
            The assets that will be materialized or observed when the job is run. When omitted,
            every asset is selected.

            The selected assets must all be included in the assets that are passed to the
            assets argument of the Definitions object that this job is included on.

            The string "my_asset*" selects my_asset and all downstream assets within the code
            location. A list of strings represents the union of all assets selected by strings
            within the list.

            The selection is resolved to a set of assets when the location is loaded. A
            selection made up entirely of source assets yields a job that performs source asset
            observations; a selection made up entirely of regular assets yields a job that
            materializes them. A selection mixing source assets and regular assets results in
            an error.
        config:
            Describes how the Job is parameterized at runtime.

            If no value is provided, then the schema for the job's run config is a standard
            format based on its ops and resources.

            If a dictionary is provided, then it must conform to the standard config schema,
            and it will be used as the job's run config whenever the job is executed. The
            values provided will be viewable and editable in the Dagster UI, so be careful
            with secrets.

            If a :py:class:`ConfigMapping` object is provided, then the schema for the job's
            run config is determined by the config mapping, which should return configuration
            in the standard format to configure the job.
        tags (Optional[Mapping[str, Any]]):
            Arbitrary information that will be attached to the execution of the Job.
            Values that are not strings will be json encoded and must meet the criteria that
            `json.loads(json.dumps(value)) == value`. These tag values may be overwritten by
            tag values provided at invocation time.
        metadata (Optional[Mapping[str, RawMetadataValue]]):
            Arbitrary metadata about the job. Keys are displayed string labels, and values are
            one of the following: string, float, int, JSON-serializable dict,
            JSON-serializable list, and one of the data classes returned by a MetadataValue
            static method.
        description (Optional[str]):
            A description for the Job.
        partitions_def (Optional[PartitionsDefinition]):
            Defines the set of partitions for this job. All AssetDefinitions selected for this
            job must have a matching PartitionsDefinition. If no PartitionsDefinition is
            provided, the PartitionsDefinition will be inferred from the selected
            AssetDefinitions.
        executor_def (Optional[ExecutorDefinition]):
            How this Job will be executed. Defaults to :py:class:`multi_or_in_process_executor`,
            which can be switched between multi-process and in-process modes of execution. The
            default mode of execution is multi-process.
        hooks (Optional[AbstractSet[HookDefinition]]):
            Hook definitions stored on the returned job definition.

    Returns:
        UnresolvedAssetJobDefinition: The job, which can be placed inside a code location.

    Examples:
        .. code-block:: python

            # A job that targets all assets in the code location:
            @asset
            def asset1():
                ...

            defs = Definitions(
                assets=[asset1],
                jobs=[define_asset_job("all_assets")],
            )

            # A job that targets a single asset
            @asset
            def asset1():
                ...

            defs = Definitions(
                assets=[asset1],
                jobs=[define_asset_job("all_assets", selection=[asset1])],
            )

            # A job that targets all the assets in a group:
            defs = Definitions(
                assets=assets,
                jobs=[define_asset_job("marketing_job", selection=AssetSelection.groups("marketing"))],
            )

            @observable_source_asset
            def source_asset():
                ...

            # A job that observes a source asset:
            defs = Definitions(
                assets=assets,
                jobs=[define_asset_job("observation_job", selection=[source_asset])],
            )

            # Resources are supplied to the assets, not the job:
            @asset(required_resource_keys={"slack_client"})
            def asset1():
                ...

            defs = Definitions(
                assets=[asset1],
                jobs=[define_asset_job("all_assets")],
                resources={"slack_client": prod_slack_client},
            )

    """
    from dagster._core.definitions import AssetSelection

    # Coerce whatever the caller supplied (or nothing at all) into an AssetSelection object.
    asset_selection = (
        AssetSelection.all() if selection is None else AssetSelection.from_coercible(selection)
    )

    return UnresolvedAssetJobDefinition(
        name=name,
        selection=asset_selection,
        config=config,
        description=description,
        tags=tags,
        metadata=metadata,
        partitions_def=partitions_def,
        executor_def=executor_def,
        hooks=hooks,
    )