# NOTE: This source listing is from the documentation for an older version (1.4.7) of Dagster.
# See the documentation for the latest Dagster release for the current version of this module.
from typing import Callable, Dict, Mapping, NamedTuple, Optional, Set, cast
import pendulum
import dagster._check as check
from dagster._annotations import PublicAttr, experimental
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.resource_annotation import get_resource_args
from dagster._core.definitions.scoped_resources_builder import Resources, ScopedResourcesBuilder
from dagster._core.errors import (
DagsterInvalidDefinitionError,
DagsterInvalidInvocationError,
FreshnessPolicySensorExecutionError,
user_code_error_boundary,
)
from dagster._core.instance import DagsterInstance
from dagster._serdes import (
serialize_value,
whitelist_for_serdes,
)
from dagster._serdes.errors import DeserializationError
from dagster._serdes.serdes import deserialize_value
from dagster._seven import JSONDecodeError
from .sensor_definition import (
DefaultSensorStatus,
SensorDefinition,
SensorEvaluationContext,
SensorType,
SkipReason,
get_context_param_name,
get_sensor_context_from_args_or_kwargs,
validate_and_get_resource_dict,
)
@whitelist_for_serdes
class FreshnessPolicySensorCursor(
    NamedTuple(
        "_FreshnessPolicySensorCursor",
        [("minutes_late_by_key_str", Mapping[str, Optional[float]])],
    )
):
    """Serializable cursor recording a minutes-late value per asset.

    The single field, ``minutes_late_by_key_str``, maps an asset key rendered via
    ``AssetKey.to_user_string`` to an optional float.
    """

    def __new__(cls, minutes_late_by_key_str: Mapping[str, Optional[float]]):
        # Validate that we were handed a mapping with string keys before storing it.
        validated_mapping = check.mapping_param(
            minutes_late_by_key_str, "minutes_late_by_key_str", key_type=str
        )
        return super(FreshnessPolicySensorCursor, cls).__new__(
            cls,
            minutes_late_by_key_str=validated_mapping,
        )

    @staticmethod
    def is_valid(json_str: str) -> bool:
        """Return True if ``json_str`` can be deserialized into this cursor type."""
        try:
            deserialize_value(json_str, FreshnessPolicySensorCursor)
        except (JSONDecodeError, DeserializationError):
            return False
        else:
            return True

    @staticmethod
    def from_dict(
        minutes_late_by_key: Mapping[AssetKey, Optional[float]]
    ) -> "FreshnessPolicySensorCursor":
        """Build a cursor from a mapping keyed by ``AssetKey`` objects."""
        stringified: Dict[str, Optional[float]] = {}
        for asset_key, minutes_late in minutes_late_by_key.items():
            stringified[asset_key.to_user_string()] = minutes_late
        return FreshnessPolicySensorCursor(minutes_late_by_key_str=stringified)

    @property
    def minutes_late_by_key(self) -> Mapping[AssetKey, Optional[float]]:
        """The stored mapping with its string keys parsed back into ``AssetKey`` objects."""
        parsed: Dict[AssetKey, Optional[float]] = {}
        for key_str, minutes_late in self.minutes_late_by_key_str.items():
            parsed[AssetKey.from_user_string(key_str)] = minutes_late
        return parsed

    def to_json(self) -> str:
        """Serialize this cursor to a string with ``serialize_value``."""
        as_named_tuple = cast(NamedTuple, self)
        return serialize_value(as_named_tuple)

    @staticmethod
    def from_json(json_str: str) -> "FreshnessPolicySensorCursor":
        """Deserialize a cursor previously produced by ``to_json``."""
        cursor = deserialize_value(json_str, FreshnessPolicySensorCursor)
        return cursor
