This documentation is for an older version (1.4.7) of Dagster.

Source code for dagster._core.definitions.source_asset

from typing import (
    AbstractSet,
    Any,
    Callable,
    Dict,
    Iterator,
    Mapping,
    Optional,
    cast,
)

from typing_extensions import TypeAlias

import dagster._check as check
from dagster._annotations import PublicAttr, experimental_param, public
from dagster._core.decorator_utils import get_function_params
from dagster._core.definitions.data_version import (
    DATA_VERSION_TAG,
    DataVersion,
    DataVersionsByPartition,
)
from dagster._core.definitions.events import AssetKey, AssetObservation, CoercibleToAssetKey
from dagster._core.definitions.metadata import (
    ArbitraryMetadataMapping,
    MetadataMapping,
    normalize_metadata,
)
from dagster._core.definitions.op_definition import OpDefinition
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.resource_annotation import get_resource_args
from dagster._core.definitions.resource_definition import ResourceDefinition
from dagster._core.definitions.resource_requirement import (
    ResourceAddable,
    ResourceRequirement,
    SourceAssetIOManagerRequirement,
    ensure_requirements_satisfied,
    get_resource_key_conflicts,
)
from dagster._core.definitions.utils import (
    DEFAULT_GROUP_NAME,
    DEFAULT_IO_MANAGER_KEY,
    validate_group_name,
)
from dagster._core.errors import (
    DagsterInvalidDefinitionError,
    DagsterInvalidInvocationError,
    DagsterInvalidObservationError,
)
from dagster._core.storage.io_manager import IOManagerDefinition
from dagster._utils.merger import merge_dicts
from dagster._utils.warnings import disable_dagster_warnings

# Going with this catch-all for the time being to permit pythonic resources
SourceAssetObserveFunction: TypeAlias = Callable[..., Any]


@experimental_param(param="resource_defs")
@experimental_param(param="io_manager_def")
class SourceAsset(ResourceAddable):
    """A SourceAsset represents an asset that will be loaded by (but not updated by) Dagster.

    Attributes:
        key (Union[AssetKey, Sequence[str], str]): The key of the asset.
        metadata (Mapping[str, MetadataValue]): Metadata associated with the asset.
        io_manager_key (Optional[str]): The key for the IOManager that will be used to load the
            contents of the asset when it's used as an input to other assets inside a job.
        io_manager_def (Optional[IOManagerDefinition]): (Experimental) The definition of the
            IOManager that will be used to load the contents of the asset when it's used as an
            input to other assets inside a job.
        resource_defs (Optional[Mapping[str, ResourceDefinition]]): (Experimental) resource
            definitions that may be required by the :py:class:`dagster.IOManagerDefinition`
            provided in the `io_manager_def` argument.
        description (Optional[str]): The description of the asset.
        partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that
            compose the asset.
        observe_fn (Optional[SourceAssetObserveFunction]): Observation function for the source
            asset.
    """

    key: PublicAttr[AssetKey]
    metadata: PublicAttr[MetadataMapping]
    raw_metadata: PublicAttr[ArbitraryMetadataMapping]
    io_manager_key: PublicAttr[Optional[str]]
    _io_manager_def: PublicAttr[Optional[IOManagerDefinition]]
    description: PublicAttr[Optional[str]]
    partitions_def: PublicAttr[Optional[PartitionsDefinition]]
    group_name: PublicAttr[str]
    resource_defs: PublicAttr[Dict[str, ResourceDefinition]]
    observe_fn: PublicAttr[Optional[SourceAssetObserveFunction]]
    _node_def: Optional[OpDefinition]  # computed lazily
    auto_observe_interval_minutes: Optional[float]

    def __init__(
        self,
        key: CoercibleToAssetKey,
        metadata: Optional[ArbitraryMetadataMapping] = None,
        io_manager_key: Optional[str] = None,
        io_manager_def: Optional[object] = None,
        description: Optional[str] = None,
        partitions_def: Optional[PartitionsDefinition] = None,
        group_name: Optional[str] = None,
        resource_defs: Optional[Mapping[str, object]] = None,
        observe_fn: Optional[SourceAssetObserveFunction] = None,
        *,
        auto_observe_interval_minutes: Optional[float] = None,
        # This is currently private because it is necessary for source asset observation functions,
        # but we have not yet decided on a final API for associating one or more ops with a source
        # asset. If we were to make this public, then we would have a canonical public
        # `required_resource_keys` used for observation that might end up conflicting with a set of
        # required resource keys for a different operation.
        _required_resource_keys: Optional[AbstractSet[str]] = None,
        # Add additional fields to with_resources and with_group below
    ):
        from dagster._core.execution.build_resources import (
            wrap_resources_for_execution,
        )

        self.key = AssetKey.from_coercible(key)
        metadata = check.opt_mapping_param(metadata, "metadata", key_type=str)
        self.raw_metadata = metadata
        self.metadata = normalize_metadata(metadata, allow_invalid=True)

        resource_defs_dict = dict(check.opt_mapping_param(resource_defs, "resource_defs"))
        if io_manager_def:
            if not io_manager_key:
                io_manager_key = self.key.to_python_identifier("io_manager")

            if (
                io_manager_key in resource_defs_dict
                and resource_defs_dict[io_manager_key] != io_manager_def
            ):
                raise DagsterInvalidDefinitionError(
                    f"Provided conflicting definitions for io manager key '{io_manager_key}'."
                    " Please provide only one definition per key."
                )

            resource_defs_dict[io_manager_key] = io_manager_def

        self.resource_defs = wrap_resources_for_execution(resource_defs_dict)

        self.io_manager_key = check.opt_str_param(io_manager_key, "io_manager_key")
        self.partitions_def = check.opt_inst_param(
            partitions_def, "partitions_def", PartitionsDefinition
        )
        self.group_name = validate_group_name(group_name)
        self.description = check.opt_str_param(description, "description")
        self.observe_fn = check.opt_callable_param(observe_fn, "observe_fn")
        self._required_resource_keys = check.opt_set_param(
            _required_resource_keys, "_required_resource_keys", of_type=str
        )
        self._node_def = None
        self.auto_observe_interval_minutes = check.opt_numeric_param(
            auto_observe_interval_minutes, "auto_observe_interval_minutes"
        )

    def get_io_manager_key(self) -> str:
        return self.io_manager_key or DEFAULT_IO_MANAGER_KEY

    @property
    def io_manager_def(self) -> Optional[IOManagerDefinition]:
        io_manager_key = self.get_io_manager_key()
        return cast(
            Optional[IOManagerDefinition],
            self.resource_defs.get(io_manager_key) if io_manager_key else None,
        )

    @public
    @property
    def op(self) -> OpDefinition:
        """OpDefinition: The OpDefinition associated with the observation function of an
        observable source asset.

        Throws an error if the asset is not observable.
        """
        check.invariant(
            isinstance(self.node_def, OpDefinition),
            "The NodeDefinition for this AssetsDefinition is not of type OpDefinition.",
        )
        return cast(OpDefinition, self.node_def)

    @public
    @property
    def is_observable(self) -> bool:
        """bool: Whether the asset is observable."""
        return self.node_def is not None

    def _get_op_def_compute_fn(self, observe_fn: SourceAssetObserveFunction):
        from dagster._core.definitions.decorators.op_decorator import (
            DecoratedOpFunction,
            is_context_provided,
        )
        from dagster._core.execution.context.compute import (
            OpExecutionContext,
        )

        observe_fn_has_context = is_context_provided(get_function_params(observe_fn))

        def fn(context: OpExecutionContext):
            resource_kwarg_keys = [param.name for param in get_resource_args(observe_fn)]
            resource_kwargs = {key: getattr(context.resources, key) for key in resource_kwarg_keys}
            observe_fn_return_value = (
                observe_fn(context, **resource_kwargs)
                if observe_fn_has_context
                else observe_fn(**resource_kwargs)
            )

            if isinstance(observe_fn_return_value, DataVersion):
                if self.partitions_def is not None:
                    raise DagsterInvalidObservationError(
                        f"{self.key} is partitioned, so its observe function should return a"
                        " DataVersionsByPartition, not a DataVersion"
                    )
                context.log_event(
                    AssetObservation(
                        asset_key=self.key,
                        tags={DATA_VERSION_TAG: observe_fn_return_value.value},
                    )
                )
            elif isinstance(observe_fn_return_value, DataVersionsByPartition):
                if self.partitions_def is None:
                    raise DagsterInvalidObservationError(
                        f"{self.key} is not partitioned, so its observe function should return a"
                        " DataVersion, not a DataVersionsByPartition"
                    )
                for (
                    partition_key,
                    data_version,
                ) in observe_fn_return_value.data_versions_by_partition.items():
                    context.log_event(
                        AssetObservation(
                            asset_key=self.key,
                            tags={DATA_VERSION_TAG: data_version.value},
                            partition=partition_key,
                        )
                    )
            else:
                raise DagsterInvalidObservationError(
                    f"Observe function for {self.key} must return a DataVersion or"
                    " DataVersionsByPartition, but returned a value of type"
                    f" {type(observe_fn_return_value)}"
                )

        return DecoratedOpFunction(fn)

    @property
    def required_resource_keys(self) -> AbstractSet[str]:
        return {requirement.key for requirement in self.get_resource_requirements()}

    @property
    def node_def(self) -> Optional[OpDefinition]:
        """Op that generates observation metadata for a source asset."""
        if self.observe_fn is None:
            return None

        if self._node_def is None:
            self._node_def = OpDefinition(
                compute_fn=self._get_op_def_compute_fn(self.observe_fn),
                name=self.key.to_python_identifier(),
                description=self.description,
                required_resource_keys=self._required_resource_keys,
            )
        return self._node_def

    def with_resources(self, resource_defs) -> "SourceAsset":
        from dagster._core.execution.resources_init import get_transitive_required_resource_keys

        overlapping_keys = get_resource_key_conflicts(self.resource_defs, resource_defs)
        if overlapping_keys:
            raise DagsterInvalidInvocationError(
                f"SourceAsset with key {self.key} has conflicting resource "
                "definitions with provided resources for the following keys: "
                f"{sorted(list(overlapping_keys))}. Either remove the existing "
                "resources from the asset or change the resource keys so that "
                "they don't overlap."
            )

        merged_resource_defs = merge_dicts(resource_defs, self.resource_defs)

        # Ensure top-level resource requirements are met - except for
        # io_manager, since that is a default, it can be resolved later.
        ensure_requirements_satisfied(merged_resource_defs, list(self.get_resource_requirements()))

        io_manager_def = merged_resource_defs.get(self.get_io_manager_key())
        if not io_manager_def and self.get_io_manager_key() != DEFAULT_IO_MANAGER_KEY:
            raise DagsterInvalidDefinitionError(
                f"SourceAsset with asset key {self.key} requires IO manager with key"
                f" '{self.get_io_manager_key()}', but none was provided."
            )
        relevant_keys = get_transitive_required_resource_keys(
            {*self._required_resource_keys, self.get_io_manager_key()}, merged_resource_defs
        )

        relevant_resource_defs = {
            key: resource_def
            for key, resource_def in merged_resource_defs.items()
            if key in relevant_keys
        }

        io_manager_key = (
            self.get_io_manager_key()
            if self.get_io_manager_key() != DEFAULT_IO_MANAGER_KEY
            else None
        )
        with disable_dagster_warnings():
            return SourceAsset(
                key=self.key,
                io_manager_key=io_manager_key,
                description=self.description,
                partitions_def=self.partitions_def,
                metadata=self.raw_metadata,
                resource_defs=relevant_resource_defs,
                group_name=self.group_name,
                observe_fn=self.observe_fn,
                auto_observe_interval_minutes=self.auto_observe_interval_minutes,
                _required_resource_keys=self._required_resource_keys,
            )

    def with_attributes(
        self, group_name: Optional[str] = None, key: Optional[AssetKey] = None
    ) -> "SourceAsset":
        if group_name is not None and self.group_name != DEFAULT_GROUP_NAME:
            raise DagsterInvalidDefinitionError(
                "A group name has already been provided to source asset"
                f" {self.key.to_user_string()}"
            )

        with disable_dagster_warnings():
            return SourceAsset(
                key=key or self.key,
                metadata=self.raw_metadata,
                io_manager_key=self.io_manager_key,
                io_manager_def=self.io_manager_def,
                description=self.description,
                partitions_def=self.partitions_def,
                group_name=group_name,
                resource_defs=self.resource_defs,
                observe_fn=self.observe_fn,
                auto_observe_interval_minutes=self.auto_observe_interval_minutes,
                _required_resource_keys=self._required_resource_keys,
            )

    def get_resource_requirements(self) -> Iterator[ResourceRequirement]:
        if self.node_def is not None:
            yield from self.node_def.get_resource_requirements()
        yield SourceAssetIOManagerRequirement(
            key=self.get_io_manager_key(), asset_key=self.key.to_string()
        )
        for source_key, resource_def in self.resource_defs.items():
            yield from resource_def.get_resource_requirements(outer_context=source_key)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, SourceAsset):
            return False
        else:
            return (
                self.key == other.key
                and self.raw_metadata == other.raw_metadata
                and self.io_manager_key == other.io_manager_key
                and self.description == other.description
                and self.group_name == other.group_name
                and self.resource_defs == other.resource_defs
                and self.observe_fn == other.observe_fn
            )
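A minimal usage sketch, not part of the module source above: it constructs an observable SourceAsset directly, using a hypothetical asset key ("orders") and a placeholder data version. The bare, no-argument observe function is one of the signatures the loose SourceAssetObserveFunction alias permits, and returning a DataVersion is what the wrapped compute function expects for an unpartitioned asset.

from dagster import DataVersion, SourceAsset


def observe_orders() -> DataVersion:
    # In practice this would compute a content hash or last-modified stamp for
    # the external table; the constant here is only a placeholder.
    return DataVersion("2024-01-01-content-hash")


# "orders" is a hypothetical asset key used for illustration.
orders = SourceAsset(key="orders", observe_fn=observe_orders)

# The observe function is wrapped lazily into an OpDefinition named after the key.
assert orders.is_observable
assert orders.op.name == "orders"

In most user code the observable_source_asset decorator is the more common entry point, but constructing SourceAsset directly exercises the same observe_fn path shown in _get_op_def_compute_fn.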
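A second sketch of the IO-manager wiring, under the assumption of a hypothetical ParquetIOManager class and a custom "warehouse_io_manager" key: with_resources returns a copy whose resource_defs satisfy the asset's SourceAssetIOManagerRequirement, while the original asset is left unchanged.

from dagster import IOManager, IOManagerDefinition, SourceAsset


class ParquetIOManager(IOManager):
    """Hypothetical IO manager used only for illustration."""

    def handle_output(self, context, obj):
        # Source assets are never materialized by Dagster, so this is unused.
        raise NotImplementedError

    def load_input(self, context):
        # Downstream assets receive whatever this returns when they load the
        # source asset as an input.
        return f"parquet://{'/'.join(context.asset_key.path)}"


raw_events = SourceAsset(key="raw_events", io_manager_key="warehouse_io_manager")

bound = raw_events.with_resources(
    {"warehouse_io_manager": IOManagerDefinition.hardcoded_io_manager(ParquetIOManager())}
)
assert bound.get_io_manager_key() == "warehouse_io_manager"
assert bound.io_manager_def is not None

Because get_io_manager_key falls back to DEFAULT_IO_MANAGER_KEY when no key is given, the explicit io_manager_key here is only needed to bind the asset to a non-default IO manager.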