You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster._core.storage.schedules.base

import abc
from typing import Mapping, Optional, Sequence, Set

from dagster import AssetKey
from dagster._core.definitions.auto_materialize_condition import AutoMaterializeAssetEvaluation
from dagster._core.definitions.run_request import InstigatorType
from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance
from dagster._core.scheduler.instigation import (
    AutoMaterializeAssetEvaluationRecord,
    InstigatorState,
    InstigatorStatus,
    InstigatorTick,
    TickData,
    TickStatus,
)
from dagster._core.storage.sql import AlembicVersion
from dagster._utils import PrintFn


class ScheduleStorage(abc.ABC, MayHaveInstanceWeakref[T_DagsterInstance]):
    """Abstract class for managing persistence of scheduler artifacts.

    Concrete storages must implement every method decorated with
    ``abc.abstractmethod``. The remaining methods and properties provide
    defaults that subclasses may override.
    """

    @abc.abstractmethod
    def wipe(self) -> None:
        """Delete all schedules from storage."""

    @abc.abstractmethod
    def all_instigator_state(
        self,
        repository_origin_id: Optional[str] = None,
        repository_selector_id: Optional[str] = None,
        instigator_type: Optional[InstigatorType] = None,
        instigator_statuses: Optional[Set[InstigatorStatus]] = None,
    ) -> Sequence[InstigatorState]:
        """Return all InstigationStates present in storage.

        Args:
            repository_origin_id (Optional[str]): The ExternalRepository target id to scope
                results to
            repository_selector_id (Optional[str]): The repository selector id to scope
                results to
            instigator_type (Optional[InstigatorType]): The InstigatorType to scope results to
            instigator_statuses (Optional[Set[InstigatorStatus]]): The InstigatorStatuses to
                scope results to
        """

    @abc.abstractmethod
    def get_instigator_state(self, origin_id: str, selector_id: str) -> Optional[InstigatorState]:
        """Return the instigator state for the given id.

        Args:
            origin_id (str): The unique instigator identifier
            selector_id (str): The logical instigator identifier
        """

    @abc.abstractmethod
    def add_instigator_state(self, state: InstigatorState) -> InstigatorState:
        """Add an instigator state to storage.

        Args:
            state (InstigatorState): The state to add
        """

    @abc.abstractmethod
    def update_instigator_state(self, state: InstigatorState) -> InstigatorState:
        """Update an instigator state in storage.

        Args:
            state (InstigatorState): The state to update
        """

    @abc.abstractmethod
    def delete_instigator_state(self, origin_id: str, selector_id: str) -> None:
        """Delete a state in storage.

        Args:
            origin_id (str): The id of the instigator target to delete
            selector_id (str): The logical instigator identifier
        """

    @property
    def supports_batch_queries(self) -> bool:
        """Whether this storage supports batch queries; the base class returns False.

        NOTE(review): presumably callers check this before calling
        ``get_batch_ticks`` -- confirm against call sites.
        """
        return False

    def get_batch_ticks(
        self,
        selector_ids: Sequence[str],
        limit: Optional[int] = None,
        statuses: Optional[Sequence[TickStatus]] = None,
    ) -> Mapping[str, Sequence[InstigatorTick]]:
        """Get ticks for several instigators in a single call.

        The base implementation always raises ``NotImplementedError``.

        Args:
            selector_ids (Sequence[str]): The logical instigator identifiers to query
            limit (Optional[int]): Presumably the maximum number of ticks per selector
                (TODO confirm)
            statuses (Optional[Sequence[TickStatus]]): The tick statuses to scope results to

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_ticks(
        self,
        origin_id: str,
        selector_id: str,
        before: Optional[float] = None,
        after: Optional[float] = None,
        limit: Optional[int] = None,
        statuses: Optional[Sequence[TickStatus]] = None,
    ) -> Sequence[InstigatorTick]:
        """Get the ticks for a given instigator.

        Args:
            origin_id (str): The id of the instigator target
            selector_id (str): The logical instigator identifier
            before (Optional[float]): Presumably an upper timestamp bound on returned ticks
                (TODO confirm)
            after (Optional[float]): Presumably a lower timestamp bound on returned ticks
                (TODO confirm)
            limit (Optional[int]): Presumably the maximum number of ticks to return
                (TODO confirm)
            statuses (Optional[Sequence[TickStatus]]): The tick statuses to scope results to
        """

    @abc.abstractmethod
    def create_tick(self, tick_data: TickData) -> InstigatorTick:
        """Add a tick to storage.

        Args:
            tick_data (TickData): The tick to add
        """

    @abc.abstractmethod
    def update_tick(self, tick: InstigatorTick) -> InstigatorTick:
        """Update a tick already in storage.

        Args:
            tick (InstigatorTick): The tick to update
        """

    @abc.abstractmethod
    def purge_ticks(
        self,
        origin_id: str,
        selector_id: str,
        before: float,
        tick_statuses: Optional[Sequence[TickStatus]] = None,
    ) -> None:
        """Wipe ticks for an instigator for a certain status and timestamp.

        Args:
            origin_id (str): The id of the instigator target to delete
            selector_id (str): The logical instigator identifier
            before (float): All ticks before this timestamp will get purged
            tick_statuses (Optional[Sequence[TickStatus]]): The tick statuses to wipe
        """

    @property
    def supports_auto_materialize_asset_evaluations(self) -> bool:
        """Whether this storage supports auto-materialize asset evaluations.

        The base class returns True.
        """
        return True

    @abc.abstractmethod
    def add_auto_materialize_asset_evaluations(
        self,
        evaluation_id: int,
        asset_evaluations: Sequence[AutoMaterializeAssetEvaluation],
    ) -> None:
        """Add asset policy evaluations to storage.

        Args:
            evaluation_id (int): The id of the evaluation the records belong to
            asset_evaluations (Sequence[AutoMaterializeAssetEvaluation]): The evaluations
                to add
        """

    @abc.abstractmethod
    def get_auto_materialize_asset_evaluations(
        self, asset_key: AssetKey, limit: int, cursor: Optional[int] = None
    ) -> Sequence[AutoMaterializeAssetEvaluationRecord]:
        """Get the policy evaluations for a given asset.

        Args:
            asset_key (AssetKey): The asset key to query
            limit (int): The maximum number of evaluations to return
            cursor (Optional[int]): The cursor to paginate from
        """

    @abc.abstractmethod
    def purge_asset_evaluations(self, before: float) -> None:
        """Wipe evaluations before a certain timestamp.

        Args:
            before (float): All evaluations before this timestamp will get purged
        """

    @abc.abstractmethod
    def upgrade(self) -> None:
        """Perform any needed migrations."""

    def migrate(self, print_fn: Optional[PrintFn] = None, force_rebuild_all: bool = False) -> None:
        """Call this method to run any required data migrations.

        The base implementation does nothing; both arguments are unused here.
        """

    def optimize(self, print_fn: Optional[PrintFn] = None, force_rebuild_all: bool = False) -> None:
        """Call this method to run any optional data migrations for optimized reads.

        The base implementation does nothing; both arguments are unused here.
        """

    def optimize_for_webserver(self, statement_timeout: int, pool_recycle: int) -> None:
        """Allows for optimizing database connection / use in the context of a long lived
        webserver process.

        The base implementation does nothing; both arguments are unused here.
        """

    def alembic_version(self) -> Optional[AlembicVersion]:
        """Return the storage's AlembicVersion, if any; the base class returns None."""
        return None

    def dispose(self) -> None:
        """Explicit lifecycle management.

        The base implementation does nothing.
        """