class FreshnessPolicySensorContext(
    NamedTuple(
        "_FreshnessPolicySensorContext",
        [
            ("sensor_name", PublicAttr[str]),
            ("asset_key", PublicAttr[AssetKey]),
            ("freshness_policy", PublicAttr[FreshnessPolicy]),
            ("minutes_overdue", PublicAttr[Optional[float]]),
            ("previous_minutes_overdue", PublicAttr[Optional[float]]),
            ("instance", PublicAttr[DagsterInstance]),
            ("resources", Resources),
        ],
    )
):
    """The ``context`` object available to a decorated function of ``freshness_policy_sensor``.

    Attributes:
        sensor_name (str): the name of the sensor.
        asset_key (AssetKey): the key of the asset being monitored
        freshness_policy (FreshnessPolicy): the freshness policy of the asset being monitored
        minutes_overdue (Optional[float]): how many minutes overdue the asset currently is, or
            None if no value is available.
        previous_minutes_overdue (Optional[float]): the minutes_overdue value for this asset on the
            previous sensor tick.
        instance (DagsterInstance): the current instance.
        resources (Resources): the resources made available to the sensor function. Defaults to
            an empty resources object when none are supplied.
    """

    def __new__(
        cls,
        sensor_name: str,
        asset_key: AssetKey,
        freshness_policy: FreshnessPolicy,
        minutes_overdue: Optional[float],
        previous_minutes_overdue: Optional[float],
        instance: DagsterInstance,
        resources: Optional[Resources] = None,
    ):
        # Accept any numeric value (or None) here; values are normalized to float below.
        minutes_overdue = check.opt_numeric_param(minutes_overdue, "minutes_overdue")
        previous_minutes_overdue = check.opt_numeric_param(
            previous_minutes_overdue, "previous_minutes_overdue"
        )
        return super(FreshnessPolicySensorContext, cls).__new__(
            cls,
            sensor_name=check.str_param(sensor_name, "sensor_name"),
            asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
            # The second argument is the parameter name reported on a failed check, so it must
            # be "freshness_policy" rather than the type name.
            freshness_policy=check.inst_param(
                freshness_policy, "freshness_policy", FreshnessPolicy
            ),
            minutes_overdue=float(minutes_overdue) if minutes_overdue is not None else None,
            previous_minutes_overdue=(
                float(previous_minutes_overdue) if previous_minutes_overdue is not None else None
            ),
            instance=check.inst_param(instance, "instance", DagsterInstance),
            # Fall back to an empty resources object when none (or a falsy value) is provided.
            resources=resources or ScopedResourcesBuilder.build_empty(),
        )
@experimental
def build_freshness_policy_sensor_context(
    sensor_name: str,
    asset_key: AssetKey,
    freshness_policy: FreshnessPolicy,
    minutes_overdue: Optional[float],
    previous_minutes_overdue: Optional[float] = None,
    instance: Optional[DagsterInstance] = None,
    resources: Optional[Resources] = None,
) -> FreshnessPolicySensorContext:
    """Builds freshness policy sensor context from provided parameters.

    This function can be used to provide the context argument when directly invoking a function
    decorated with `@freshness_policy_sensor`, such as when writing unit tests.

    Args:
        sensor_name (str): The name of the sensor the context is being constructed for.
        asset_key (AssetKey): The AssetKey for the monitored asset
        freshness_policy (FreshnessPolicy): The FreshnessPolicy for the monitored asset
        minutes_overdue (Optional[float]): How overdue the monitored asset currently is
        previous_minutes_overdue (Optional[float]): How overdue the monitored asset was on the
            previous tick.
        instance (Optional[DagsterInstance]): The dagster instance configured for the context.
            When omitted, an ephemeral instance is created.
        resources (Optional[Resources]): The resources to expose on the context. Passed through
            unchanged to :py:class:`FreshnessPolicySensorContext`.

    Returns:
        FreshnessPolicySensorContext: The constructed context.

    Examples:
        .. code-block:: python

            context = build_freshness_policy_sensor_context(
                sensor_name="freshness_policy_sensor_to_invoke",
                asset_key=AssetKey("some_asset"),
                freshness_policy=FreshnessPolicy(maximum_lag_minutes=30),
                minutes_overdue=10.0,
            )
            freshness_policy_sensor_to_invoke(context)
    """
    return FreshnessPolicySensorContext(
        sensor_name=sensor_name,
        asset_key=asset_key,
        freshness_policy=freshness_policy,
        minutes_overdue=minutes_overdue,
        previous_minutes_overdue=previous_minutes_overdue,
        # Use an ephemeral instance when the caller does not supply one.
        instance=instance or DagsterInstance.ephemeral(),
        resources=resources,
    )
