You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster._core.storage.input_manager

from abc import ABC, abstractmethod
from functools import update_wrapper
from typing import TYPE_CHECKING, AbstractSet, Callable, Optional, Union, cast, overload

from typing_extensions import TypeAlias, TypeGuard

import dagster._check as check
from dagster._core.decorator_utils import has_at_least_one_parameter
from dagster._core.definitions.config import is_callable_valid_config_arg
from dagster._core.definitions.definition_config_schema import (
    CoercableToConfigSchema,
    IDefinitionConfigSchema,
    convert_user_facing_definition_config_schema,
)
from dagster._core.definitions.resource_definition import ResourceDefinition, ResourceFunction

if TYPE_CHECKING:
    from dagster._core.execution.context.input import InputContext

# Signature accepted for a function decorated with ``@input_manager``: the function either
# takes the ``InputContext`` as its single argument or takes no arguments at all.
InputLoadFn: TypeAlias = Union[
    Callable[["InputContext"], object],
    Callable[[], object],
]


class InputManager(ABC):
    """Abstract interface for objects that know how to load the value of an op input."""

    @abstractmethod
    def load_input(self, context: "InputContext") -> object:
        """Load and return the object that should be passed to an op input.

        Subclasses supply the actual read logic.

        Args:
            context (InputContext): The context describing the input being loaded.

        Returns:
            Any: The loaded data object.
        """

class IInputManagerDefinition:
    """Mixin interface for definitions that expose a per-input config schema."""

    @property
    @abstractmethod
    def input_config_schema(self) -> IDefinitionConfigSchema:
        """Schema of the configuration accepted by each individual input that is loaded
        through this input manager.
        """

class InputManagerDefinition(ResourceDefinition, IInputManagerDefinition):
    """Definition of an input manager resource.

    Input managers load op inputs.

    An InputManagerDefinition is a :py:class:`ResourceDefinition` whose resource_fn returns an
    :py:class:`InputManager`.

    The easiest way to create an InputManagerDefinition is with the
    :py:func:`@input_manager <input_manager>` decorator.

    Args:
        resource_fn (ResourceFunction): Function that produces the input manager resource.
        config_schema (Optional[ConfigSchema]): Schema for the resource-level config.
        description (Optional[str]): Human-readable description of the resource.
        input_config_schema (Optional[ConfigSchema]): Schema for the per-input config.
        required_resource_keys (Optional[AbstractSet[str]]): Keys of the resources this input
            manager requires.
        version (Optional[str]): Version of the input manager definition.
    """

    def __init__(
        self,
        resource_fn: ResourceFunction,
        config_schema: Optional[CoercableToConfigSchema] = None,
        description: Optional[str] = None,
        input_config_schema: Optional[CoercableToConfigSchema] = None,
        required_resource_keys: Optional[AbstractSet[str]] = None,
        version: Optional[str] = None,
    ):
        # Normalize the user-facing schema before handing the remaining arguments to the
        # ResourceDefinition constructor.
        self._input_config_schema = convert_user_facing_definition_config_schema(
            input_config_schema
        )
        super().__init__(
            resource_fn=resource_fn,
            config_schema=config_schema,
            description=description,
            required_resource_keys=required_resource_keys,
            version=version,
        )

    @property
    def input_config_schema(self) -> IDefinitionConfigSchema:
        """The normalized schema for per-input configuration."""
        return self._input_config_schema

    def copy_for_configured(
        self,
        description: Optional[str],
        config_schema: CoercableToConfigSchema,
    ) -> "InputManagerDefinition":
        """Return a copy of this definition with a new resource-level config schema.

        Args:
            description (Optional[str]): Description for the copy; this definition's own
                description is used when it is falsy.
            config_schema (CoercableToConfigSchema): Config schema for the copy.

        Returns:
            InputManagerDefinition: A new definition sharing this one's resource function,
            required resource keys, input config schema and version.
        """
        return InputManagerDefinition(
            config_schema=config_schema,
            description=description or self.description,
            resource_fn=self.resource_fn,
            required_resource_keys=self.required_resource_keys,
            input_config_schema=self.input_config_schema,
            # Carry the version over so that a configured copy does not silently lose it.
            version=self.version,
        )

@overload
def input_manager(
    config_schema: InputLoadFn,
) -> InputManagerDefinition: ...


@overload
def input_manager(
    config_schema: Optional[CoercableToConfigSchema] = None,
    description: Optional[str] = None,
    input_config_schema: Optional[CoercableToConfigSchema] = None,
    required_resource_keys: Optional[AbstractSet[str]] = None,
    version: Optional[str] = None,
) -> Callable[[InputLoadFn], InputManagerDefinition]: ...


