You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster._core.event_api

from datetime import datetime
from typing import Callable, Mapping, NamedTuple, Optional, Sequence, Union

from typing_extensions import TypeAlias

import dagster._check as check
from dagster._annotations import PublicAttr
from dagster._core.definitions.events import AssetKey, AssetMaterialization, AssetObservation
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._core.events import DagsterEventType
from dagster._core.events.log import EventLogEntry
from dagster._serdes import whitelist_for_serdes

EventHandlerFn: TypeAlias = Callable[[EventLogEntry, str], None]


[docs]class RunShardedEventsCursor(NamedTuple): """Pairs an id-based event log cursor with a timestamp-based run cursor, for improved performance on run-sharded event log storages (e.g. the default SqliteEventLogStorage). For run-sharded storages, the id field is ignored, since they may not be unique across shards. """ id: int run_updated_after: datetime
[docs]@whitelist_for_serdes class EventLogRecord(NamedTuple): """Internal representation of an event record, as stored in a :py:class:`~dagster._core.storage.event_log.EventLogStorage`. Users should not instantiate this class directly. """ storage_id: PublicAttr[int] event_log_entry: PublicAttr[EventLogEntry] @property def run_id(self) -> str: return self.event_log_entry.run_id @property def timestamp(self) -> float: return self.event_log_entry.timestamp @property def asset_key(self) -> Optional[AssetKey]: dagster_event = self.event_log_entry.dagster_event if dagster_event: return dagster_event.asset_key return None @property def partition_key(self) -> Optional[str]: dagster_event = self.event_log_entry.dagster_event if dagster_event: return dagster_event.partition return None @property def asset_materialization(self) -> Optional[AssetMaterialization]: return self.event_log_entry.asset_materialization @property def asset_observation(self) -> Optional[AssetObservation]: return self.event_log_entry.asset_observation
[docs]@whitelist_for_serdes class EventRecordsFilter( NamedTuple( "_EventRecordsFilter", [ ("event_type", DagsterEventType), ("asset_key", Optional[AssetKey]), ("asset_partitions", Optional[Sequence[str]]), ("after_cursor", Optional[Union[int, RunShardedEventsCursor]]), ("before_cursor", Optional[Union[int, RunShardedEventsCursor]]), ("after_timestamp", Optional[float]), ("before_timestamp", Optional[float]), ("storage_ids", Optional[Sequence[int]]), ("tags", Optional[Mapping[str, Union[str, Sequence[str]]]]), ], ) ): """Defines a set of filter fields for fetching a set of event log entries or event log records. Args: event_type (DagsterEventType): Filter argument for dagster event type asset_key (Optional[AssetKey]): Asset key for which to get asset materialization event entries / records. asset_partitions (Optional[List[str]]): Filter parameter such that only asset events with a partition value matching one of the provided values. Only valid when the `asset_key` parameter is provided. after_cursor (Optional[Union[int, RunShardedEventsCursor]]): Filter parameter such that only records with storage_id greater than the provided value are returned. Using a run-sharded events cursor will result in a significant performance gain when run against a SqliteEventLogStorage implementation (which is run-sharded) before_cursor (Optional[Union[int, RunShardedEventsCursor]]): Filter parameter such that records with storage_id less than the provided value are returned. Using a run-sharded events cursor will result in a significant performance gain when run against a SqliteEventLogStorage implementation (which is run-sharded) after_timestamp (Optional[float]): Filter parameter such that only event records for events with timestamp greater than the provided value are returned. before_timestamp (Optional[float]): Filter parameter such that only event records for events with timestamp less than the provided value are returned. """ def __new__( cls, event_type: DagsterEventType, asset_key: Optional[AssetKey] = None, asset_partitions: Optional[Sequence[str]] = None, after_cursor: Optional[Union[int, RunShardedEventsCursor]] = None, before_cursor: Optional[Union[int, RunShardedEventsCursor]] = None, after_timestamp: Optional[float] = None, before_timestamp: Optional[float] = None, storage_ids: Optional[Sequence[int]] = None, tags: Optional[Mapping[str, Union[str, Sequence[str]]]] = None, ): check.opt_sequence_param(asset_partitions, "asset_partitions", of_type=str) check.inst_param(event_type, "event_type", DagsterEventType) tags = check.opt_mapping_param(tags, "tags", key_type=str) if tags and event_type is not DagsterEventType.ASSET_MATERIALIZATION: raise DagsterInvalidInvocationError( "Can only filter by tags for asset materialization events" ) # type-ignores work around mypy type inference bug return super(EventRecordsFilter, cls).__new__( cls, event_type=event_type, asset_key=check.opt_inst_param(asset_key, "asset_key", AssetKey), asset_partitions=asset_partitions, after_cursor=check.opt_inst_param( after_cursor, "after_cursor", (int, RunShardedEventsCursor) ), before_cursor=check.opt_inst_param( before_cursor, "before_cursor", (int, RunShardedEventsCursor) ), after_timestamp=check.opt_float_param(after_timestamp, "after_timestamp"), before_timestamp=check.opt_float_param(before_timestamp, "before_timestamp"), storage_ids=check.opt_nullable_sequence_param(storage_ids, "storage_ids", of_type=int), tags=check.opt_mapping_param(tags, "tags", key_type=str), )