class FreshnessPolicySensorDefinition(SensorDefinition):
    """Define a sensor that reacts to the status of a given set of asset freshness policies,
    where the decorated function will be evaluated on every sensor tick.

    Args:
        name (str): The name of the sensor. Defaults to the name of the decorated function.
        freshness_policy_sensor_fn (Callable[[FreshnessPolicySensorContext], None]): The core
            evaluation function for the sensor. Takes a :py:class:`~dagster.FreshnessPolicySensorContext`.
        asset_selection (AssetSelection): The asset selection monitored by the sensor.
        minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
            between sensor evaluations.
        description (Optional[str]): A human-readable description of the sensor.
        default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The default
            status can be overridden from the Dagster UI or via the GraphQL API.
        required_resource_keys (Optional[Set[str]]): Resource keys required by the sensor. These
            are combined with the resource arguments found on ``freshness_policy_sensor_fn``.
    """

    def __init__(
        self,
        name: str,
        asset_selection: AssetSelection,
        freshness_policy_sensor_fn: Callable[..., None],
        minimum_interval_seconds: Optional[int] = None,
        description: Optional[str] = None,
        default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
        required_resource_keys: Optional[Set[str]] = None,
    ):
        check.str_param(name, "name")
        check.inst_param(asset_selection, "asset_selection", AssetSelection)
        check.opt_int_param(minimum_interval_seconds, "minimum_interval_seconds")
        check.opt_str_param(description, "description")
        check.inst_param(default_status, "default_status", DefaultSensorStatus)

        self._freshness_policy_sensor_fn = check.callable_param(
            freshness_policy_sensor_fn, "freshness_policy_sensor_fn"
        )

        # Names of the user function's parameters that are resource arguments.
        resource_arg_names: Set[str] = {
            arg.name for arg in get_resource_args(freshness_policy_sensor_fn)
        }

        # The sensor requires both the explicitly listed keys and the resource arguments.
        combined_required_resource_keys = (
            check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str)
            | resource_arg_names
        )

        def _wrapped_fn(context: SensorEvaluationContext):
            """Evaluate the user function once per monitored asset that has a freshness policy."""
            from dagster._utils.caching_instance_queryer import (
                CachingInstanceQueryer,  # expensive import
            )

            if context.repository_def is None:
                raise DagsterInvalidInvocationError(
                    "The `repository_def` property on the `SensorEvaluationContext` passed into a "
                    "`FreshnessPolicySensorDefinition` must not be None."
                )

            # With no usable cursor, store an empty one and skip this tick.
            if context.cursor is None or not FreshnessPolicySensorCursor.is_valid(context.cursor):
                new_cursor = FreshnessPolicySensorCursor({})
                context.update_cursor(new_cursor.to_json())
                yield SkipReason(f"Initializing {name}.")
                return

            evaluation_time = pendulum.now("UTC")
            asset_graph = context.repository_def.asset_graph
            instance_queryer = CachingInstanceQueryer(
                context.instance, asset_graph, evaluation_time
            )
            data_time_resolver = CachingDataTimeResolver(instance_queryer=instance_queryer)
            monitored_keys = asset_selection.resolve(asset_graph)

            # get the previous status from the cursor
            previous_minutes_late_by_key = FreshnessPolicySensorCursor.from_json(
                context.cursor
            ).minutes_late_by_key

            # The context parameter name depends only on the user function's signature, so it
            # is determined once rather than once per monitored asset.
            context_param_name = get_context_param_name(freshness_policy_sensor_fn)

            minutes_late_by_key: Dict[AssetKey, Optional[float]] = {}
            for asset_key in monitored_keys:
                freshness_policy = asset_graph.freshness_policies_by_key.get(asset_key)
                # Assets in the selection without a freshness policy are ignored.
                if freshness_policy is None:
                    continue

                # get the current minutes_overdue value for this asset
                overdue_result = data_time_resolver.get_minutes_overdue(
                    evaluation_time=evaluation_time,
                    asset_key=asset_key,
                )
                minutes_late_by_key[asset_key] = (
                    overdue_result.overdue_minutes if overdue_result else None
                )

                # Resolved inside the loop, matching the original ordering, so that this only
                # runs when at least one monitored asset has a freshness policy.
                resource_args_populated = validate_and_get_resource_dict(
                    context.resources, name, resource_arg_names
                )

                freshness_context = FreshnessPolicySensorContext(
                    sensor_name=name,
                    asset_key=asset_key,
                    freshness_policy=freshness_policy,
                    minutes_overdue=minutes_late_by_key[asset_key],
                    previous_minutes_overdue=previous_minutes_late_by_key.get(asset_key),
                    instance=context.instance,
                    resources=context.resources,
                )

                with user_code_error_boundary(
                    FreshnessPolicySensorExecutionError,
                    lambda: f'Error occurred during the execution of sensor "{name}".',
                ):
                    # Only pass the context if the user function declares a context parameter.
                    context_param = (
                        {context_param_name: freshness_context} if context_param_name else {}
                    )
                    fn_result = freshness_policy_sensor_fn(
                        **context_param,
                        **resource_args_populated,
                    )

                if fn_result is not None:
                    raise DagsterInvalidDefinitionError(
                        "Functions decorated by `@freshness_policy_sensor` may not return or yield"
                        " a value."
                    )

            # Persist the values computed on this tick so the next tick can report them as
            # `previous_minutes_overdue`.
            context.update_cursor(
                FreshnessPolicySensorCursor.from_dict(minutes_late_by_key).to_json()
            )

        super(FreshnessPolicySensorDefinition, self).__init__(
            name=name,
            evaluation_fn=_wrapped_fn,
            minimum_interval_seconds=minimum_interval_seconds,
            description=description,
            default_status=default_status,
            required_resource_keys=combined_required_resource_keys,
        )

    def __call__(self, *args, **kwargs) -> None:
        """Directly invoke the underlying sensor function, e.g. from a unit test.

        The :py:class:`FreshnessPolicySensorContext`, if any, is extracted from ``args`` /
        ``kwargs`` and passed under the name of the function's context parameter.
        """
        context_param_name = get_context_param_name(self._freshness_policy_sensor_fn)

        sensor_context = get_sensor_context_from_args_or_kwargs(
            self._freshness_policy_sensor_fn,
            args,
            kwargs,
            context_type=FreshnessPolicySensorContext,
        )
        context_param = (
            {context_param_name: sensor_context} if context_param_name and sensor_context else {}
        )

        # NOTE(review): `self._name` and `self._required_resource_keys` are not assigned in this
        # class; presumably they are set by `SensorDefinition.__init__` -- confirm.
        resources = validate_and_get_resource_dict(
            sensor_context.resources if sensor_context else ScopedResourcesBuilder.build_empty(),
            self._name,
            self._required_resource_keys,
        )

        return self._freshness_policy_sensor_fn(**context_param, **resources)

    @property
    def sensor_type(self) -> SensorType:
        """The sensor type, always ``SensorType.FRESHNESS_POLICY``."""
        return SensorType.FRESHNESS_POLICY
