This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.
from abc import ABC, abstractmethod
from functools import update_wrapper
from typing import TYPE_CHECKING, AbstractSet, Callable, Optional, Union, cast, overload
from typing_extensions import TypeAlias, TypeGuard
import dagster._check as check
from dagster._core.decorator_utils import has_at_least_one_parameter
from dagster._core.definitions.config import is_callable_valid_config_arg
from dagster._core.definitions.definition_config_schema import (
CoercableToConfigSchema,
IDefinitionConfigSchema,
convert_user_facing_definition_config_schema,
)
from dagster._core.definitions.resource_definition import ResourceDefinition, ResourceFunction
if TYPE_CHECKING:
from dagster._core.execution.context.input import InputContext
# Signature accepted for a user-supplied input loading function: it either takes the
# ``InputContext`` for the input being loaded, or takes no arguments at all. The return value is
# typed as a plain ``object``.
InputLoadFn: TypeAlias = Union[
    Callable[["InputContext"], object],
    Callable[[], object],
]
class InputManager(ABC):
    """Base interface for classes that are responsible for loading op inputs.

    Concrete subclasses must implement :py:meth:`load_input`; the class cannot be instantiated
    until they do.
    """

    @abstractmethod
    def load_input(self, context: "InputContext") -> object:
        """The user-defined read method that loads an input to an op.

        Args:
            context (InputContext): The input context.

        Returns:
            Any: The data object.
        """
class IInputManagerDefinition:
    """Interface for definitions that expose a per-input config schema.

    NOTE(review): this class does not itself derive from ``ABC``, so the ``@abstractmethod``
    marker below is only enforced if a subclass picks up ``ABCMeta`` through another base class
    -- confirm against the concrete subclasses.
    """

    @property
    @abstractmethod
    def input_config_schema(self) -> IDefinitionConfigSchema:
        """The schema for per-input configuration for inputs that are managed by this
        input manager.
        """
class InputManagerDefinition(ResourceDefinition, IInputManagerDefinition):
    """Definition of an input manager resource.

    Input managers load op inputs.

    An InputManagerDefinition is a :py:class:`ResourceDefinition` whose resource_fn returns an
    :py:class:`InputManager`.

    The easiest way to create an InputManagerDefinition is with the
    :py:func:`@input_manager <input_manager>` decorator.
    """

    def __init__(
        self,
        resource_fn: ResourceFunction,
        config_schema: Optional[CoercableToConfigSchema] = None,
        description: Optional[str] = None,
        input_config_schema: Optional[CoercableToConfigSchema] = None,
        required_resource_keys: Optional[AbstractSet[str]] = None,
        version: Optional[str] = None,
    ):
        """Create an input manager definition.

        Args:
            resource_fn: Function forwarded to :py:class:`ResourceDefinition` as its
                ``resource_fn``.
            config_schema: Resource-level config schema, forwarded to the base class.
            description: Human-readable description, forwarded to the base class.
            input_config_schema: Per-input config schema. It is passed through
                ``convert_user_facing_definition_config_schema`` and the result is what
                :py:attr:`input_config_schema` returns.
            required_resource_keys: Resource keys forwarded to the base class.
            version: Version string forwarded to the base class.
        """
        # Store the per-input schema in its converted form; everything else is owned by the
        # ResourceDefinition base class.
        self._input_config_schema = convert_user_facing_definition_config_schema(
            input_config_schema
        )
        super().__init__(
            resource_fn=resource_fn,
            config_schema=config_schema,
            description=description,
            required_resource_keys=required_resource_keys,
            version=version,
        )

    @property
    def input_config_schema(self) -> IDefinitionConfigSchema:
        """The converted per-input config schema supplied at construction time."""
        return self._input_config_schema

    def copy_for_configured(
        self,
        description: Optional[str],
        config_schema: CoercableToConfigSchema,
    ) -> "InputManagerDefinition":
        """Build a copy of this definition with a new resource-level config schema.

        Args:
            description: Description for the copy; when falsy, this definition's own
                description is reused.
            config_schema: The resource-level config schema for the copy.

        Returns:
            InputManagerDefinition: A new definition sharing this one's ``resource_fn``,
            required resource keys, per-input config schema and version.
        """
        return InputManagerDefinition(
            config_schema=config_schema,
            description=description or self.description,
            resource_fn=self.resource_fn,
            required_resource_keys=self.required_resource_keys,
            input_config_schema=self.input_config_schema,
            # Carry the version across so a configured copy does not silently lose it.
            # NOTE(review): relies on the ResourceDefinition base exposing ``version`` for the
            # value passed to its constructor -- confirm.
            version=self.version,
        )
# Overload for bare usage (``@input_manager`` with no parentheses): the single positional
# argument is the load function itself and an InputManagerDefinition is returned directly.
@overload
def input_manager(
    config_schema: InputLoadFn,
) -> InputManagerDefinition:
    ...


# Overload for parameterized usage (``@input_manager(...)``): the arguments configure the
# definition and the return value is a decorator that accepts the load function.
@overload
def input_manager(
    config_schema: Optional[CoercableToConfigSchema] = None,
    description: Optional[str] = None,
    input_config_schema: Optional[CoercableToConfigSchema] = None,
    required_resource_keys: Optional[AbstractSet[str]] = None,
    version: Optional[str] = None,
) -> Callable[[InputLoadFn], InputManagerDefinition]:
    ...
