from contextlib import ExitStack
from typing import Any, Dict, Mapping, Optional, Type, cast
import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.job_definition import (
default_job_io_manager_with_fs_io_manager_schema,
)
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.resource_definition import ResourceDefinition
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.definitions.utils import DEFAULT_IO_MANAGER_KEY
from dagster._core.execution.build_resources import build_resources, get_mapped_resource_config
from dagster._core.execution.context.input import build_input_context
from dagster._core.execution.context.output import build_output_context
from dagster._core.execution.resources_init import get_transitive_required_resource_keys
from dagster._core.instance import DagsterInstance
from dagster._core.instance.config import is_dagster_home_set
from dagster._core.types.dagster_type import resolve_dagster_type
from dagster._utils.merger import merge_dicts
from .io_manager import IOManager


class AssetValueLoader:
"""Caches resource definitions that are used to load asset values across multiple load
    invocations.

    Should not be instantiated directly. Instead, use
:py:meth:`~dagster.RepositoryDefinition.get_asset_value_loader`.
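
    Example:
        An illustrative sketch; assumes ``my_repo`` is a
        :py:class:`~dagster.RepositoryDefinition` that defines assets named ``asset1``
        and ``asset2``.

        .. code-block:: python

            with my_repo.get_asset_value_loader() as loader:
                asset1 = loader.load_asset_value("asset1")
                asset2 = loader.load_asset_value("asset2")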
"""
def __init__(
self,
assets_defs_by_key: Mapping[AssetKey, AssetsDefinition],
source_assets_by_key: Mapping[AssetKey, SourceAsset],
instance: Optional[DagsterInstance] = None,
):
self._assets_defs_by_key = assets_defs_by_key
self._source_assets_by_key = source_assets_by_key
self._resource_instance_cache: Dict[str, object] = {}
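        # Enter an ExitStack up front; resource and instance context managers registered on
        # it stay open until __exit__() closes the stack.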
self._exit_stack: ExitStack = ExitStack().__enter__()
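        # Create and own a DagsterInstance only when none was provided and DAGSTER_HOME is
        # set; otherwise use the caller-supplied instance (which may be None).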
if not instance and is_dagster_home_set():
self._instance = self._exit_stack.enter_context(DagsterInstance.get())
else:
            self._instance = instance

    def _ensure_resource_instances_in_cache(
self,
resource_defs: Mapping[str, ResourceDefinition],
resource_config: Optional[Mapping[str, Any]] = None,
):
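        # Pass already-built resources back into build_resources as plain values so their
        # original initialization logic does not rerun; newly built resources are added to
        # the cache for reuse across subsequent loads.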
for built_resource_key, built_resource in (
self._exit_stack.enter_context(
build_resources(
resources={
resource_key: self._resource_instance_cache.get(resource_key, resource_def)
for resource_key, resource_def in resource_defs.items()
},
instance=self._instance,
resource_config=resource_config,
)
)
._asdict()
.items()
):
            self._resource_instance_cache[built_resource_key] = built_resource

    @public
    def load_asset_value(
self,
asset_key: CoercibleToAssetKey,
*,
python_type: Optional[Type[object]] = None,
partition_key: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
resource_config: Optional[Any] = None,
) -> object:
"""Loads the contents of an asset as a Python object.

        Invokes `load_input` on the :py:class:`IOManager` associated with the asset.

        Args:
            asset_key (Union[AssetKey, Sequence[str], str]): The key of the asset to load.
            python_type (Optional[Type]): The Python type to load the asset as. This is what will
                be returned inside `load_input` by `context.dagster_type.typing_type`.
            partition_key (Optional[str]): The partition of the asset to load.
            metadata (Optional[Dict[str, Any]]): Input metadata to pass to the :py:class:`IOManager`
                (equivalent to setting the ``metadata`` argument in `In` or `AssetIn`).
            resource_config (Optional[Any]): A dictionary of resource configurations to be passed
                to the :py:class:`IOManager`.

Returns:
The contents of an asset as a Python object.
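
        Example:
            An illustrative sketch; assumes this loader came from
            :py:meth:`~dagster.RepositoryDefinition.get_asset_value_loader` on a repository
            that defines a daily-partitioned asset named ``daily_revenue``.

            .. code-block:: python

                with my_repo.get_asset_value_loader() as loader:
                    revenue = loader.load_asset_value(
                        "daily_revenue", partition_key="2023-08-01"
                    )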
"""
asset_key = AssetKey.from_coercible(asset_key)
resource_config = resource_config or {}
output_metadata = {}
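
        # Resolve the IO manager, output name, metadata, and partitions definition from
        # either a regular AssetsDefinition or a SourceAsset.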
if asset_key in self._assets_defs_by_key:
assets_def = self._assets_defs_by_key[asset_key]
resource_defs = merge_dicts(
{DEFAULT_IO_MANAGER_KEY: default_job_io_manager_with_fs_io_manager_schema},
assets_def.resource_defs,
)
io_manager_key = assets_def.get_io_manager_key_for_asset_key(asset_key)
io_manager_def = resource_defs[io_manager_key]
name = assets_def.get_output_name_for_asset_key(asset_key)
output_metadata = assets_def.metadata_by_key[asset_key]
op_def = assets_def.get_op_def_for_asset_key(asset_key)
asset_partitions_def = assets_def.partitions_def
elif asset_key in self._source_assets_by_key:
source_asset = self._source_assets_by_key[asset_key]
resource_defs = merge_dicts(
{DEFAULT_IO_MANAGER_KEY: default_job_io_manager_with_fs_io_manager_schema},
source_asset.resource_defs,
)
io_manager_key = source_asset.get_io_manager_key()
io_manager_def = resource_defs[io_manager_key]
name = asset_key.path[-1]
output_metadata = source_asset.raw_metadata
op_def = None
asset_partitions_def = source_asset.partitions_def
else:
check.failed(f"Asset key {asset_key} not found")
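
        # The IO manager may itself require other resources; gather its transitive resource
        # dependencies along with the IO manager key itself.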
required_resource_keys = get_transitive_required_resource_keys(
io_manager_def.required_resource_keys, resource_defs
) | {io_manager_key}
self._ensure_resource_instances_in_cache(
{k: v for k, v in resource_defs.items() if k in required_resource_keys},
resource_config=resource_config,
)
io_manager = cast(IOManager, self._resource_instance_cache[io_manager_key])
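
        # Pull out only the config that targets the chosen IO manager and resolve it against
        # that resource's config schema.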
io_config = resource_config.get(io_manager_key)
io_resource_config = {io_manager_key: io_config} if io_config else {}
io_manager_config = get_mapped_resource_config(
{io_manager_key: io_manager_def}, io_resource_config
)
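
        # Build a synthetic input context whose upstream output context points at the asset,
        # so the IO manager loads the value as if a downstream op were consuming it.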
input_context = build_input_context(
name=None,
asset_key=asset_key,
dagster_type=resolve_dagster_type(python_type),
upstream_output=build_output_context(
name=name,
metadata=output_metadata,
asset_key=asset_key,
op_def=op_def,
resource_config=resource_config,
),
resources=self._resource_instance_cache,
resource_config=io_manager_config[io_manager_key].config,
partition_key=partition_key,
asset_partition_key_range=(
PartitionKeyRange(partition_key, partition_key)
if partition_key is not None
else None
),
asset_partitions_def=asset_partitions_def,
instance=self._instance,
metadata=metadata,
)
        return io_manager.load_input(input_context)

    def __enter__(self):
        return self

    def __exit__(self, *exc):
self._exit_stack.close()