@experimental
def freshness_policy_sensor(
    asset_selection: AssetSelection,
    *,
    name: Optional[str] = None,
    minimum_interval_seconds: Optional[int] = None,
    description: Optional[str] = None,
    default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
) -> Callable[[Callable[..., None]], FreshnessPolicySensorDefinition]:
    """Define a sensor that reacts to the status of a given set of asset freshness policies, where the
    decorated function will be evaluated on every tick for each asset in the selection that has a
    FreshnessPolicy defined.

    Note: returning or yielding a value from the annotated function will result in an error.

    The decorated function takes a :py:class:`~dagster.FreshnessPolicySensorContext`.

    Args:
        asset_selection (AssetSelection): The asset selection monitored by the sensor.
        name (Optional[str]): The name of the sensor. Defaults to the name of the decorated function.
        minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
            between sensor evaluations.
        description (Optional[str]): A human-readable description of the sensor.
        default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The default
            status can be overridden from the Dagster UI or via the GraphQL API.

    Returns:
        Callable[[Callable[..., None]], FreshnessPolicySensorDefinition]: A decorator that wraps
        the evaluation function in a :py:class:`FreshnessPolicySensorDefinition`.
    """

    def inner(fn: Callable[..., None]) -> FreshnessPolicySensorDefinition:
        check.callable_param(fn, "fn")
        # Fall back to the decorated function's own name when no explicit name is given.
        sensor_name = name or fn.__name__

        return FreshnessPolicySensorDefinition(
            name=sensor_name,
            freshness_policy_sensor_fn=fn,
            asset_selection=asset_selection,
            minimum_interval_seconds=minimum_interval_seconds,
            description=description,
            default_status=default_status,
        )

    return inner