This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.
from typing import AbstractSet, Any, Callable, Iterator, NamedTuple, Optional, cast
import dagster._check as check
from dagster._annotations import PublicAttr
from ..decorator_utils import get_function_params
from ..errors import DagsterInvalidInvocationError
from .resource_requirement import HookResourceRequirement, RequiresResources, ResourceRequirement
from .utils import check_valid_name
[docs]class HookDefinition(
NamedTuple(
"_HookDefinition",
[
("name", PublicAttr[str]),
("hook_fn", PublicAttr[Callable]),
("required_resource_keys", PublicAttr[AbstractSet[str]]),
("decorated_fn", PublicAttr[Optional[Callable]]),
],
),
RequiresResources,
):
"""Define a hook which can be triggered during a op execution (e.g. a callback on the step
execution failure event during a op execution).
Args:
name (str): The name of this hook.
hook_fn (Callable): The callback function that will be triggered.
required_resource_keys (Optional[AbstractSet[str]]): Keys for the resources required by the
hook.
"""
def __new__(
cls,
*,
name: str,
hook_fn: Callable[..., Any],
required_resource_keys: Optional[AbstractSet[str]] = None,
decorated_fn: Optional[Callable[..., Any]] = None,
):
return super(HookDefinition, cls).__new__(
cls,
name=check_valid_name(name),
hook_fn=check.callable_param(hook_fn, "hook_fn"),
required_resource_keys=frozenset(
check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str)
),
decorated_fn=check.opt_callable_param(decorated_fn, "decorated_fn"),
)
def __call__(self, *args, **kwargs):
"""This is invoked when the hook is used as a decorator.
We currently support hooks to decorate the following:
- JobDefinition: when the hook decorates a job definition, it will be added to
all the op invocations within the job.
Example:
.. code-block:: python
@success_hook
def slack_message_on_success(_):
...
@slack_message_on_success
@job
def a_job():
foo(bar())
"""
from ..execution.context.hook import HookContext
from .graph_definition import GraphDefinition
from .hook_invocation import hook_invocation_result
from .job_definition import JobDefinition
if len(args) > 0 and isinstance(args[0], (JobDefinition, GraphDefinition)):
# when it decorates a job, we apply this hook to all the op invocations within
# the job.
return args[0].with_hooks({self})
else:
if not self.decorated_fn:
raise DagsterInvalidInvocationError(
"Only hook definitions created using one of the hook decorators can be invoked."
)
fxn_args = get_function_params(self.decorated_fn)
# If decorated fxn has two arguments, then this is an event list hook fxn, and parameter
# names are always context and event_list
if len(fxn_args) == 2:
context_arg_name = fxn_args[0].name
event_list_arg_name = fxn_args[1].name
if len(args) + len(kwargs) != 2:
raise DagsterInvalidInvocationError(
"Decorated function expects two parameters, context and event_list, but "
f"{len(args) + len(kwargs)} were provided."
)
if args:
context = check.opt_inst_param(args[0], "context", HookContext)
event_list = check.opt_list_param(
args[1] if len(args) > 1 else kwargs[event_list_arg_name],
event_list_arg_name,
)
else:
if context_arg_name not in kwargs:
raise DagsterInvalidInvocationError(
f"Could not find expected argument '{context_arg_name}'. Provided "
f"kwargs: {list(kwargs.keys())}"
)
if event_list_arg_name not in kwargs:
raise DagsterInvalidInvocationError(
f"Could not find expected argument '{event_list_arg_name}'. Provided "
f"kwargs: {list(kwargs.keys())}"
)
context = check.opt_inst_param(
kwargs[context_arg_name], context_arg_name, HookContext
)
event_list = check.opt_list_param(
kwargs[event_list_arg_name], event_list_arg_name
)
return hook_invocation_result(self, context, event_list)
else:
context_arg_name = fxn_args[0].name
if len(args) + len(kwargs) != 1:
raise DagsterInvalidInvocationError(
f"Decorated function expects one parameter, {context_arg_name}, but "
f"{len(args) + len(kwargs)} were provided."
)
if args:
context = check.opt_inst_param(args[0], context_arg_name, HookContext)
else:
if context_arg_name not in kwargs:
raise DagsterInvalidInvocationError(
f"Could not find expected argument '{context_arg_name}'. Provided "
f"kwargs: {list(kwargs.keys())}"
)
context = check.opt_inst_param(
kwargs[context_arg_name], context_arg_name, HookContext
)
return hook_invocation_result(self, context)
def get_resource_requirements(
self, outer_context: Optional[object] = None
) -> Iterator[ResourceRequirement]:
# outer_context in this case is a string of (job, job name) or (node, node name)
attached_to = cast(Optional[str], outer_context)
for resource_key in sorted(list(self.required_resource_keys)):
yield HookResourceRequirement(
key=resource_key, attached_to=attached_to, hook_name=self.name
)