def input_manager(
    config_schema: Union[InputLoadFn, Optional[CoercableToConfigSchema]] = None,
    description: Optional[str] = None,
    input_config_schema: Optional[CoercableToConfigSchema] = None,
    required_resource_keys: Optional[AbstractSet[str]] = None,
    version: Optional[str] = None,
) -> Union[InputManagerDefinition, Callable[[InputLoadFn], InputManagerDefinition]]:
    """Define an input manager.

    Input managers load op inputs, either from upstream outputs or by providing default values.

    The decorated function should accept an :py:class:`InputContext` (resource config is
    available on it as ``context.resource_config``) and return a loaded object that will be
    passed into one of the inputs of an op.

    The decorator produces an :py:class:`InputManagerDefinition`. It can be applied bare
    (``@input_manager``) or with arguments (``@input_manager(...)``).

    Args:
        config_schema (Optional[ConfigSchema]): The schema for the resource-level config. If not
            set, Dagster will accept any config provided.
        description (Optional[str]): A human-readable description of the resource.
        input_config_schema (Optional[ConfigSchema]): A schema for the input-level config. Each
            input that uses this input manager can be configured separately using this config.
            If not set, Dagster will accept any config provided.
        required_resource_keys (Optional[Set[str]]): Keys for the resources required by the input
            manager.
        version (Optional[str]): (Experimental) the version of the input manager definition.

    Returns:
        An :py:class:`InputManagerDefinition` when used bare on a load function; otherwise a
        decorator that turns a load function into an :py:class:`InputManagerDefinition`.

    **Examples:**

    .. code-block:: python

        from dagster import input_manager, op, job, In

        @input_manager
        def csv_loader(_):
            return read_csv("some/path")

        @op(ins={"input1": In(input_manager_key="csv_loader_key")})
        def my_op(_, input1):
            do_stuff(input1)

        @job(resource_defs={"csv_loader_key": csv_loader})
        def my_job():
            my_op()

        @input_manager(config_schema={"base_dir": str})
        def csv_loader(context):
            return read_csv(context.resource_config["base_dir"] + "/some/path")

        @input_manager(input_config_schema={"path": str})
        def csv_loader(context):
            return read_csv(context.config["path"])
    """
    # Bare ``@input_manager`` usage: the first positional argument is the load function rather
    # than a config schema, so build the definition immediately with default settings.
    if _is_input_load_fn(config_schema):
        return _InputManagerDecoratorCallable()(config_schema)

    # From here on the first argument can only be a config schema (or None); the cast only
    # informs the type checker and has no runtime effect.
    config_schema = cast(Optional[CoercableToConfigSchema], config_schema)

    def _wrap(load_fn: InputLoadFn) -> InputManagerDefinition:
        return _InputManagerDecoratorCallable(
            config_schema=config_schema,
            description=description,
            version=version,
            input_config_schema=input_config_schema,
            required_resource_keys=required_resource_keys,
        )(load_fn)

    return _wrap
def _is_input_load_fn(obj: Union[InputLoadFn, CoercableToConfigSchema]) -> TypeGuard[InputLoadFn]:
    """Tell whether ``obj`` is a load function rather than a config schema.

    A callable that ``is_callable_valid_config_arg`` accepts is treated as config, not as a
    load function; anything that is not callable is never a load function.
    """
    if not callable(obj):
        return False
    return not is_callable_valid_config_arg(obj)
class InputManagerWrapper(InputManager):
    """Adapts a plain load function to the :py:class:`InputManager` interface."""

    def __init__(self, load_fn: InputLoadFn):
        self._load_fn = load_fn

    def load_input(self, context: "InputContext") -> object:
        """Invoke the wrapped load function and return the value it loads.

        The wrapped function is called with ``context`` when it declares at least one
        parameter and with no arguments otherwise. It may return the value to use directly, or
        an :py:class:`InputManager`; in the latter case that manager's ``load_input`` is
        called with the same ``context`` and its result is returned instead.
        """
        load_fn = self._load_fn
        if has_at_least_one_parameter(load_fn):
            loaded = load_fn(context)
        else:
            loaded = load_fn()  # type: ignore  # (strict type guard)

        # A plain value is the final result; an InputManager needs one more hop.
        if not isinstance(loaded, InputManager):
            return loaded
        return loaded.load_input(context)
class _InputManagerDecoratorCallable:
    """Holds decorator arguments and turns a load function into an InputManagerDefinition."""

    def __init__(
        self,
        config_schema: CoercableToConfigSchema = None,
        description: Optional[str] = None,
        version: Optional[str] = None,
        input_config_schema: CoercableToConfigSchema = None,
        required_resource_keys: Optional[AbstractSet[str]] = None,
    ):
        # The two string options are validated up front (description first, then version);
        # the remaining options are stored untouched.
        self.description = check.opt_str_param(description, "description")
        self.version = check.opt_str_param(version, "version")
        self.config_schema = config_schema
        self.input_config_schema = input_config_schema
        self.required_resource_keys = required_resource_keys

    def __call__(self, load_fn: InputLoadFn) -> InputManagerDefinition:
        """Wrap ``load_fn`` in an InputManagerDefinition built from the stored options."""
        check.callable_param(load_fn, "load_fn")

        def _resource_fn(_init_context):
            # Each resource initialization yields a fresh wrapper around the same load_fn.
            return InputManagerWrapper(load_fn)

        definition = InputManagerDefinition(
            resource_fn=_resource_fn,
            config_schema=self.config_schema,
            description=self.description,
            input_config_schema=self.input_config_schema,
            required_resource_keys=self.required_resource_keys,
            version=self.version,
        )

        # `update_wrapper` typing cannot currently handle a Union of Callables correctly
        update_wrapper(definition, wrapped=load_fn)  # type: ignore

        return definition