Source code for dagster._core.storage.event_log.base

import base64
from abc import ABC, abstractmethod
from enum import Enum
from typing import (
    TYPE_CHECKING,
    Iterable,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
)

import dagster._check as check
from dagster._core.assets import AssetDetails
from dagster._core.definitions.events import AssetKey
from dagster._core.event_api import EventHandlerFn, EventLogRecord, EventRecordsFilter
from dagster._core.events import DagsterEventType
from dagster._core.execution.stats import (
    RunStepKeyStatsSnapshot,
    build_run_stats_from_events,
    build_run_step_stats_from_events,
)
from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance
from dagster._core.storage.dagster_run import DagsterRunStatsSnapshot
from dagster._core.storage.sql import AlembicVersion
from dagster._seven import json
from dagster._utils import PrintFn
from dagster._utils.concurrency import ConcurrencyClaimStatus, ConcurrencyKeyInfo

if TYPE_CHECKING:
    from dagster._core.events.log import EventLogEntry
    from dagster._core.storage.partition_status_cache import AssetStatusCacheValue


class EventLogConnection(NamedTuple):
    records: Sequence[EventLogRecord]
    cursor: str
    has_more: bool


class EventLogCursorType(Enum):
    OFFSET = "OFFSET"
    STORAGE_ID = "STORAGE_ID"


class EventLogCursor(NamedTuple):
    """Representation of an event record cursor, keeping track of the log query state."""

    cursor_type: EventLogCursorType
    value: int

    def is_offset_cursor(self) -> bool:
        return self.cursor_type == EventLogCursorType.OFFSET

    def is_id_cursor(self) -> bool:
        return self.cursor_type == EventLogCursorType.STORAGE_ID

    def offset(self) -> int:
        check.invariant(self.cursor_type == EventLogCursorType.OFFSET)
        return max(0, int(self.value))

    def storage_id(self) -> int:
        check.invariant(self.cursor_type == EventLogCursorType.STORAGE_ID)
        return int(self.value)

    def __str__(self) -> str:
        return self.to_string()

    def to_string(self) -> str:
        raw = json.dumps({"type": self.cursor_type.value, "value": self.value})
        return base64.b64encode(bytes(raw, encoding="utf-8")).decode("utf-8")

    @staticmethod
    def parse(cursor_str: str) -> "EventLogCursor":
        raw = json.loads(base64.b64decode(cursor_str).decode("utf-8"))
        return EventLogCursor(EventLogCursorType(raw["type"]), raw["value"])

    @staticmethod
    def from_offset(offset: int) -> "EventLogCursor":
        return EventLogCursor(EventLogCursorType.OFFSET, offset)

    @staticmethod
    def from_storage_id(storage_id: int) -> "EventLogCursor":
        return EventLogCursor(EventLogCursorType.STORAGE_ID, storage_id)

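# Example (illustrative sketch, not part of the module): cursors round-trip
# through an opaque base64-encoded JSON token, so callers can treat them as
# opaque strings.
#
#     cursor = EventLogCursor.from_storage_id(42)
#     token = cursor.to_string()
#     assert EventLogCursor.parse(token).storage_id() == 42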

class AssetEntry(
    NamedTuple(
        "_AssetEntry",
        [
            ("asset_key", AssetKey),
            ("last_materialization_record", Optional[EventLogRecord]),
            ("last_run_id", Optional[str]),
            ("asset_details", Optional[AssetDetails]),
            ("cached_status", Optional["AssetStatusCacheValue"]),
        ],
    )
):
    def __new__(
        cls,
        asset_key: AssetKey,
        last_materialization_record: Optional[EventLogRecord] = None,
        last_run_id: Optional[str] = None,
        asset_details: Optional[AssetDetails] = None,
        cached_status: Optional["AssetStatusCacheValue"] = None,
    ):
        from dagster._core.storage.partition_status_cache import AssetStatusCacheValue

        return super(AssetEntry, cls).__new__(
            cls,
            asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
            last_materialization_record=check.opt_inst_param(
                last_materialization_record, "last_materialization_record", EventLogRecord
            ),
            last_run_id=check.opt_str_param(last_run_id, "last_run_id"),
            asset_details=check.opt_inst_param(asset_details, "asset_details", AssetDetails),
            cached_status=check.opt_inst_param(
                cached_status, "cached_status", AssetStatusCacheValue
            ),
        )

    @property
    def last_materialization(self) -> Optional["EventLogEntry"]:
        if self.last_materialization_record is None:
            return None
        return self.last_materialization_record.event_log_entry

    @property
    def last_materialization_storage_id(self) -> Optional[int]:
        if self.last_materialization_record is None:
            return None
        return self.last_materialization_record.storage_id

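# Example (illustrative sketch): the convenience properties below unwrap the
# optional materialization record, sparing callers repeated None checks.
#
#     entry = AssetEntry(asset_key=AssetKey(["my_asset"]))
#     assert entry.last_materialization is None
#     assert entry.last_materialization_storage_id is None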

class AssetRecord(NamedTuple):
    """Internal representation of an asset record, as stored in a
    :py:class:`~dagster._core.storage.event_log.EventLogStorage`.

    Users should not invoke this class directly.
    """

    storage_id: int
    asset_entry: AssetEntry
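
# Example (illustrative sketch, assuming a concrete `storage` instance): an
# AssetRecord pairs a monotonically increasing storage id with the asset's
# indexed state.
#
#     for record in storage.get_asset_records():
#         print(record.storage_id, record.asset_entry.asset_key)
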
class EventLogStorage(ABC, MayHaveInstanceWeakref[T_DagsterInstance]):
    """Abstract base class for storing structured event logs from pipeline runs.

    Note that event log storages using SQL databases as backing stores should implement
    :py:class:`~dagster._core.storage.event_log.SqlEventLogStorage`.

    Users should not directly instantiate concrete subclasses of this class; they are instantiated
    by internal machinery when ``dagster-webserver`` and ``dagster-graphql`` load, based on the
    values in the ``dagster.yaml`` file in ``$DAGSTER_HOME``. Configuration of concrete subclasses
    of this class should be done by setting values in that file.
    """

    def get_logs_for_run(
        self,
        run_id: str,
        cursor: Optional[Union[str, int]] = None,
        of_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None,
        limit: Optional[int] = None,
        ascending: bool = True,
    ) -> Sequence["EventLogEntry"]:
        """Get all of the logs corresponding to a run.

        Args:
            run_id (str): The id of the run for which to fetch logs.
            cursor (Optional[Union[str, int]]): Cursor value to track paginated queries. Legacy
                support for integer offset cursors.
            of_type (Optional[DagsterEventType]): the dagster event type to filter the logs.
            limit (Optional[int]): Max number of records to return.
        """
        if isinstance(cursor, int):
            cursor = EventLogCursor.from_offset(cursor + 1).to_string()
        records = self.get_records_for_run(
            run_id, cursor, of_type, limit, ascending=ascending
        ).records
        return [record.event_log_entry for record in records]

    @abstractmethod
    def get_records_for_run(
        self,
        run_id: str,
        cursor: Optional[str] = None,
        of_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None,
        limit: Optional[int] = None,
        ascending: bool = True,
    ) -> EventLogConnection:
        """Get all of the event log records corresponding to a run.

        Args:
            run_id (str): The id of the run for which to fetch logs.
            cursor (Optional[str]): Cursor value to track paginated queries.
            of_type (Optional[DagsterEventType]): the dagster event type to filter the logs.
            limit (Optional[int]): Max number of records to return.
        """

    def get_stats_for_run(self, run_id: str) -> DagsterRunStatsSnapshot:
        """Get a summary of events that have occurred in a run."""
        return build_run_stats_from_events(run_id, self.get_logs_for_run(run_id))

    def get_step_stats_for_run(
        self, run_id: str, step_keys: Optional[Sequence[str]] = None
    ) -> Sequence[RunStepKeyStatsSnapshot]:
        """Get per-step stats for a pipeline run."""
        logs = self.get_logs_for_run(run_id)
        if step_keys:
            logs = [
                event
                for event in logs
                if event.is_dagster_event and event.get_dagster_event().step_key in step_keys
            ]
        return build_run_step_stats_from_events(run_id, logs)

    @abstractmethod
    def store_event(self, event: "EventLogEntry") -> None:
        """Store an event corresponding to a pipeline run.

        Args:
            event (EventLogEntry): The event to store.
        """

    @abstractmethod
    def delete_events(self, run_id: str) -> None:
        """Remove events for a given run id."""

    @abstractmethod
    def upgrade(self) -> None:
        """This method should perform any schema migrations necessary to bring an
        out-of-date instance of the storage up to date.
""" @abstractmethod def reindex_events(self, print_fn: Optional[PrintFn] = None, force: bool = False) -> None: """Call this method to run any data migrations across the event_log tables.""" @abstractmethod def reindex_assets(self, print_fn: Optional[PrintFn] = None, force: bool = False) -> None: """Call this method to run any data migrations across the asset tables.""" @abstractmethod def wipe(self) -> None: """Clear the log storage.""" @abstractmethod def watch(self, run_id: str, cursor: Optional[str], callback: EventHandlerFn) -> None: """Call this method to start watching.""" @abstractmethod def end_watch(self, run_id: str, handler: EventHandlerFn) -> None: """Call this method to stop watching.""" @property @abstractmethod def is_persistent(self) -> bool: """bool: Whether the storage is persistent.""" def dispose(self) -> None: """Explicit lifecycle management.""" def optimize_for_webserver(self, statement_timeout: int, pool_recycle: int) -> None: """Allows for optimizing database connection / use in the context of a long lived webserver process.""" @abstractmethod def get_event_records( self, event_records_filter: EventRecordsFilter, limit: Optional[int] = None, ascending: bool = False, ) -> Sequence[EventLogRecord]: pass def supports_event_consumer_queries(self) -> bool: return False def get_logs_for_all_runs_by_log_id( self, after_cursor: int = -1, dagster_event_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None, limit: Optional[int] = None, ) -> Mapping[int, "EventLogEntry"]: """Get event records across all runs. Only supported for non sharded sql storage.""" raise NotImplementedError() def get_maximum_record_id(self) -> Optional[int]: """Get the current greatest record id in the event log. Only supported for non sharded sql storage.""" raise NotImplementedError() @abstractmethod def can_cache_asset_status_data(self) -> bool: pass @abstractmethod def wipe_asset_cached_status(self, asset_key: AssetKey) -> None: pass @abstractmethod def get_asset_records( self, asset_keys: Optional[Sequence[AssetKey]] = None ) -> Sequence[AssetRecord]: pass @abstractmethod def has_asset_key(self, asset_key: AssetKey) -> bool: pass @abstractmethod def all_asset_keys(self) -> Sequence[AssetKey]: pass @abstractmethod def update_asset_cached_status_data( self, asset_key: AssetKey, cache_values: "AssetStatusCacheValue" ) -> None: pass def get_asset_keys( self, prefix: Optional[Sequence[str]] = None, limit: Optional[int] = None, cursor: Optional[str] = None, ) -> Sequence[AssetKey]: # base implementation of get_asset_keys, using the existing `all_asset_keys` and doing the # filtering in-memory asset_keys = sorted(self.all_asset_keys(), key=str) if prefix: asset_keys = [ asset_key for asset_key in asset_keys if asset_key.path[: len(prefix)] == prefix ] if cursor: cursor_asset = AssetKey.from_db_string(cursor) if cursor_asset and cursor_asset in asset_keys: idx = asset_keys.index(cursor_asset) asset_keys = asset_keys[idx + 1 :] if limit: asset_keys = asset_keys[:limit] return asset_keys @abstractmethod def get_latest_materialization_events( self, asset_keys: Iterable[AssetKey] ) -> Mapping[AssetKey, Optional["EventLogEntry"]]: pass def supports_add_asset_event_tags(self) -> bool: return False def add_asset_event_tags( self, event_id: int, event_timestamp: float, asset_key: AssetKey, new_tags: Mapping[str, str], ) -> None: raise NotImplementedError() @abstractmethod def get_event_tags_for_asset( self, asset_key: AssetKey, filter_tags: Optional[Mapping[str, str]] = None, filter_event_id: 
    ) -> Sequence[Mapping[str, str]]:
        pass

    @abstractmethod
    def get_asset_run_ids(self, asset_key: AssetKey) -> Sequence[str]:
        pass

    @abstractmethod
    def wipe_asset(self, asset_key: AssetKey) -> None:
        """Remove asset index history from event log for given asset_key."""

    @abstractmethod
    def get_materialization_count_by_partition(
        self, asset_keys: Sequence[AssetKey], after_cursor: Optional[int] = None
    ) -> Mapping[AssetKey, Mapping[str, int]]:
        pass

    @abstractmethod
    def get_latest_storage_id_by_partition(
        self, asset_key: AssetKey, event_type: DagsterEventType
    ) -> Mapping[str, int]:
        pass

    @abstractmethod
    def get_latest_tags_by_partition(
        self,
        asset_key: AssetKey,
        event_type: DagsterEventType,
        tag_keys: Sequence[str],
        asset_partitions: Optional[Sequence[str]] = None,
        before_cursor: Optional[int] = None,
        after_cursor: Optional[int] = None,
    ) -> Mapping[str, Mapping[str, str]]:
        pass

    @abstractmethod
    def get_latest_asset_partition_materialization_attempts_without_materializations(
        self, asset_key: AssetKey
    ) -> Mapping[str, Tuple[str, int]]:
        pass

    @abstractmethod
    def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
        """Get the list of partition keys for a dynamic partitions definition."""
        raise NotImplementedError()

    @abstractmethod
    def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool:
        """Check if a dynamic partition exists."""
        raise NotImplementedError()

    @abstractmethod
    def add_dynamic_partitions(
        self, partitions_def_name: str, partition_keys: Sequence[str]
    ) -> None:
        """Add a partition for the specified dynamic partitions definition."""
        raise NotImplementedError()

    @abstractmethod
    def delete_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> None:
        """Delete a partition for the specified dynamic partitions definition."""
        raise NotImplementedError()

    def alembic_version(self) -> Optional[AlembicVersion]:
        return None

    @property
    def is_run_sharded(self) -> bool:
        """Indicates whether the EventLogStorage is run-sharded."""
        return False

    @property
    def supports_global_concurrency_limits(self) -> bool:
        """Indicates whether the EventLogStorage supports global concurrency limits."""
        return False

    @abstractmethod
    def set_concurrency_slots(self, concurrency_key: str, num: int) -> None:
        """Allocate concurrency slots for the given concurrency key."""
        raise NotImplementedError()

    @abstractmethod
    def get_concurrency_keys(self) -> Set[str]:
        """Get the set of concurrency-limited keys."""
        raise NotImplementedError()

    @abstractmethod
    def get_concurrency_info(self, concurrency_key: str) -> ConcurrencyKeyInfo:
        """Get concurrency info for key."""
        raise NotImplementedError()

    @abstractmethod
    def claim_concurrency_slot(
        self, concurrency_key: str, run_id: str, step_key: str, priority: Optional[int] = None
    ) -> ConcurrencyClaimStatus:
        """Claim a concurrency slot for the step."""
        raise NotImplementedError()

    @abstractmethod
    def check_concurrency_claim(
        self, concurrency_key: str, run_id: str, step_key: str
    ) -> ConcurrencyClaimStatus:
        """Check the status of a concurrency claim for the step."""
        raise NotImplementedError()

    @abstractmethod
    def get_concurrency_run_ids(self) -> Set[str]:
        """Get the set of run_ids that are occupying or waiting for a concurrency key slot."""
        raise NotImplementedError()

    @abstractmethod
    def free_concurrency_slots_for_run(self, run_id: str) -> None:
        """Frees concurrency slots for a given run."""
        raise NotImplementedError()

    @abstractmethod
    def free_concurrency_slot_for_step(self, run_id: str, step_key: str) -> None:
        """Frees concurrency slots for a given run/step."""
        raise NotImplementedError()