This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.
import inspect
from typing import Any, Callable, NamedTuple, Optional, Sequence, Set
import dagster._check as check
from dagster._annotations import public
from dagster._core.decorator_utils import get_function_params
from dagster._core.definitions.resource_annotation import get_resource_args
from .events import AssetKey
from .run_request import RunRequest, SkipReason
from .sensor_definition import (
DefaultSensorStatus,
RawSensorEvaluationFunctionReturn,
SensorDefinition,
SensorType,
validate_and_get_resource_dict,
)
from .target import ExecutableDefinition
from .utils import check_valid_name
class AssetSensorParamNames(NamedTuple):
context_param_name: Optional[str]
event_log_entry_param_name: Optional[str]
def get_asset_sensor_param_names(fn: Callable) -> AssetSensorParamNames:
"""Determines the names of the context and event log entry parameters for an asset sensor function.
These are assumed to be the first two non-resource params, in order (context param before event log entry).
"""
resource_params = {param.name for param in get_resource_args(fn)}
non_resource_params = [
param.name for param in get_function_params(fn) if param.name not in resource_params
]
context_param_name = non_resource_params[0] if len(non_resource_params) > 0 else None
event_log_entry_param_name = non_resource_params[1] if len(non_resource_params) > 1 else None
return AssetSensorParamNames(
context_param_name=context_param_name, event_log_entry_param_name=event_log_entry_param_name
)
[docs]class AssetSensorDefinition(SensorDefinition):
"""Define an asset sensor that initiates a set of runs based on the materialization of a given
asset.
Args:
name (str): The name of the sensor to create.
asset_key (AssetKey): The asset_key this sensor monitors.
asset_materialization_fn (Callable[[SensorEvaluationContext, EventLogEntry], Union[Iterator[Union[RunRequest, SkipReason]], RunRequest, SkipReason]]): The core
evaluation function for the sensor, which is run at an interval to determine whether a
run should be launched or not. Takes a :py:class:`~dagster.SensorEvaluationContext` and
an EventLogEntry corresponding to an AssetMaterialization event.
This function must return a generator, which must yield either a single SkipReason
or one or more RunRequest objects.
minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
between sensor evaluations.
description (Optional[str]): A human-readable description of the sensor.
job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]): The job
object to target with this sensor.
jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]):
(experimental) A list of jobs to be executed when the sensor fires.
default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The default
status can be overridden from the Dagster UI or via the GraphQL API.
"""
def __init__(
self,
name: str,
asset_key: AssetKey,
job_name: Optional[str],
asset_materialization_fn: Callable[
...,
RawSensorEvaluationFunctionReturn,
],
minimum_interval_seconds: Optional[int] = None,
description: Optional[str] = None,
job: Optional[ExecutableDefinition] = None,
jobs: Optional[Sequence[ExecutableDefinition]] = None,
default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
required_resource_keys: Optional[Set[str]] = None,
):
self._asset_key = check.inst_param(asset_key, "asset_key", AssetKey)
from dagster._core.events import DagsterEventType
from dagster._core.storage.event_log.base import EventRecordsFilter
resource_arg_names: Set[str] = {
arg.name for arg in get_resource_args(asset_materialization_fn)
}
combined_required_resource_keys = (
check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str)
| resource_arg_names
)
def _wrap_asset_fn(materialization_fn) -> Any:
def _fn(context) -> Any:
after_cursor = None
if context.cursor:
try:
after_cursor = int(context.cursor)
except ValueError:
after_cursor = None
event_records = context.instance.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.ASSET_MATERIALIZATION,
asset_key=self._asset_key,
after_cursor=after_cursor,
),
ascending=False,
limit=1,
)
if not event_records:
yield SkipReason(
f"No new materialization events found for asset key {self._asset_key}"
)
return
event_record = event_records[0]
(
context_param_name,
event_log_entry_param_name,
) = get_asset_sensor_param_names(materialization_fn)
resource_args_populated = validate_and_get_resource_dict(
context.resources, name, resource_arg_names
)
# Build asset sensor function args, which can include any subset of
# context arg, event log entry arg, and any resource args
args = resource_args_populated
if context_param_name:
args[context_param_name] = context
if event_log_entry_param_name:
args[event_log_entry_param_name] = event_record.event_log_entry
result = materialization_fn(**args)
if inspect.isgenerator(result) or isinstance(result, list):
for item in result:
yield item
elif isinstance(result, (SkipReason, RunRequest)):
yield result
context.update_cursor(str(event_record.storage_id))
return _fn
super(AssetSensorDefinition, self).__init__(
name=check_valid_name(name),
job_name=job_name,
evaluation_fn=_wrap_asset_fn(
check.callable_param(asset_materialization_fn, "asset_materialization_fn"),
),
minimum_interval_seconds=minimum_interval_seconds,
description=description,
job=job,
jobs=jobs,
default_status=default_status,
required_resource_keys=combined_required_resource_keys,
)
@public
@property
def asset_key(self) -> AssetKey:
"""AssetKey: The key of the asset targeted by this sensor."""
return self._asset_key
@property
def sensor_type(self) -> SensorType:
return SensorType.ASSET