def input_manager(
    config_schema: Union[InputLoadFn, Optional[CoercableToConfigSchema]] = None,
    description: Optional[str] = None,
    input_config_schema: Optional[CoercableToConfigSchema] = None,
    required_resource_keys: Optional[AbstractSet[str]] = None,
    version: Optional[str] = None,
) -> Union[InputManagerDefinition, Callable[[InputLoadFn], InputManagerDefinition]]:
    """Define an input manager.

    Input managers load op inputs, either from upstream outputs or by providing default values.

    The decorated function should accept a :py:class:`InputContext` and resource config, and
    return a loaded object that will be passed into one of the inputs of an op.

    The decorator produces an :py:class:`InputManagerDefinition`. It can be applied bare
    (``@input_manager``) or with arguments (``@input_manager(...)``).

    Args:
        config_schema (Optional[ConfigSchema]): The schema for the resource-level config. If not
            set, Dagster will accept any config provided.
        description (Optional[str]): A human-readable description of the resource.
        input_config_schema (Optional[ConfigSchema]): A schema for the input-level config. Each
            input that uses this input manager can be configured separately using this config.
            If not set, Dagster will accept any config provided.
        required_resource_keys (Optional[Set[str]]): Keys for the resources required by the input
            manager.
        version (Optional[str]): (Experimental) the version of the input manager definition.

    **Examples:**

    .. code-block:: python

        from dagster import input_manager, op, job, In

        @input_manager
        def csv_loader(_):
            return read_csv("some/path")

        @op(ins={"input1": In(input_manager_key="csv_loader_key")})
        def my_op(_, input1):
            do_stuff(input1)

        @job(resource_defs={"csv_loader_key": csv_loader})
        def my_job():
            my_op()

        @input_manager(config_schema={"base_dir": str})
        def csv_loader(context):
            return read_csv(context.resource_config["base_dir"] + "/some/path")

        @input_manager(input_config_schema={"path": str})
        def csv_loader(context):
            return read_csv(context.config["path"])
    """
    # Bare usage: the first positional argument is the decorated load function itself.
    if _is_input_load_fn(config_schema):
        return _InputManagerDecoratorCallable()(config_schema)

    # Parameterized usage: the first argument is the resource-level config schema (or None).
    resource_config_schema = cast(Optional[CoercableToConfigSchema], config_schema)

    def _decorate(load_fn: InputLoadFn) -> InputManagerDefinition:
        decorator = _InputManagerDecoratorCallable(
            config_schema=resource_config_schema,
            description=description,
            version=version,
            input_config_schema=input_config_schema,
            required_resource_keys=required_resource_keys,
        )
        return decorator(load_fn)

    return _decorate

def _is_input_load_fn(obj: Union[InputLoadFn, CoercableToConfigSchema]) -> TypeGuard[InputLoadFn]:
    """Return True when ``obj`` is a load function rather than a config schema argument.

    A callable counts as a load function only if it is not itself a valid config argument.
    """
    if not callable(obj):
        return False
    return not is_callable_valid_config_arg(obj)


class InputManagerWrapper(InputManager):
    """Adapts a plain load function to the :py:class:`InputManager` interface."""

    def __init__(self, load_fn: InputLoadFn):
        self._load_fn = load_fn

    def load_input(self, context: "InputContext") -> object:
        """Invoke the wrapped load function and return the value it yields.

        Args:
            context (InputContext): The input context.

        Returns:
            Any: The loaded object.
        """
        # The function decorated with @input_manager may declare a context parameter or take
        # no parameters at all, so it is called accordingly.
        if has_at_least_one_parameter(self._load_fn):
            loaded = self._load_fn(context)  # type: ignore  # (strict type guard)
        else:
            loaded = self._load_fn()  # type: ignore  # (strict type guard)

        # The decorated function may hand back the value itself or an InputManager instance;
        # in the latter case the load is delegated to that instance's load_input method.
        if not isinstance(loaded, InputManager):
            return loaded
        return loaded.load_input(context)


class _InputManagerDecoratorCallable:
    """Callable that turns a load function into an :py:class:`InputManagerDefinition`,
    using the options captured at construction time.
    """

    def __init__(
        self,
        config_schema: CoercableToConfigSchema = None,
        description: Optional[str] = None,
        version: Optional[str] = None,
        input_config_schema: CoercableToConfigSchema = None,
        required_resource_keys: Optional[AbstractSet[str]] = None,
    ):
        self.config_schema = config_schema
        self.description = check.opt_str_param(description, "description")
        self.version = check.opt_str_param(version, "version")
        self.input_config_schema = input_config_schema
        self.required_resource_keys = required_resource_keys

    def __call__(self, load_fn: InputLoadFn) -> InputManagerDefinition:
        """Build the definition whose resource function wraps ``load_fn``."""
        check.callable_param(load_fn, "load_fn")

        def _resource_fn(_):
            return InputManagerWrapper(load_fn)

        definition = InputManagerDefinition(
            resource_fn=_resource_fn,
            config_schema=self.config_schema,
            description=self.description,
            version=self.version,
            input_config_schema=self.input_config_schema,
            required_resource_keys=self.required_resource_keys,
        )

        # `update_wrapper` typing cannot currently handle a Union of Callables correctly.
        # It returns the wrapper object it was given, i.e. the definition itself.
        return update_wrapper(definition, wrapped=load_fn)  # type: ignore