Source code for dagster._core.storage.runs.base

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Mapping, Optional, Sequence, Set, Tuple, Union

from typing_extensions import TypedDict

from dagster._core.events import DagsterEvent
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance
from dagster._core.snap import ExecutionPlanSnapshot, JobSnapshot
from dagster._core.storage.dagster_run import (
    DagsterRun,
    JobBucket,
    RunPartitionData,
    RunRecord,
    RunsFilter,
    TagBucket,
)
from dagster._core.storage.sql import AlembicVersion
from dagster._daemon.types import DaemonHeartbeat
from dagster._utils import PrintFn

from ..daemon_cursor import DaemonCursorStorage

if TYPE_CHECKING:
    from dagster._core.host_representation.origin import ExternalJobOrigin


class RunGroupInfo(TypedDict):
    count: int
    runs: Sequence[DagsterRun]


class RunStorage(ABC, MayHaveInstanceWeakref[T_DagsterInstance], DaemonCursorStorage):
    """Abstract base class for storing pipeline run history.

    Note that run storages using SQL databases as backing stores should implement
    :py:class:`~dagster._core.storage.runs.SqlRunStorage`.

    Users should not directly instantiate concrete subclasses of this class; they are instantiated
    by internal machinery when ``dagster-webserver`` and ``dagster-graphql`` load, based on the
    values in the ``dagster.yaml`` file in ``$DAGSTER_HOME``. Configuration of concrete subclasses
    of this class should be done by setting values in that file.
    """

    @abstractmethod
    def add_run(self, dagster_run: DagsterRun) -> DagsterRun:
        """Add a run to storage.

        If a run already exists with the same ID, raise DagsterRunAlreadyExists.
        If the run's snapshot ID does not exist, raise DagsterSnapshotDoesNotExist.

        Args:
            dagster_run (DagsterRun): The run to add.
        """

    @abstractmethod
    def handle_run_event(self, run_id: str, event: DagsterEvent) -> None:
        """Update run storage in accordance with a run-related DagsterEvent.

        Args:
            run_id (str)
            event (DagsterEvent)
        """

    @abstractmethod
    def get_runs(
        self,
        filters: Optional[RunsFilter] = None,
        cursor: Optional[str] = None,
        limit: Optional[int] = None,
        bucket_by: Optional[Union[JobBucket, TagBucket]] = None,
    ) -> Sequence[DagsterRun]:
        """Return all the runs present in the storage that match the given filters.

        Args:
            filters (Optional[RunsFilter]): The
                :py:class:`~dagster._core.storage.dagster_run.RunsFilter` by which to filter runs.
            cursor (Optional[str]): Starting cursor (run_id) of range of runs.
            limit (Optional[int]): Number of results to get. Defaults to infinite.

        Returns:
            Sequence[DagsterRun]
        """

    @abstractmethod
    def get_run_ids(
        self,
        filters: Optional[RunsFilter] = None,
        cursor: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> Sequence[str]:
        """Return all the run IDs for runs present in the storage that match the given filters.

        Args:
            filters (Optional[RunsFilter]): The
                :py:class:`~dagster._core.storage.dagster_run.RunsFilter` by which to filter runs.
            cursor (Optional[str]): Starting cursor (run_id) of range of runs.
            limit (Optional[int]): Number of results to get. Defaults to infinite.

        Returns:
            Sequence[str]
        """

    @abstractmethod
    def get_runs_count(self, filters: Optional[RunsFilter] = None) -> int:
        """Return the number of runs present in the storage that match the given filters.

        Args:
            filters (Optional[RunsFilter]): The
                :py:class:`~dagster._core.storage.dagster_run.RunsFilter` by which to filter runs.

        Returns:
            int: The number of runs that match the given filters.
        """

    @abstractmethod
    def get_run_group(self, run_id: str) -> Optional[Tuple[str, Sequence[DagsterRun]]]:
        """Get the run group to which a given run belongs.

        Args:
            run_id (str): If the corresponding run is the descendant of some root run (i.e., there
                is a root_run_id on the :py:class:`DagsterRun`), that root run and all of its
                descendants are returned; otherwise, the group will consist only of the given run
                (a run that does not descend from any root is its own root).

        Returns:
            Optional[Tuple[str, Sequence[DagsterRun]]]: If there is a corresponding run group, a
                tuple whose first element is the root_run_id and whose second element is a list of
                all the descendant runs. Otherwise ``None``.
        """
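The cursor accepted by get_runs and get_run_ids is a run_id, which supports paging through large result sets. A minimal sketch, not part of the module source, assuming an already-configured storage instance:

def iterate_all_runs(storage: RunStorage, filters: Optional[RunsFilter] = None):
    # Sketch: page through runs 100 at a time, using the run_id of the last
    # run in each page as the cursor for the next call.
    cursor = None
    while True:
        page = storage.get_runs(filters=filters, cursor=cursor, limit=100)
        if not page:
            return
        yield from page
        cursor = page[-1].run_id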
""" @abstractmethod def get_run_records( self, filters: Optional[RunsFilter] = None, limit: Optional[int] = None, order_by: Optional[str] = None, ascending: bool = False, cursor: Optional[str] = None, bucket_by: Optional[Union[JobBucket, TagBucket]] = None, ) -> Sequence[RunRecord]: """Return a list of run records stored in the run storage, sorted by the given column in given order. Args: filters (Optional[RunsFilter]): the filter by which to filter runs. limit (Optional[int]): Number of results to get. Defaults to infinite. order_by (Optional[str]): Name of the column to sort by. Defaults to id. ascending (Optional[bool]): Sort the result in ascending order if True, descending otherwise. Defaults to descending. Returns: List[RunRecord]: List of run records stored in the run storage. """ @abstractmethod def get_run_tags( self, tag_keys: Optional[Sequence[str]] = None, value_prefix: Optional[str] = None, limit: Optional[int] = None, ) -> Sequence[Tuple[str, Set[str]]]: """Get a list of tag keys and the values that have been associated with them. Args: tag_keys (Optional[Sequence[str]]): tag keys to filter by. Returns: List[Tuple[str, Set[str]]] """ @abstractmethod def get_run_tag_keys(self) -> Sequence[str]: """Get a list of tag keys. Returns: List[str] """ @abstractmethod def add_run_tags(self, run_id: str, new_tags: Mapping[str, str]) -> None: """Add additional tags for a pipeline run. Args: run_id (str) new_tags (Dict[string, string]) """ @abstractmethod def has_run(self, run_id: str) -> bool: """Check if the storage contains a run. Args: run_id (str): The id of the run Returns: bool """ def add_snapshot( self, snapshot: Union[JobSnapshot, ExecutionPlanSnapshot], snapshot_id: Optional[str] = None, ) -> None: """Add a snapshot to the storage. Args: snapshot (Union[PipelineSnapshot, ExecutionPlanSnapshot]) snapshot_id (Optional[str]): [Internal] The id of the snapshot. If not provided, the snapshot id will be generated from a hash of the snapshot. This should only be used in debugging, where we might want to import a historical run whose snapshots were calculated using a different hash function than the current code. """ if isinstance(snapshot, JobSnapshot): self.add_job_snapshot(snapshot, snapshot_id) else: self.add_execution_plan_snapshot(snapshot, snapshot_id) def has_snapshot(self, snapshot_id: str): return self.has_job_snapshot(snapshot_id) or self.has_execution_plan_snapshot(snapshot_id) @abstractmethod def has_job_snapshot(self, job_snapshot_id: str) -> bool: """Check to see if storage contains a pipeline snapshot. Args: pipeline_snapshot_id (str): The id of the run. Returns: bool """ @abstractmethod def add_job_snapshot(self, job_snapshot: JobSnapshot, snapshot_id: Optional[str] = None) -> str: """Add a pipeline snapshot to the run store. Pipeline snapshots are content-addressable, meaning that the ID for a snapshot is a hash based on the body of the snapshot. This function returns that snapshot ID. Args: job_snapshot (PipelineSnapshot) snapshot_id (Optional[str]): [Internal] The id of the snapshot. If not provided, the snapshot id will be generated from a hash of the snapshot. This should only be used in debugging, where we might want to import a historical run whose snapshots were calculated using a different hash function than the current code. Return: str: The job_snapshot_id """ @abstractmethod def get_job_snapshot(self, job_snapshot_id: str) -> JobSnapshot: """Fetch a snapshot by ID. 
    @abstractmethod
    def has_execution_plan_snapshot(self, execution_plan_snapshot_id: str) -> bool:
        """Check to see if storage contains an execution plan snapshot.

        Args:
            execution_plan_snapshot_id (str): The id of the execution plan.

        Returns:
            bool
        """

    @abstractmethod
    def add_execution_plan_snapshot(
        self, execution_plan_snapshot: ExecutionPlanSnapshot, snapshot_id: Optional[str] = None
    ) -> str:
        """Add an execution plan snapshot to the run store.

        Execution plan snapshots are content-addressable, meaning that the ID for a snapshot is a
        hash based on the body of the snapshot. This function returns that snapshot ID.

        Args:
            execution_plan_snapshot (ExecutionPlanSnapshot)
            snapshot_id (Optional[str]): [Internal] The id of the snapshot. If not provided, the
                snapshot id will be generated from a hash of the snapshot. This should only be
                used in debugging, where we might want to import a historical run whose snapshots
                were calculated using a different hash function than the current code.

        Returns:
            str: The execution_plan_snapshot_id.
        """

    @abstractmethod
    def get_execution_plan_snapshot(
        self, execution_plan_snapshot_id: str
    ) -> ExecutionPlanSnapshot:
        """Fetch a snapshot by ID.

        Args:
            execution_plan_snapshot_id (str)

        Returns:
            ExecutionPlanSnapshot
        """

    @abstractmethod
    def wipe(self) -> None:
        """Clears the run storage."""

    @abstractmethod
    def delete_run(self, run_id: str) -> None:
        """Remove a run from storage."""

    @property
    def supports_bucket_queries(self) -> bool:
        return False

    @abstractmethod
    def get_run_partition_data(self, runs_filter: RunsFilter) -> Sequence[RunPartitionData]:
        """Get run partition data for a given partitioned job."""

    def migrate(self, print_fn: Optional[PrintFn] = None, force_rebuild_all: bool = False) -> None:
        """Call this method to run any required data migrations."""

    def optimize(
        self, print_fn: Optional[PrintFn] = None, force_rebuild_all: bool = False
    ) -> None:
        """Call this method to run any optional data migrations for optimized reads."""

    def dispose(self) -> None:
        """Explicit lifecycle management."""

    def optimize_for_webserver(self, statement_timeout: int, pool_recycle: int) -> None:
        """Allows for optimizing database connection / use in the context of a long-lived
        webserver process.
        """
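The migration and lifecycle hooks above are no-ops on this base class and are overridden by concrete storages. A typical sequence, sketched under the assumption of a configured storage (the built-in print is a valid PrintFn):

storage.migrate(print_fn=print)   # run any required data migrations
storage.optimize(print_fn=print)  # optional migrations for optimized reads
storage.dispose()                 # release resources when finished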
    # Daemon Heartbeat Storage
    #
    # Holds heartbeats from the Dagster daemon so that other system components can alert when it's
    # not alive.
    # This is temporarily placed along with run storage to avoid adding a new instance concept. It
    # should be split out once all metadata storages are configured together.

    @abstractmethod
    def add_daemon_heartbeat(self, daemon_heartbeat: DaemonHeartbeat) -> None:
        """Called on a regular interval by the daemon."""

    @abstractmethod
    def get_daemon_heartbeats(self) -> Mapping[str, DaemonHeartbeat]:
        """Latest heartbeats of all daemon types."""

    @abstractmethod
    def wipe_daemon_heartbeats(self) -> None:
        """Wipe all daemon heartbeats."""

    # Backfill storage
    @abstractmethod
    def get_backfills(
        self,
        status: Optional[BulkActionStatus] = None,
        cursor: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> Sequence[PartitionBackfill]:
        """Get a list of partition backfills."""

    @abstractmethod
    def get_backfill(self, backfill_id: str) -> Optional[PartitionBackfill]:
        """Get the partition backfill of the given backfill id."""

    @abstractmethod
    def add_backfill(self, partition_backfill: PartitionBackfill) -> None:
        """Add partition backfill to run storage."""

    @abstractmethod
    def update_backfill(self, partition_backfill: PartitionBackfill) -> None:
        """Update a partition backfill in run storage."""

    def alembic_version(self) -> Optional[AlembicVersion]:
        return None

    @abstractmethod
    def replace_job_origin(self, run: "DagsterRun", job_origin: "ExternalJobOrigin") -> None:
        ...
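Since get_daemon_heartbeats exists so that other components can alert when the daemon is not alive, a liveness check can be built directly on it. A sketch, not part of the module source, assuming DaemonHeartbeat.timestamp is a Unix timestamp as in this release; the 120-second threshold is illustrative:

import time

def stale_daemons(storage: RunStorage, max_age_seconds: float = 120.0) -> Sequence[str]:
    # Sketch: report daemon types whose latest heartbeat is older than the
    # given threshold.
    now = time.time()
    return [
        daemon_type
        for daemon_type, heartbeat in storage.get_daemon_heartbeats().items()
        if now - heartbeat.timestamp > max_age_seconds
    ]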