You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster._core.definitions.freshness_policy_sensor_definition

from typing import Callable, Dict, Mapping, NamedTuple, Optional, Set, cast

import pendulum

import dagster._check as check
from dagster._annotations import PublicAttr, experimental
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.resource_annotation import get_resource_args
from dagster._core.definitions.scoped_resources_builder import Resources, ScopedResourcesBuilder
from dagster._core.errors import (
    DagsterInvalidDefinitionError,
    DagsterInvalidInvocationError,
    FreshnessPolicySensorExecutionError,
    user_code_error_boundary,
)
from dagster._core.instance import DagsterInstance
from dagster._serdes import (
    serialize_value,
    whitelist_for_serdes,
)
from dagster._serdes.errors import DeserializationError
from dagster._serdes.serdes import deserialize_value
from dagster._seven import JSONDecodeError

from .sensor_definition import (
    DefaultSensorStatus,
    SensorDefinition,
    SensorEvaluationContext,
    SensorType,
    SkipReason,
    get_context_param_name,
    get_sensor_context_from_args_or_kwargs,
    validate_and_get_resource_dict,
)


@whitelist_for_serdes
class FreshnessPolicySensorCursor(
    NamedTuple(
        "_FreshnessPolicySensorCursor",
        [("minutes_late_by_key_str", Mapping[str, Optional[float]])],
    )
):
    def __new__(cls, minutes_late_by_key_str: Mapping[str, Optional[float]]):
        return super(FreshnessPolicySensorCursor, cls).__new__(
            cls,
            minutes_late_by_key_str=check.mapping_param(
                minutes_late_by_key_str, "minutes_late_by_key_str", key_type=str
            ),
        )

    @staticmethod
    def is_valid(json_str: str) -> bool:
        try:
            deserialize_value(json_str, FreshnessPolicySensorCursor)
            return True
        except (JSONDecodeError, DeserializationError):
            return False

    @staticmethod
    def from_dict(
        minutes_late_by_key: Mapping[AssetKey, Optional[float]]
    ) -> "FreshnessPolicySensorCursor":
        return FreshnessPolicySensorCursor(
            minutes_late_by_key_str={k.to_user_string(): v for k, v in minutes_late_by_key.items()}
        )

    @property
    def minutes_late_by_key(self) -> Mapping[AssetKey, Optional[float]]:
        return {AssetKey.from_user_string(k): v for k, v in self.minutes_late_by_key_str.items()}

    def to_json(self) -> str:
        return serialize_value(cast(NamedTuple, self))

    @staticmethod
    def from_json(json_str: str) -> "FreshnessPolicySensorCursor":
        return deserialize_value(json_str, FreshnessPolicySensorCursor)


