from typing import (
AbstractSet,
Any,
Callable,
Dict,
Iterator,
Mapping,
Optional,
cast,
)
from typing_extensions import TypeAlias
import dagster._check as check
from dagster._annotations import PublicAttr, experimental_param, public
from dagster._core.decorator_utils import get_function_params
from dagster._core.definitions.data_version import (
DATA_VERSION_TAG,
DataVersion,
DataVersionsByPartition,
)
from dagster._core.definitions.events import AssetKey, AssetObservation, CoercibleToAssetKey
from dagster._core.definitions.metadata import (
ArbitraryMetadataMapping,
MetadataMapping,
normalize_metadata,
)
from dagster._core.definitions.op_definition import OpDefinition
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.resource_annotation import get_resource_args
from dagster._core.definitions.resource_definition import ResourceDefinition
from dagster._core.definitions.resource_requirement import (
ResourceAddable,
ResourceRequirement,
SourceAssetIOManagerRequirement,
ensure_requirements_satisfied,
get_resource_key_conflicts,
)
from dagster._core.definitions.utils import (
DEFAULT_GROUP_NAME,
DEFAULT_IO_MANAGER_KEY,
validate_group_name,
)
from dagster._core.errors import (
DagsterInvalidDefinitionError,
DagsterInvalidInvocationError,
DagsterInvalidObservationError,
)
from dagster._core.storage.io_manager import IOManagerDefinition
from dagster._utils.merger import merge_dicts
from dagster._utils.warnings import disable_dagster_warnings
# Going with this catch-all for the time being to permit pythonic resources
SourceAssetObserveFunction: TypeAlias = Callable[..., Any]
@experimental_param(param="resource_defs")
@experimental_param(param="io_manager_def")
class SourceAsset(ResourceAddable):
"""A SourceAsset represents an asset that will be loaded by (but not updated by) Dagster.
Attributes:
key (Union[AssetKey, Sequence[str], str]): The key of the asset.
metadata (Mapping[str, MetadataValue]): Metadata associated with the asset.
io_manager_key (Optional[str]): The key for the IOManager that will be used to load the contents of
the asset when it's used as an input to other assets inside a job.
io_manager_def (Optional[IOManagerDefinition]): (Experimental) The definition of the IOManager that will be used to load the contents of
the asset when it's used as an input to other assets inside a job.
resource_defs (Optional[Mapping[str, ResourceDefinition]]): (Experimental) resource definitions that may be required by the :py:class:`dagster.IOManagerDefinition` provided in the `io_manager_def` argument.
description (Optional[str]): The description of the asset.
partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that
compose the asset.
        observe_fn (Optional[SourceAssetObserveFunction]): Observation function for the source asset.
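
    Examples:
        An illustrative sketch of declaring a source asset; the asset key, description,
        and IO manager key below are hypothetical and not defined in this module.

        .. code-block:: python

            from dagster import SourceAsset

            raw_events = SourceAsset(
                key="raw_events",
                description="Events table populated by an external ingestion job",
                io_manager_key="warehouse_io_manager",
            )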
"""
key: PublicAttr[AssetKey]
metadata: PublicAttr[MetadataMapping]
raw_metadata: PublicAttr[ArbitraryMetadataMapping]
io_manager_key: PublicAttr[Optional[str]]
_io_manager_def: PublicAttr[Optional[IOManagerDefinition]]
description: PublicAttr[Optional[str]]
partitions_def: PublicAttr[Optional[PartitionsDefinition]]
group_name: PublicAttr[str]
resource_defs: PublicAttr[Dict[str, ResourceDefinition]]
observe_fn: PublicAttr[Optional[SourceAssetObserveFunction]]
_node_def: Optional[OpDefinition] # computed lazily
auto_observe_interval_minutes: Optional[float]
def __init__(
self,
key: CoercibleToAssetKey,
metadata: Optional[ArbitraryMetadataMapping] = None,
io_manager_key: Optional[str] = None,
io_manager_def: Optional[object] = None,
description: Optional[str] = None,
partitions_def: Optional[PartitionsDefinition] = None,
group_name: Optional[str] = None,
resource_defs: Optional[Mapping[str, object]] = None,
observe_fn: Optional[SourceAssetObserveFunction] = None,
*,
auto_observe_interval_minutes: Optional[float] = None,
# This is currently private because it is necessary for source asset observation functions,
        # but we have not yet decided on a final API for associating one or more ops with a source
# asset. If we were to make this public, then we would have a canonical public
# `required_resource_keys` used for observation that might end up conflicting with a set of
# required resource keys for a different operation.
_required_resource_keys: Optional[AbstractSet[str]] = None,
        # Add additional fields to with_resources and with_attributes below
):
from dagster._core.execution.build_resources import (
wrap_resources_for_execution,
)
self.key = AssetKey.from_coercible(key)
metadata = check.opt_mapping_param(metadata, "metadata", key_type=str)
self.raw_metadata = metadata
self.metadata = normalize_metadata(metadata, allow_invalid=True)
resource_defs_dict = dict(check.opt_mapping_param(resource_defs, "resource_defs"))
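        # If io_manager_def is supplied without an explicit io_manager_key, derive a key
        # from the asset key via AssetKey.to_python_identifier("io_manager") and register
        # the definition under it, guarding against conflicting definitions for that key.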
if io_manager_def:
if not io_manager_key:
io_manager_key = self.key.to_python_identifier("io_manager")
if (
io_manager_key in resource_defs_dict
and resource_defs_dict[io_manager_key] != io_manager_def
):
raise DagsterInvalidDefinitionError(
f"Provided conflicting definitions for io manager key '{io_manager_key}'."
" Please provide only one definition per key."
)
resource_defs_dict[io_manager_key] = io_manager_def
self.resource_defs = wrap_resources_for_execution(resource_defs_dict)
self.io_manager_key = check.opt_str_param(io_manager_key, "io_manager_key")
self.partitions_def = check.opt_inst_param(
partitions_def, "partitions_def", PartitionsDefinition
)
self.group_name = validate_group_name(group_name)
self.description = check.opt_str_param(description, "description")
self.observe_fn = check.opt_callable_param(observe_fn, "observe_fn")
self._required_resource_keys = check.opt_set_param(
_required_resource_keys, "_required_resource_keys", of_type=str
)
self._node_def = None
self.auto_observe_interval_minutes = check.opt_numeric_param(
auto_observe_interval_minutes, "auto_observe_interval_minutes"
)
def get_io_manager_key(self) -> str:
return self.io_manager_key or DEFAULT_IO_MANAGER_KEY
@property
def io_manager_def(self) -> Optional[IOManagerDefinition]:
io_manager_key = self.get_io_manager_key()
return cast(
Optional[IOManagerDefinition],
self.resource_defs.get(io_manager_key) if io_manager_key else None,
)
@public
@property
def op(self) -> OpDefinition:
"""OpDefinition: The OpDefinition associated with the observation function of an observable
source asset.
        Raises an error if the asset is not observable.
"""
check.invariant(
isinstance(self.node_def, OpDefinition),
"The NodeDefinition for this AssetsDefinition is not of type OpDefinition.",
)
return cast(OpDefinition, self.node_def)
@public
@property
def is_observable(self) -> bool:
"""bool: Whether the asset is observable."""
return self.node_def is not None
def _get_op_def_compute_fn(self, observe_fn: SourceAssetObserveFunction):
from dagster._core.definitions.decorators.op_decorator import (
DecoratedOpFunction,
is_context_provided,
)
from dagster._core.execution.context.compute import (
OpExecutionContext,
)
observe_fn_has_context = is_context_provided(get_function_params(observe_fn))
def fn(context: OpExecutionContext):
resource_kwarg_keys = [param.name for param in get_resource_args(observe_fn)]
resource_kwargs = {key: getattr(context.resources, key) for key in resource_kwarg_keys}
observe_fn_return_value = (
observe_fn(context, **resource_kwargs)
if observe_fn_has_context
else observe_fn(**resource_kwargs)
)
if isinstance(observe_fn_return_value, DataVersion):
if self.partitions_def is not None:
raise DagsterInvalidObservationError(
f"{self.key} is partitioned, so its observe function should return a"
" DataVersionsByPartition, not a DataVersion"
)
context.log_event(
AssetObservation(
asset_key=self.key,
tags={DATA_VERSION_TAG: observe_fn_return_value.value},
)
)
elif isinstance(observe_fn_return_value, DataVersionsByPartition):
if self.partitions_def is None:
raise DagsterInvalidObservationError(
f"{self.key} is not partitioned, so its observe function should return a"
" DataVersion, not a DataVersionsByPartition"
)
for (
partition_key,
data_version,
) in observe_fn_return_value.data_versions_by_partition.items():
context.log_event(
AssetObservation(
asset_key=self.key,
tags={DATA_VERSION_TAG: data_version.value},
partition=partition_key,
)
)
else:
raise DagsterInvalidObservationError(
f"Observe function for {self.key} must return a DataVersion or"
" DataVersionsByPartition, but returned a value of type"
f" {type(observe_fn_return_value)}"
)
return DecoratedOpFunction(fn)
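    # Illustrative sketch of the contract enforced above (the asset key and version
    # value are hypothetical): an unpartitioned source asset's observe function returns
    # a single DataVersion, while a partitioned one returns DataVersionsByPartition.
    #
    #     from dagster import DataVersion, SourceAsset
    #
    #     def observe_raw_events() -> DataVersion:
    #         return DataVersion("2024-01-01")
    #
    #     raw_events = SourceAsset(key="raw_events", observe_fn=observe_raw_events)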
@property
def required_resource_keys(self) -> AbstractSet[str]:
return {requirement.key for requirement in self.get_resource_requirements()}
@property
def node_def(self) -> Optional[OpDefinition]:
"""Op that generates observation metadata for a source asset."""
if self.observe_fn is None:
return None
if self._node_def is None:
self._node_def = OpDefinition(
compute_fn=self._get_op_def_compute_fn(self.observe_fn),
name=self.key.to_python_identifier(),
description=self.description,
required_resource_keys=self._required_resource_keys,
)
return self._node_def
    def with_resources(self, resource_defs: Mapping[str, ResourceDefinition]) -> "SourceAsset":
from dagster._core.execution.resources_init import get_transitive_required_resource_keys
overlapping_keys = get_resource_key_conflicts(self.resource_defs, resource_defs)
if overlapping_keys:
raise DagsterInvalidInvocationError(
f"SourceAsset with key {self.key} has conflicting resource "
"definitions with provided resources for the following keys: "
f"{sorted(list(overlapping_keys))}. Either remove the existing "
"resources from the asset or change the resource keys so that "
"they don't overlap."
)
merged_resource_defs = merge_dicts(resource_defs, self.resource_defs)
        # Ensure top-level resource requirements are met, except for the io_manager,
        # which has a default and can therefore be resolved later.
ensure_requirements_satisfied(merged_resource_defs, list(self.get_resource_requirements()))
io_manager_def = merged_resource_defs.get(self.get_io_manager_key())
if not io_manager_def and self.get_io_manager_key() != DEFAULT_IO_MANAGER_KEY:
raise DagsterInvalidDefinitionError(
f"SourceAsset with asset key {self.key} requires IO manager with key"
f" '{self.get_io_manager_key()}', but none was provided."
)
relevant_keys = get_transitive_required_resource_keys(
{*self._required_resource_keys, self.get_io_manager_key()}, merged_resource_defs
)
relevant_resource_defs = {
key: resource_def
for key, resource_def in merged_resource_defs.items()
if key in relevant_keys
}
io_manager_key = (
self.get_io_manager_key()
if self.get_io_manager_key() != DEFAULT_IO_MANAGER_KEY
else None
)
with disable_dagster_warnings():
return SourceAsset(
key=self.key,
io_manager_key=io_manager_key,
description=self.description,
partitions_def=self.partitions_def,
metadata=self.raw_metadata,
resource_defs=relevant_resource_defs,
group_name=self.group_name,
observe_fn=self.observe_fn,
auto_observe_interval_minutes=self.auto_observe_interval_minutes,
_required_resource_keys=self._required_resource_keys,
)
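    # Illustrative sketch of with_resources (the resource key and IO manager definition
    # are hypothetical): a source asset declared with io_manager_key="warehouse_io_manager"
    # can have that resource bound later, e.g.
    #
    #     raw_events = SourceAsset(key="raw_events", io_manager_key="warehouse_io_manager")
    #     bound_events = raw_events.with_resources(
    #         {"warehouse_io_manager": my_warehouse_io_manager}
    #     )
    #
    # Keys that overlap with resources already attached to the asset raise
    # DagsterInvalidInvocationError, per the check above.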
def with_attributes(
self, group_name: Optional[str] = None, key: Optional[AssetKey] = None
) -> "SourceAsset":
if group_name is not None and self.group_name != DEFAULT_GROUP_NAME:
raise DagsterInvalidDefinitionError(
"A group name has already been provided to source asset"
f" {self.key.to_user_string()}"
)
with disable_dagster_warnings():
return SourceAsset(
key=key or self.key,
metadata=self.raw_metadata,
io_manager_key=self.io_manager_key,
io_manager_def=self.io_manager_def,
description=self.description,
partitions_def=self.partitions_def,
group_name=group_name,
resource_defs=self.resource_defs,
observe_fn=self.observe_fn,
auto_observe_interval_minutes=self.auto_observe_interval_minutes,
_required_resource_keys=self._required_resource_keys,
)
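    # Illustrative sketch of with_attributes (the group name is hypothetical): assign a
    # group to, or re-key, a source asset that still has the default group, e.g.
    #
    #     grouped_events = raw_events.with_attributes(group_name="ingestion")
    #
    # Passing group_name when a non-default group is already set raises
    # DagsterInvalidDefinitionError, per the check above.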
def get_resource_requirements(self) -> Iterator[ResourceRequirement]:
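        # Requirements come from three places: the observation op (if any), the IO manager
        # used to load this asset, and the nested requirements of any attached resources.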
if self.node_def is not None:
yield from self.node_def.get_resource_requirements()
yield SourceAssetIOManagerRequirement(
key=self.get_io_manager_key(), asset_key=self.key.to_string()
)
for source_key, resource_def in self.resource_defs.items():
yield from resource_def.get_resource_requirements(outer_context=source_key)
def __eq__(self, other: object) -> bool:
if not isinstance(other, SourceAsset):
return False
else:
return (
self.key == other.key
and self.raw_metadata == other.raw_metadata
and self.io_manager_key == other.io_manager_key
and self.description == other.description
and self.group_name == other.group_name
and self.resource_defs == other.resource_defs
and self.observe_fn == other.observe_fn
)