[docs]class FreshnessPolicySensorContext( NamedTuple( "_FreshnessPolicySensorContext", [ ("sensor_name", PublicAttr[str]), ("asset_key", PublicAttr[AssetKey]), ("freshness_policy", PublicAttr[FreshnessPolicy]), ("minutes_overdue", PublicAttr[Optional[float]]), ("previous_minutes_overdue", PublicAttr[Optional[float]]), ("instance", PublicAttr[DagsterInstance]), ("resources", Resources), ], ) ): """The ``context`` object available to a decorated function of ``freshness_policy_sensor``. Attributes: sensor_name (str): the name of the sensor. asset_key (AssetKey): the key of the asset being monitored freshness_policy (FreshnessPolicy): the freshness policy of the asset being monitored minutes_overdue (Optional[float]) previous_minutes_overdue (Optional[float]): the minutes_overdue value for this asset on the previous sensor tick. instance (DagsterInstance): the current instance. """ def __new__( cls, sensor_name: str, asset_key: AssetKey, freshness_policy: FreshnessPolicy, minutes_overdue: Optional[float], previous_minutes_overdue: Optional[float], instance: DagsterInstance, resources: Optional[Resources] = None, ): minutes_overdue = check.opt_numeric_param(minutes_overdue, "minutes_overdue") previous_minutes_overdue = check.opt_numeric_param( previous_minutes_overdue, "previous_minutes_overdue" ) return super(FreshnessPolicySensorContext, cls).__new__( cls, sensor_name=check.str_param(sensor_name, "sensor_name"), asset_key=check.inst_param(asset_key, "asset_key", AssetKey), freshness_policy=check.inst_param(freshness_policy, "FreshnessPolicy", FreshnessPolicy), minutes_overdue=float(minutes_overdue) if minutes_overdue is not None else None, previous_minutes_overdue=( float(previous_minutes_overdue) if previous_minutes_overdue is not None else None ), instance=check.inst_param(instance, "instance", DagsterInstance), resources=resources or ScopedResourcesBuilder.build_empty(), )
[docs]@experimental def build_freshness_policy_sensor_context( sensor_name: str, asset_key: AssetKey, freshness_policy: FreshnessPolicy, minutes_overdue: Optional[float], previous_minutes_overdue: Optional[float] = None, instance: Optional[DagsterInstance] = None, resources: Optional[Resources] = None, ) -> FreshnessPolicySensorContext: """Builds freshness policy sensor context from provided parameters. This function can be used to provide the context argument when directly invoking a function decorated with `@freshness_policy_sensor`, such as when writing unit tests. Args: sensor_name (str): The name of the sensor the context is being constructed for. asset_key (AssetKey): The AssetKey for the monitored asset freshness_policy (FreshnessPolicy): The FreshnessPolicy for the monitored asset minutes_overdue (Optional[float]): How overdue the monitored asset currently is previous_minutes_overdue (Optional[float]): How overdue the monitored asset was on the previous tick. instance (DagsterInstance): The dagster instance configured for the context. Examples: .. code-block:: python context = build_freshness_policy_sensor_context( sensor_name="freshness_policy_sensor_to_invoke", asset_key=AssetKey("some_asset"), freshness_policy=FreshnessPolicy(maximum_lag_minutes=30)< minutes_overdue=10.0, ) freshness_policy_sensor_to_invoke(context) """ return FreshnessPolicySensorContext( sensor_name=sensor_name, asset_key=asset_key, freshness_policy=freshness_policy, minutes_overdue=minutes_overdue, previous_minutes_overdue=previous_minutes_overdue, instance=instance or DagsterInstance.ephemeral(), resources=resources, )
[docs]class FreshnessPolicySensorDefinition(SensorDefinition): """Define a sensor that reacts to the status of a given set of asset freshness policies, where the decorated function will be evaluated on every sensor tick. Args: name (str): The name of the sensor. Defaults to the name of the decorated function. freshness_policy_sensor_fn (Callable[[FreshnessPolicySensorContext], None]): The core evaluation function for the sensor. Takes a :py:class:`~dagster.FreshnessPolicySensorContext`. asset_selection (AssetSelection): The asset selection monitored by the sensor. minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse between sensor evaluations. description (Optional[str]): A human-readable description of the sensor. default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The default status can be overridden from the Dagster UI or via the GraphQL API. """ def __init__( self, name: str, asset_selection: AssetSelection, freshness_policy_sensor_fn: Callable[..., None], minimum_interval_seconds: Optional[int] = None, description: Optional[str] = None, default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED, required_resource_keys: Optional[Set[str]] = None, ): check.str_param(name, "name") check.inst_param(asset_selection, "asset_selection", AssetSelection) check.opt_int_param(minimum_interval_seconds, "minimum_interval_seconds") check.opt_str_param(description, "description") check.inst_param(default_status, "default_status", DefaultSensorStatus) self._freshness_policy_sensor_fn = check.callable_param( freshness_policy_sensor_fn, "freshness_policy_sensor_fn" ) resource_arg_names: Set[str] = { arg.name for arg in get_resource_args(freshness_policy_sensor_fn) } combined_required_resource_keys = ( check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str) | resource_arg_names ) def _wrapped_fn(context: SensorEvaluationContext): from dagster._utils.caching_instance_queryer import ( CachingInstanceQueryer, # expensive import ) if context.repository_def is None: raise DagsterInvalidInvocationError( "The `repository_def` property on the `SensorEvaluationContext` passed into a " "`FreshnessPolicySensorDefinition` must not be None." ) if context.cursor is None or not FreshnessPolicySensorCursor.is_valid(context.cursor): new_cursor = FreshnessPolicySensorCursor({}) context.update_cursor(new_cursor.to_json()) yield SkipReason(f"Initializing {name}.") return evaluation_time = pendulum.now("UTC") asset_graph = context.repository_def.asset_graph instance_queryer = CachingInstanceQueryer( context.instance, asset_graph, evaluation_time ) data_time_resolver = CachingDataTimeResolver(instance_queryer=instance_queryer) monitored_keys = asset_selection.resolve(asset_graph) # get the previous status from the cursor previous_minutes_late_by_key = FreshnessPolicySensorCursor.from_json( context.cursor ).minutes_late_by_key minutes_late_by_key: Dict[AssetKey, Optional[float]] = {} for asset_key in monitored_keys: freshness_policy = asset_graph.freshness_policies_by_key.get(asset_key) if freshness_policy is None: continue # get the current minutes_overdue value for this asset result = data_time_resolver.get_minutes_overdue( evaluation_time=evaluation_time, asset_key=asset_key, ) minutes_late_by_key[asset_key] = result.overdue_minutes if result else None resource_args_populated = validate_and_get_resource_dict( context.resources, name, resource_arg_names ) context_param_name = get_context_param_name(freshness_policy_sensor_fn) freshness_context = FreshnessPolicySensorContext( sensor_name=name, asset_key=asset_key, freshness_policy=freshness_policy, minutes_overdue=minutes_late_by_key[asset_key], previous_minutes_overdue=previous_minutes_late_by_key.get(asset_key), instance=context.instance, resources=context.resources, ) with user_code_error_boundary( FreshnessPolicySensorExecutionError, lambda: f'Error occurred during the execution of sensor "{name}".', ): context_param = ( {context_param_name: freshness_context} if context_param_name else {} ) result = freshness_policy_sensor_fn( **context_param, **resource_args_populated, ) if result is not None: raise DagsterInvalidDefinitionError( "Functions decorated by `@freshness_policy_sensor` may not return or yield" " a value." ) context.update_cursor( FreshnessPolicySensorCursor.from_dict(minutes_late_by_key).to_json() ) super(FreshnessPolicySensorDefinition, self).__init__( name=name, evaluation_fn=_wrapped_fn, minimum_interval_seconds=minimum_interval_seconds, description=description, default_status=default_status, required_resource_keys=combined_required_resource_keys, ) def __call__(self, *args, **kwargs) -> None: context_param_name = get_context_param_name(self._freshness_policy_sensor_fn) sensor_context = get_sensor_context_from_args_or_kwargs( self._freshness_policy_sensor_fn, args, kwargs, context_type=FreshnessPolicySensorContext, ) context_param = ( {context_param_name: sensor_context} if context_param_name and sensor_context else {} ) resources = validate_and_get_resource_dict( sensor_context.resources if sensor_context else ScopedResourcesBuilder.build_empty(), self._name, self._required_resource_keys, ) return self._freshness_policy_sensor_fn(**context_param, **resources) @property def sensor_type(self) -> SensorType: return SensorType.FRESHNESS_POLICY
[docs]@experimental def freshness_policy_sensor( asset_selection: AssetSelection, *, name: Optional[str] = None, minimum_interval_seconds: Optional[int] = None, description: Optional[str] = None, default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED, ) -> Callable[[Callable[..., None]], FreshnessPolicySensorDefinition,]: """Define a sensor that reacts to the status of a given set of asset freshness policies, where the decorated function will be evaluated on every tick for each asset in the selection that has a FreshnessPolicy defined. Note: returning or yielding a value from the annotated function will result in an error. Takes a :py:class:`~dagster.FreshnessPolicySensorContext`. Args: asset_selection (AssetSelection): The asset selection monitored by the sensor. name (Optional[str]): The name of the sensor. Defaults to the name of the decorated function. freshness_policy_sensor_fn (Callable[[FreshnessPolicySensorContext], None]): The core evaluation function for the sensor. Takes a :py:class:`~dagster.FreshnessPolicySensorContext`. minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse between sensor evaluations. description (Optional[str]): A human-readable description of the sensor. default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The default status can be overridden from the Dagster UI or via the GraphQL API. """ def inner(fn: Callable[..., None]) -> FreshnessPolicySensorDefinition: check.callable_param(fn, "fn") sensor_name = name or fn.__name__ return FreshnessPolicySensorDefinition( name=sensor_name, freshness_policy_sensor_fn=fn, asset_selection=asset_selection, minimum_interval_seconds=minimum_interval_seconds, description=description, default_status=default_status, ) return inner