Source code for dagster._core.events

"""Structured representations of system events."""
import logging
import os
import sys
from enum import Enum
from typing import (
    TYPE_CHECKING,
    AbstractSet,
    Any,
    Dict,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Tuple,
    Union,
    cast,
)

import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions import (
    AssetKey,
    AssetMaterialization,
    AssetObservation,
    ExpectationResult,
    HookDefinition,
    NodeHandle,
)
from dagster._core.definitions.events import AssetLineageInfo, ObjectStoreOperationType
from dagster._core.definitions.metadata import (
    MetadataFieldSerializer,
    MetadataValue,
    RawMetadataValue,
    normalize_metadata,
)
from dagster._core.errors import HookExecutionError
from dagster._core.execution.context.system import IPlanContext, IStepContext, StepExecutionContext
from dagster._core.execution.plan.handle import ResolvedFromDynamicStepHandle, StepHandle
from dagster._core.execution.plan.inputs import StepInputData
from dagster._core.execution.plan.objects import StepFailureData, StepRetryData, StepSuccessData
from dagster._core.execution.plan.outputs import StepOutputData
from dagster._core.log_manager import DagsterLogManager
from dagster._core.storage.captured_log_manager import CapturedLogContext
from dagster._core.storage.dagster_run import DagsterRunStatus
from dagster._serdes import (
    NamedTupleSerializer,
    whitelist_for_serdes,
)
from dagster._serdes.serdes import UnpackContext
from dagster._utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info
from dagster._utils.timing import format_duration

if TYPE_CHECKING:
    from dagster._core.definitions.events import ObjectStoreOperation
    from dagster._core.execution.plan.plan import ExecutionPlan
    from dagster._core.execution.plan.step import StepKind

EventSpecificData = Union[
    StepOutputData,
    StepFailureData,
    StepSuccessData,
    "StepMaterializationData",
    "StepExpectationResultData",
    StepInputData,
    "EngineEventData",
    "HookErroredData",
    StepRetryData,
    "JobFailureData",
    "JobCanceledData",
    "ObjectStoreOperationResultData",
    "HandledOutputData",
    "LoadedInputData",
    "ComputeLogsCaptureData",
    "AssetObservationData",
    "AssetMaterializationPlannedData",
]
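
# Illustrative sketch (not part of the dagster source): each DagsterEventType is
# paired with at most one EventSpecificData payload, and the pairing is enforced
# by _validate_event_specific_data below. For example, a STEP_OUTPUT event must
# carry a StepOutputData payload (users should not construct events directly;
# `step_output_data` here is a hypothetical StepOutputData value):
#
#     event = DagsterEvent(
#         event_type_value=DagsterEventType.STEP_OUTPUT.value,
#         job_name="my_job",
#         event_specific_data=step_output_data,
#     )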


class DagsterEventType(str, Enum):
    """The types of events that may be yielded by op and job execution."""

    STEP_OUTPUT = "STEP_OUTPUT"
    STEP_INPUT = "STEP_INPUT"
    STEP_FAILURE = "STEP_FAILURE"
    STEP_START = "STEP_START"
    STEP_SUCCESS = "STEP_SUCCESS"
    STEP_SKIPPED = "STEP_SKIPPED"

    # The process carrying out step execution is starting/started. Shown as a
    # marker start/end in the Dagster UI.
    STEP_WORKER_STARTING = "STEP_WORKER_STARTING"
    STEP_WORKER_STARTED = "STEP_WORKER_STARTED"

    # Resource initialization for execution has started/succeeded/failed. Shown
    # as a marker start/end in the Dagster UI.
    RESOURCE_INIT_STARTED = "RESOURCE_INIT_STARTED"
    RESOURCE_INIT_SUCCESS = "RESOURCE_INIT_SUCCESS"
    RESOURCE_INIT_FAILURE = "RESOURCE_INIT_FAILURE"

    STEP_UP_FOR_RETRY = "STEP_UP_FOR_RETRY"  # "failed" but want to retry
    STEP_RESTARTED = "STEP_RESTARTED"

    ASSET_MATERIALIZATION = "ASSET_MATERIALIZATION"
    ASSET_MATERIALIZATION_PLANNED = "ASSET_MATERIALIZATION_PLANNED"
    ASSET_OBSERVATION = "ASSET_OBSERVATION"
    STEP_EXPECTATION_RESULT = "STEP_EXPECTATION_RESULT"

    # We want to display RUN_* events in the Dagster UI and in our LogManager output, but in order
    # to support backcompat for our storage layer, we need to keep the persisted values as strings
    # of the form "PIPELINE_*". We may have user code that passes DagsterEventType enum values
    # into storage APIs (like get_event_records, which takes in an EventRecordsFilter).
    RUN_ENQUEUED = "PIPELINE_ENQUEUED"
    RUN_DEQUEUED = "PIPELINE_DEQUEUED"
    RUN_STARTING = "PIPELINE_STARTING"  # Launch is happening, execution hasn't started yet
    RUN_START = "PIPELINE_START"  # Execution has started
    RUN_SUCCESS = "PIPELINE_SUCCESS"
    RUN_FAILURE = "PIPELINE_FAILURE"
    RUN_CANCELING = "PIPELINE_CANCELING"
    RUN_CANCELED = "PIPELINE_CANCELED"

    # Keep these legacy enum values around, to maintain back-compatibility for user code that
    # might be using these constants to filter event records.
    PIPELINE_ENQUEUED = RUN_ENQUEUED
    PIPELINE_DEQUEUED = RUN_DEQUEUED
    PIPELINE_STARTING = RUN_STARTING
    PIPELINE_START = RUN_START
    PIPELINE_SUCCESS = RUN_SUCCESS
    PIPELINE_FAILURE = RUN_FAILURE
    PIPELINE_CANCELING = RUN_CANCELING
    PIPELINE_CANCELED = RUN_CANCELED

    OBJECT_STORE_OPERATION = "OBJECT_STORE_OPERATION"
    ASSET_STORE_OPERATION = "ASSET_STORE_OPERATION"
    LOADED_INPUT = "LOADED_INPUT"
    HANDLED_OUTPUT = "HANDLED_OUTPUT"

    ENGINE_EVENT = "ENGINE_EVENT"

    HOOK_COMPLETED = "HOOK_COMPLETED"
    HOOK_ERRORED = "HOOK_ERRORED"
    HOOK_SKIPPED = "HOOK_SKIPPED"

    ALERT_START = "ALERT_START"
    ALERT_SUCCESS = "ALERT_SUCCESS"
    ALERT_FAILURE = "ALERT_FAILURE"

    LOGS_CAPTURED = "LOGS_CAPTURED"
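
# Illustrative sketch (not part of the dagster source): because the RUN_* members
# persist their legacy "PIPELINE_*" string values, the legacy aliases are the same
# enum members, and either spelling may be passed to storage APIs such as
# DagsterInstance.get_event_records:
#
#     assert DagsterEventType.RUN_SUCCESS.value == "PIPELINE_SUCCESS"
#     assert DagsterEventType.RUN_SUCCESS is DagsterEventType.PIPELINE_SUCCESS
#
#     from dagster import DagsterInstance, EventRecordsFilter
#
#     instance = DagsterInstance.get()  # assumes a configured DAGSTER_HOME
#     records = instance.get_event_records(
#         EventRecordsFilter(event_type=DagsterEventType.RUN_SUCCESS)
#     )
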
EVENT_TYPE_VALUE_TO_DISPLAY_STRING = {
    "PIPELINE_ENQUEUED": "RUN_ENQUEUED",
    "PIPELINE_DEQUEUED": "RUN_DEQUEUED",
    "PIPELINE_STARTING": "RUN_STARTING",
    "PIPELINE_START": "RUN_START",
    "PIPELINE_SUCCESS": "RUN_SUCCESS",
    "PIPELINE_FAILURE": "RUN_FAILURE",
    "PIPELINE_CANCELING": "RUN_CANCELING",
    "PIPELINE_CANCELED": "RUN_CANCELED",
}

STEP_EVENTS = {
    DagsterEventType.STEP_INPUT,
    DagsterEventType.STEP_START,
    DagsterEventType.STEP_OUTPUT,
    DagsterEventType.STEP_FAILURE,
    DagsterEventType.STEP_SUCCESS,
    DagsterEventType.STEP_SKIPPED,
    DagsterEventType.ASSET_MATERIALIZATION,
    DagsterEventType.ASSET_OBSERVATION,
    DagsterEventType.STEP_EXPECTATION_RESULT,
    DagsterEventType.OBJECT_STORE_OPERATION,
    DagsterEventType.HANDLED_OUTPUT,
    DagsterEventType.LOADED_INPUT,
    DagsterEventType.STEP_RESTARTED,
    DagsterEventType.STEP_UP_FOR_RETRY,
}

FAILURE_EVENTS = {
    DagsterEventType.RUN_FAILURE,
    DagsterEventType.STEP_FAILURE,
    DagsterEventType.RUN_CANCELED,
}

PIPELINE_EVENTS = {
    DagsterEventType.RUN_ENQUEUED,
    DagsterEventType.RUN_DEQUEUED,
    DagsterEventType.RUN_STARTING,
    DagsterEventType.RUN_START,
    DagsterEventType.RUN_SUCCESS,
    DagsterEventType.RUN_FAILURE,
    DagsterEventType.RUN_CANCELING,
    DagsterEventType.RUN_CANCELED,
}

HOOK_EVENTS = {
    DagsterEventType.HOOK_COMPLETED,
    DagsterEventType.HOOK_ERRORED,
    DagsterEventType.HOOK_SKIPPED,
}

ALERT_EVENTS = {
    DagsterEventType.ALERT_START,
    DagsterEventType.ALERT_SUCCESS,
    DagsterEventType.ALERT_FAILURE,
}

MARKER_EVENTS = {
    DagsterEventType.ENGINE_EVENT,
    DagsterEventType.STEP_WORKER_STARTING,
    DagsterEventType.STEP_WORKER_STARTED,
    DagsterEventType.RESOURCE_INIT_STARTED,
    DagsterEventType.RESOURCE_INIT_SUCCESS,
    DagsterEventType.RESOURCE_INIT_FAILURE,
}

EVENT_TYPE_TO_PIPELINE_RUN_STATUS = {
    DagsterEventType.RUN_START: DagsterRunStatus.STARTED,
    DagsterEventType.RUN_SUCCESS: DagsterRunStatus.SUCCESS,
    DagsterEventType.RUN_FAILURE: DagsterRunStatus.FAILURE,
    DagsterEventType.RUN_ENQUEUED: DagsterRunStatus.QUEUED,
    DagsterEventType.RUN_STARTING: DagsterRunStatus.STARTING,
    DagsterEventType.RUN_CANCELING: DagsterRunStatus.CANCELING,
    DagsterEventType.RUN_CANCELED: DagsterRunStatus.CANCELED,
}

PIPELINE_RUN_STATUS_TO_EVENT_TYPE = {v: k for k, v in EVENT_TYPE_TO_PIPELINE_RUN_STATUS.items()}

ASSET_EVENTS = {
    DagsterEventType.ASSET_MATERIALIZATION,
    DagsterEventType.ASSET_OBSERVATION,
    DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
}


def _assert_type(
    method: str,
    expected_type: Union[DagsterEventType, Sequence[DagsterEventType]],
    actual_type: DagsterEventType,
) -> None:
    _expected_type = (
        [expected_type] if isinstance(expected_type, DagsterEventType) else expected_type
    )
    check.invariant(
        actual_type in _expected_type,
        f"{method} only callable when event_type is"
        f" {','.join([t.value for t in _expected_type])}, called on {actual_type}",
    )


def _validate_event_specific_data(
    event_type: DagsterEventType, event_specific_data: Optional["EventSpecificData"]
) -> Optional["EventSpecificData"]:
    if event_type == DagsterEventType.STEP_OUTPUT:
        check.inst_param(event_specific_data, "event_specific_data", StepOutputData)
    elif event_type == DagsterEventType.STEP_FAILURE:
        check.inst_param(event_specific_data, "event_specific_data", StepFailureData)
    elif event_type == DagsterEventType.STEP_SUCCESS:
        check.inst_param(event_specific_data, "event_specific_data", StepSuccessData)
    elif event_type == DagsterEventType.ASSET_MATERIALIZATION:
        check.inst_param(event_specific_data, "event_specific_data", StepMaterializationData)
    elif event_type == DagsterEventType.STEP_EXPECTATION_RESULT:
        check.inst_param(event_specific_data, "event_specific_data", StepExpectationResultData)
    elif event_type == DagsterEventType.STEP_INPUT:
        check.inst_param(event_specific_data, "event_specific_data", StepInputData)
    elif event_type in (
        DagsterEventType.ENGINE_EVENT,
        DagsterEventType.STEP_WORKER_STARTING,
        DagsterEventType.STEP_WORKER_STARTED,
        DagsterEventType.RESOURCE_INIT_STARTED,
        DagsterEventType.RESOURCE_INIT_SUCCESS,
        DagsterEventType.RESOURCE_INIT_FAILURE,
    ):
        check.inst_param(event_specific_data, "event_specific_data", EngineEventData)
    elif event_type == DagsterEventType.HOOK_ERRORED:
        check.inst_param(event_specific_data, "event_specific_data", HookErroredData)
    elif event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED:
        check.inst_param(
            event_specific_data, "event_specific_data", AssetMaterializationPlannedData
        )

    return event_specific_data


def log_step_event(step_context: IStepContext, event: "DagsterEvent") -> None:
    event_type = DagsterEventType(event.event_type_value)
    log_level = logging.ERROR if event_type in FAILURE_EVENTS else logging.DEBUG

    step_context.log.log_dagster_event(
        level=log_level,
        msg=event.message or f"{event_type} for step {step_context.step.key}",
        dagster_event=event,
    )


def log_job_event(job_context: IPlanContext, event: "DagsterEvent") -> None:
    event_type = DagsterEventType(event.event_type_value)
    log_level = logging.ERROR if event_type in FAILURE_EVENTS else logging.DEBUG

    job_context.log.log_dagster_event(
        level=log_level,
        msg=event.message or f"{event_type} for pipeline {job_context.job_name}",
        dagster_event=event,
    )


def log_resource_event(log_manager: DagsterLogManager, event: "DagsterEvent") -> None:
    event_specific_data = cast(EngineEventData, event.event_specific_data)

    log_level = logging.ERROR if event_specific_data.error else logging.DEBUG
    log_manager.log_dagster_event(level=log_level, msg=event.message or "", dagster_event=event)


class DagsterEventSerializer(NamedTupleSerializer["DagsterEvent"]):
    def before_unpack(self, context, unpacked_dict: Any) -> Dict[str, Any]:
        event_type_value, event_specific_data = _handle_back_compat(
            unpacked_dict["event_type_value"], unpacked_dict.get("event_specific_data")
        )
        unpacked_dict["event_type_value"] = event_type_value
        unpacked_dict["event_specific_data"] = event_specific_data

        return unpacked_dict

    def handle_unpack_error(
        self,
        exc: Exception,
        context: UnpackContext,
        storage_dict: Dict[str, Any],
    ) -> "DagsterEvent":
        event_type_value, _ = _handle_back_compat(
            storage_dict["event_type_value"], storage_dict.get("event_specific_data")
        )
        step_key = storage_dict.get("step_key")
        orig_message = storage_dict.get("message")
        new_message = (
            f"Could not deserialize event of type {event_type_value}. This event may have been"
            " written by a newer version of Dagster."
            + (f' Original message: "{orig_message}"' if orig_message else "")
        )
        return DagsterEvent(
            event_type_value=DagsterEventType.ENGINE_EVENT.value,
            job_name=storage_dict["pipeline_name"],
            message=new_message,
            step_key=step_key,
            event_specific_data=EngineEventData(
                error=serializable_error_info_from_exc_info(sys.exc_info())
            ),
        )
@whitelist_for_serdes(
    serializer=DagsterEventSerializer,
    storage_field_names={
        "node_handle": "solid_handle",
        "job_name": "pipeline_name",
    },
)
class DagsterEvent(
    NamedTuple(
        "_DagsterEvent",
        [
            ("event_type_value", str),
            ("job_name", str),
            ("step_handle", Optional[Union[StepHandle, ResolvedFromDynamicStepHandle]]),
            ("node_handle", Optional[NodeHandle]),
            ("step_kind_value", Optional[str]),
            ("logging_tags", Optional[Mapping[str, str]]),
            ("event_specific_data", Optional["EventSpecificData"]),
            ("message", Optional[str]),
            ("pid", Optional[int]),
            ("step_key", Optional[str]),
        ],
    )
):
    """Events yielded by op and job execution.

    Users should not instantiate this class.

    Attributes:
        event_type_value (str): Value for a DagsterEventType.
        job_name (str)
        node_handle (NodeHandle)
        step_kind_value (str): Value for a StepKind.
        logging_tags (Dict[str, str])
        event_specific_data (Any): Type must correspond to event_type_value.
        message (str)
        pid (int)
        step_key (Optional[str]): DEPRECATED
    """

    @staticmethod
    def from_step(
        event_type: "DagsterEventType",
        step_context: IStepContext,
        event_specific_data: Optional["EventSpecificData"] = None,
        message: Optional[str] = None,
    ) -> "DagsterEvent":
        event = DagsterEvent(
            event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value,
            job_name=step_context.job_name,
            step_handle=step_context.step.handle,
            node_handle=step_context.step.node_handle,
            step_kind_value=step_context.step.kind.value,
            logging_tags=step_context.event_tags,
            event_specific_data=_validate_event_specific_data(event_type, event_specific_data),
            message=check.opt_str_param(message, "message"),
            pid=os.getpid(),
        )

        log_step_event(step_context, event)

        return event

    @staticmethod
    def from_job(
        event_type: DagsterEventType,
        job_context: IPlanContext,
        message: Optional[str] = None,
        event_specific_data: Optional["EventSpecificData"] = None,
        step_handle: Optional[Union[StepHandle, ResolvedFromDynamicStepHandle]] = None,
    ) -> "DagsterEvent":
        check.opt_inst_param(
            step_handle, "step_handle", (StepHandle, ResolvedFromDynamicStepHandle)
        )

        event = DagsterEvent(
            event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value,
            job_name=job_context.job_name,
            message=check.opt_str_param(message, "message"),
            event_specific_data=_validate_event_specific_data(event_type, event_specific_data),
            step_handle=step_handle,
            pid=os.getpid(),
        )

        log_job_event(job_context, event)

        return event

    @staticmethod
    def from_resource(
        event_type: DagsterEventType,
        job_name: str,
        execution_plan: "ExecutionPlan",
        log_manager: DagsterLogManager,
        message: Optional[str] = None,
        event_specific_data: Optional["EngineEventData"] = None,
    ) -> "DagsterEvent":
        event = DagsterEvent(
            event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value,
            job_name=job_name,
            message=check.opt_str_param(message, "message"),
            event_specific_data=_validate_event_specific_data(
                DagsterEventType.ENGINE_EVENT, event_specific_data
            ),
            step_handle=execution_plan.step_handle_for_single_step_plans(),
            pid=os.getpid(),
        )
        log_resource_event(log_manager, event)
        return event

    def __new__(
        cls,
        event_type_value: str,
        job_name: str,
        step_handle: Optional[Union[StepHandle, ResolvedFromDynamicStepHandle]] = None,
        node_handle: Optional[NodeHandle] = None,
        step_kind_value: Optional[str] = None,
        logging_tags: Optional[Mapping[str, str]] = None,
        event_specific_data: Optional["EventSpecificData"] = None,
        message: Optional[str] = None,
        pid: Optional[int] = None,
        # legacy
        step_key: Optional[str] = None,
    ):
        # old events may contain node_handle but not step_handle
        if node_handle is not None and step_handle is None:
            step_handle = StepHandle(node_handle)

        # Legacy events may have step_key set directly, preserve those to stay in sync
        # with legacy execution plan snapshots.
        if step_handle is not None and step_key is None:
            step_key = step_handle.to_key()

        return super(DagsterEvent, cls).__new__(
            cls,
            check.str_param(event_type_value, "event_type_value"),
            check.str_param(job_name, "job_name"),
            check.opt_inst_param(
                step_handle, "step_handle", (StepHandle, ResolvedFromDynamicStepHandle)
            ),
            check.opt_inst_param(node_handle, "node_handle", NodeHandle),
            check.opt_str_param(step_kind_value, "step_kind_value"),
            check.opt_mapping_param(logging_tags, "logging_tags"),
            _validate_event_specific_data(DagsterEventType(event_type_value), event_specific_data),
            check.opt_str_param(message, "message"),
            check.opt_int_param(pid, "pid"),
            check.opt_str_param(step_key, "step_key"),
        )

    @property
    def node_name(self) -> str:
        check.invariant(self.node_handle is not None)
        node_handle = cast(NodeHandle, self.node_handle)
        return node_handle.name

    @public
    @property
    def event_type(self) -> DagsterEventType:
        """DagsterEventType: The type of this event."""
        return DagsterEventType(self.event_type_value)

    @public
    @property
    def is_step_event(self) -> bool:
        """bool: If this event relates to a specific step."""
        return self.event_type in STEP_EVENTS

    @public
    @property
    def is_hook_event(self) -> bool:
        """bool: If this event relates to the execution of a hook."""
        return self.event_type in HOOK_EVENTS

    @property
    def is_alert_event(self) -> bool:
        return self.event_type in ALERT_EVENTS

    @property
    def step_kind(self) -> "StepKind":
        from dagster._core.execution.plan.step import StepKind

        return StepKind(self.step_kind_value)

    @public
    @property
    def is_step_success(self) -> bool:
        """bool: If this event is of type STEP_SUCCESS."""
        return self.event_type == DagsterEventType.STEP_SUCCESS

    @public
    @property
    def is_successful_output(self) -> bool:
        """bool: If this event is of type STEP_OUTPUT."""
        return self.event_type == DagsterEventType.STEP_OUTPUT

    @public
    @property
    def is_step_start(self) -> bool:
        """bool: If this event is of type STEP_START."""
        return self.event_type == DagsterEventType.STEP_START

    @public
    @property
    def is_step_failure(self) -> bool:
        """bool: If this event is of type STEP_FAILURE."""
        return self.event_type == DagsterEventType.STEP_FAILURE

    @public
    @property
    def is_resource_init_failure(self) -> bool:
        """bool: If this event is of type RESOURCE_INIT_FAILURE."""
        return self.event_type == DagsterEventType.RESOURCE_INIT_FAILURE

    @public
    @property
    def is_step_skipped(self) -> bool:
        """bool: If this event is of type STEP_SKIPPED."""
        return self.event_type == DagsterEventType.STEP_SKIPPED

    @public
    @property
    def is_step_up_for_retry(self) -> bool:
        """bool: If this event is of type STEP_UP_FOR_RETRY."""
        return self.event_type == DagsterEventType.STEP_UP_FOR_RETRY

    @public
    @property
    def is_step_restarted(self) -> bool:
        """bool: If this event is of type STEP_RESTARTED."""
        return self.event_type == DagsterEventType.STEP_RESTARTED

    @property
    def is_job_success(self) -> bool:
        return self.event_type == DagsterEventType.RUN_SUCCESS

    @property
    def is_job_failure(self) -> bool:
        return self.event_type == DagsterEventType.RUN_FAILURE

    @property
    def is_run_failure(self) -> bool:
        return self.event_type == DagsterEventType.RUN_FAILURE

    @public
    @property
    def is_failure(self) -> bool:
        """bool: If this event represents the failure of a run or step."""
        return self.event_type in FAILURE_EVENTS

    @property
    def is_job_event(self) -> bool:
        return self.event_type in PIPELINE_EVENTS

    @public
    @property
    def is_engine_event(self) -> bool:
        """bool: If this event is of type ENGINE_EVENT."""
        return self.event_type == DagsterEventType.ENGINE_EVENT

    @public
    @property
    def is_handled_output(self) -> bool:
        """bool: If this event is of type HANDLED_OUTPUT."""
        return self.event_type == DagsterEventType.HANDLED_OUTPUT

    @public
    @property
    def is_loaded_input(self) -> bool:
        """bool: If this event is of type LOADED_INPUT."""
        return self.event_type == DagsterEventType.LOADED_INPUT

    @public
    @property
    def is_step_materialization(self) -> bool:
        """bool: If this event is of type ASSET_MATERIALIZATION."""
        return self.event_type == DagsterEventType.ASSET_MATERIALIZATION

    @public
    @property
    def is_expectation_result(self) -> bool:
        """bool: If this event is of type STEP_EXPECTATION_RESULT."""
        return self.event_type == DagsterEventType.STEP_EXPECTATION_RESULT

    @public
    @property
    def is_asset_observation(self) -> bool:
        """bool: If this event is of type ASSET_OBSERVATION."""
        return self.event_type == DagsterEventType.ASSET_OBSERVATION

    @public
    @property
    def is_asset_materialization_planned(self) -> bool:
        """bool: If this event is of type ASSET_MATERIALIZATION_PLANNED."""
        return self.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED

    @public
    @property
    def asset_key(self) -> Optional[AssetKey]:
        """Optional[AssetKey]: For events that correspond to a specific asset_key / partition
        (ASSET_MATERIALIZATION, ASSET_OBSERVATION, ASSET_MATERIALIZATION_PLANNED), returns that
        asset key. Otherwise, returns None.
        """
        if self.event_type == DagsterEventType.ASSET_MATERIALIZATION:
            return self.step_materialization_data.materialization.asset_key
        elif self.event_type == DagsterEventType.ASSET_OBSERVATION:
            return self.asset_observation_data.asset_observation.asset_key
        elif self.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED:
            return self.asset_materialization_planned_data.asset_key
        else:
            return None

    @public
    @property
    def partition(self) -> Optional[str]:
        """Optional[str]: For events that correspond to a specific asset_key / partition
        (ASSET_MATERIALIZATION, ASSET_OBSERVATION, ASSET_MATERIALIZATION_PLANNED), returns that
        partition. Otherwise, returns None.
        """
        if self.event_type == DagsterEventType.ASSET_MATERIALIZATION:
            return self.step_materialization_data.materialization.partition
        elif self.event_type == DagsterEventType.ASSET_OBSERVATION:
            return self.asset_observation_data.asset_observation.partition
        elif self.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED:
            return self.asset_materialization_planned_data.partition
        else:
            return None

    @property
    def step_input_data(self) -> "StepInputData":
        _assert_type("step_input_data", DagsterEventType.STEP_INPUT, self.event_type)
        return cast(StepInputData, self.event_specific_data)

    @property
    def step_output_data(self) -> StepOutputData:
        _assert_type("step_output_data", DagsterEventType.STEP_OUTPUT, self.event_type)
        return cast(StepOutputData, self.event_specific_data)

    @property
    def step_success_data(self) -> "StepSuccessData":
        _assert_type("step_success_data", DagsterEventType.STEP_SUCCESS, self.event_type)
        return cast(StepSuccessData, self.event_specific_data)

    @property
    def step_failure_data(self) -> "StepFailureData":
        _assert_type("step_failure_data", DagsterEventType.STEP_FAILURE, self.event_type)
        return cast(StepFailureData, self.event_specific_data)

    @property
    def step_retry_data(self) -> "StepRetryData":
        _assert_type("step_retry_data", DagsterEventType.STEP_UP_FOR_RETRY, self.event_type)
        return cast(StepRetryData, self.event_specific_data)

    @property
    def step_materialization_data(self) -> "StepMaterializationData":
        _assert_type(
            "step_materialization_data", DagsterEventType.ASSET_MATERIALIZATION, self.event_type
        )
        return cast(StepMaterializationData, self.event_specific_data)

    @property
    def asset_observation_data(self) -> "AssetObservationData":
        _assert_type("asset_observation_data", DagsterEventType.ASSET_OBSERVATION, self.event_type)
        return cast(AssetObservationData, self.event_specific_data)

    @property
    def asset_materialization_planned_data(self) -> "AssetMaterializationPlannedData":
        _assert_type(
            "asset_materialization_planned",
            DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
            self.event_type,
        )
        return cast(AssetMaterializationPlannedData, self.event_specific_data)

    @property
    def step_expectation_result_data(self) -> "StepExpectationResultData":
        _assert_type(
            "step_expectation_result_data",
            DagsterEventType.STEP_EXPECTATION_RESULT,
            self.event_type,
        )
        return cast(StepExpectationResultData, self.event_specific_data)

    @property
    def materialization(self) -> AssetMaterialization:
        _assert_type(
            "step_materialization_data", DagsterEventType.ASSET_MATERIALIZATION, self.event_type
        )
        return cast(StepMaterializationData, self.event_specific_data).materialization

    @property
    def job_failure_data(self) -> "JobFailureData":
        _assert_type("job_failure_data", DagsterEventType.RUN_FAILURE, self.event_type)
        return cast(JobFailureData, self.event_specific_data)

    @property
    def engine_event_data(self) -> "EngineEventData":
        _assert_type(
            "engine_event_data",
            [
                DagsterEventType.ENGINE_EVENT,
                DagsterEventType.RESOURCE_INIT_STARTED,
                DagsterEventType.RESOURCE_INIT_SUCCESS,
                DagsterEventType.RESOURCE_INIT_FAILURE,
                DagsterEventType.STEP_WORKER_STARTED,
                DagsterEventType.STEP_WORKER_STARTING,
            ],
            self.event_type,
        )
        return cast(EngineEventData, self.event_specific_data)

    @property
    def hook_completed_data(self) -> Optional["EventSpecificData"]:
        _assert_type("hook_completed_data", DagsterEventType.HOOK_COMPLETED, self.event_type)
        return self.event_specific_data

    @property
    def hook_errored_data(self) -> "HookErroredData":
        _assert_type("hook_errored_data", DagsterEventType.HOOK_ERRORED, self.event_type)
        return cast(HookErroredData, self.event_specific_data)

    @property
    def hook_skipped_data(self) -> Optional["EventSpecificData"]:
        _assert_type("hook_skipped_data", DagsterEventType.HOOK_SKIPPED, self.event_type)
        return self.event_specific_data

    @property
    def logs_captured_data(self) -> "ComputeLogsCaptureData":
        _assert_type("logs_captured_data", DagsterEventType.LOGS_CAPTURED, self.event_type)
        return cast(ComputeLogsCaptureData, self.event_specific_data)

    @staticmethod
    def step_output_event(
        step_context: StepExecutionContext, step_output_data: StepOutputData
    ) -> "DagsterEvent":
        output_def = step_context.op.output_def_named(
            step_output_data.step_output_handle.output_name
        )

        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_OUTPUT,
            step_context=step_context,
            event_specific_data=step_output_data,
            message=(
                'Yielded output "{output_name}"{mapping_clause} of type'
                ' "{output_type}".{type_check_clause}'.format(
                    output_name=step_output_data.step_output_handle.output_name,
                    output_type=output_def.dagster_type.display_name,
                    type_check_clause=(
                        (
                            " Warning! Type check failed."
                            if not step_output_data.type_check_data.success
                            else " (Type check passed)."
                        )
                        if step_output_data.type_check_data
                        else " (No type check)."
                    ),
                    mapping_clause=(
                        f' mapping key "{step_output_data.step_output_handle.mapping_key}"'
                        if step_output_data.step_output_handle.mapping_key
                        else ""
                    ),
                )
            ),
        )

    @staticmethod
    def step_failure_event(
        step_context: IStepContext,
        step_failure_data: "StepFailureData",
        message=None,
    ) -> "DagsterEvent":
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_FAILURE,
            step_context=step_context,
            event_specific_data=step_failure_data,
            message=(message or f'Execution of step "{step_context.step.key}" failed.'),
        )

    @staticmethod
    def step_retry_event(
        step_context: IStepContext, step_retry_data: "StepRetryData"
    ) -> "DagsterEvent":
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_UP_FOR_RETRY,
            step_context=step_context,
            event_specific_data=step_retry_data,
            message=(
                'Execution of step "{step_key}" failed and has requested a retry{wait_str}.'.format(
                    step_key=step_context.step.key,
                    wait_str=(
                        f" in {step_retry_data.seconds_to_wait} seconds"
                        if step_retry_data.seconds_to_wait
                        else ""
                    ),
                )
            ),
        )

    @staticmethod
    def step_input_event(
        step_context: StepExecutionContext, step_input_data: "StepInputData"
    ) -> "DagsterEvent":
        input_def = step_context.op_def.input_def_named(step_input_data.input_name)

        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_INPUT,
            step_context=step_context,
            event_specific_data=step_input_data,
            message='Got input "{input_name}" of type "{input_type}".{type_check_clause}'.format(
                input_name=step_input_data.input_name,
                input_type=input_def.dagster_type.display_name,
                type_check_clause=(
                    (
                        " Warning! Type check failed."
                        if not step_input_data.type_check_data.success
                        else " (Type check passed)."
                    )
                    if step_input_data.type_check_data
                    else " (No type check)."
                ),
            ),
        )

    @staticmethod
    def step_start_event(step_context: IStepContext) -> "DagsterEvent":
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_START,
            step_context=step_context,
            message='Started execution of step "{step_key}".'.format(
                step_key=step_context.step.key
            ),
        )

    @staticmethod
    def step_restarted_event(step_context: IStepContext, previous_attempts: int) -> "DagsterEvent":
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_RESTARTED,
            step_context=step_context,
            message='Started re-execution (attempt # {n}) of step "{step_key}".'.format(
                step_key=step_context.step.key, n=previous_attempts + 1
            ),
        )

    @staticmethod
    def step_success_event(
        step_context: IStepContext, success: "StepSuccessData"
    ) -> "DagsterEvent":
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_SUCCESS,
            step_context=step_context,
            event_specific_data=success,
            message='Finished execution of step "{step_key}" in {duration}.'.format(
                step_key=step_context.step.key,
                duration=format_duration(success.duration_ms),
            ),
        )

    @staticmethod
    def step_skipped_event(step_context: IStepContext) -> "DagsterEvent":
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_SKIPPED,
            step_context=step_context,
            message='Skipped execution of step "{step_key}".'.format(
                step_key=step_context.step.key
            ),
        )

    @staticmethod
    def asset_materialization(
        step_context: IStepContext,
        materialization: AssetMaterialization,
    ) -> "DagsterEvent":
        return DagsterEvent.from_step(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            step_context=step_context,
            event_specific_data=StepMaterializationData(materialization),
            message=(
                materialization.description
                if materialization.description
                else "Materialized value{label_clause}.".format(
                    label_clause=f" {materialization.label}" if materialization.label else ""
                )
            ),
        )

    @staticmethod
    def asset_observation(
        step_context: IStepContext, observation: AssetObservation
    ) -> "DagsterEvent":
        return DagsterEvent.from_step(
            event_type=DagsterEventType.ASSET_OBSERVATION,
            step_context=step_context,
            event_specific_data=AssetObservationData(observation),
        )

    @staticmethod
    def step_expectation_result(
        step_context: IStepContext, expectation_result: ExpectationResult
    ) -> "DagsterEvent":
        def _msg():
            if expectation_result.description:
                return expectation_result.description

            return "Expectation{label_clause} {result_verb}".format(
                label_clause=" " + expectation_result.label if expectation_result.label else "",
                result_verb="passed" if expectation_result.success else "failed",
            )

        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_EXPECTATION_RESULT,
            step_context=step_context,
            event_specific_data=StepExpectationResultData(expectation_result),
            message=_msg(),
        )

    @staticmethod
    def job_start(job_context: IPlanContext) -> "DagsterEvent":
        return DagsterEvent.from_job(
            DagsterEventType.RUN_START,
            job_context,
            message=f'Started execution of run for "{job_context.job_name}".',
        )

    @staticmethod
    def job_success(job_context: IPlanContext) -> "DagsterEvent":
        return DagsterEvent.from_job(
            DagsterEventType.RUN_SUCCESS,
            job_context,
            message=f'Finished execution of run for "{job_context.job_name}".',
        )

    @staticmethod
    def job_failure(
        job_context_or_name: Union[IPlanContext, str],
        context_msg: str,
        error_info: Optional[SerializableErrorInfo] = None,
    ) -> "DagsterEvent":
        check.str_param(context_msg, "context_msg")
        if isinstance(job_context_or_name, IPlanContext):
            return DagsterEvent.from_job(
                DagsterEventType.RUN_FAILURE,
                job_context_or_name,
                message=(
                    f'Execution of run for "{job_context_or_name.job_name}" failed. {context_msg}'
                ),
                event_specific_data=JobFailureData(error_info),
            )
        else:
            # when the failure happens trying to bring up context, the job_context hasn't been
            # built and so can't use from_job
            check.str_param(job_context_or_name, "pipeline_name")
            event = DagsterEvent(
                event_type_value=DagsterEventType.RUN_FAILURE.value,
                job_name=job_context_or_name,
                event_specific_data=JobFailureData(error_info),
                message=f'Execution of run for "{job_context_or_name}" failed. {context_msg}',
                pid=os.getpid(),
            )
            return event

    @staticmethod
    def job_canceled(
        job_context: IPlanContext, error_info: Optional[SerializableErrorInfo] = None
    ) -> "DagsterEvent":
        return DagsterEvent.from_job(
            DagsterEventType.RUN_CANCELED,
            job_context,
            message=f'Execution of run for "{job_context.job_name}" canceled.',
            event_specific_data=JobCanceledData(
                check.opt_inst_param(error_info, "error_info", SerializableErrorInfo)
            ),
        )

    @staticmethod
    def step_worker_starting(
        step_context: IStepContext,
        message: str,
        metadata: Mapping[str, MetadataValue],
    ) -> "DagsterEvent":
        return DagsterEvent.from_step(
            DagsterEventType.STEP_WORKER_STARTING,
            step_context,
            message=message,
            event_specific_data=EngineEventData(
                metadata=metadata, marker_start="step_process_start"
            ),
        )

    @staticmethod
    def step_worker_started(
        log_manager: DagsterLogManager,
        job_name: str,
        message: str,
        metadata: Mapping[str, MetadataValue],
        step_key: Optional[str],
    ) -> "DagsterEvent":
        event = DagsterEvent(
            DagsterEventType.STEP_WORKER_STARTED.value,
            job_name=job_name,
            message=message,
            event_specific_data=EngineEventData(metadata=metadata, marker_end="step_process_start"),
            pid=os.getpid(),
            step_key=step_key,
        )
        log_manager.log_dagster_event(
            level=logging.DEBUG,
            msg=message,
            dagster_event=event,
        )
        return event

    @staticmethod
    def resource_init_start(
        job_name: str,
        execution_plan: "ExecutionPlan",
        log_manager: DagsterLogManager,
        resource_keys: AbstractSet[str],
    ) -> "DagsterEvent":
        return DagsterEvent.from_resource(
            DagsterEventType.RESOURCE_INIT_STARTED,
            job_name=job_name,
            execution_plan=execution_plan,
            log_manager=log_manager,
            message="Starting initialization of resources [{}].".format(
                ", ".join(sorted(resource_keys))
            ),
            event_specific_data=EngineEventData(metadata={}, marker_start="resources"),
        )

    @staticmethod
    def resource_init_success(
        job_name: str,
        execution_plan: "ExecutionPlan",
        log_manager: DagsterLogManager,
        resource_instances: Mapping[str, Any],
        resource_init_times: Mapping[str, str],
    ) -> "DagsterEvent":
        metadata = {}
        for key in resource_instances.keys():
            metadata[key] = MetadataValue.python_artifact(resource_instances[key].__class__)
            metadata[f"{key}:init_time"] = resource_init_times[key]

        return DagsterEvent.from_resource(
            DagsterEventType.RESOURCE_INIT_SUCCESS,
            job_name=job_name,
            execution_plan=execution_plan,
            log_manager=log_manager,
            message="Finished initialization of resources [{}].".format(
                ", ".join(sorted(resource_init_times.keys()))
            ),
            event_specific_data=EngineEventData(
                metadata=metadata,
                marker_end="resources",
            ),
        )

    @staticmethod
    def resource_init_failure(
        job_name: str,
        execution_plan: "ExecutionPlan",
        log_manager: DagsterLogManager,
        resource_keys: AbstractSet[str],
        error: SerializableErrorInfo,
    ) -> "DagsterEvent":
        return DagsterEvent.from_resource(
            DagsterEventType.RESOURCE_INIT_FAILURE,
            job_name=job_name,
            execution_plan=execution_plan,
            log_manager=log_manager,
            message="Initialization of resources [{}] failed.".format(", ".join(resource_keys)),
            event_specific_data=EngineEventData(
                metadata={},
                marker_end="resources",
                error=error,
            ),
        )

    @staticmethod
    def resource_teardown_failure(
        job_name: str,
        execution_plan: "ExecutionPlan",
        log_manager: DagsterLogManager,
        resource_keys: AbstractSet[str],
        error: SerializableErrorInfo,
    ) -> "DagsterEvent":
        return DagsterEvent.from_resource(
            DagsterEventType.ENGINE_EVENT,
            job_name=job_name,
            execution_plan=execution_plan,
            log_manager=log_manager,
            message="Teardown of resources [{}] failed.".format(", ".join(resource_keys)),
            event_specific_data=EngineEventData(
                metadata={},
                marker_start=None,
                marker_end=None,
                error=error,
            ),
        )

    @staticmethod
    def engine_event(
        plan_context: IPlanContext,
        message: str,
        event_specific_data: Optional["EngineEventData"] = None,
    ) -> "DagsterEvent":
        if isinstance(plan_context, IStepContext):
            return DagsterEvent.from_step(
                DagsterEventType.ENGINE_EVENT,
                step_context=plan_context,
                event_specific_data=event_specific_data,
                message=message,
            )
        else:
            return DagsterEvent.from_job(
                DagsterEventType.ENGINE_EVENT,
                plan_context,
                message,
                event_specific_data=event_specific_data,
            )

    @staticmethod
    def object_store_operation(
        step_context: IStepContext, object_store_operation_result: "ObjectStoreOperation"
    ) -> "DagsterEvent":
        object_store_name = (
            "{object_store_name} ".format(
                object_store_name=object_store_operation_result.object_store_name
            )
            if object_store_operation_result.object_store_name
            else ""
        )

        serialization_strategy_modifier = (
            " using {serialization_strategy_name}".format(
                serialization_strategy_name=object_store_operation_result.serialization_strategy_name
            )
            if object_store_operation_result.serialization_strategy_name
            else ""
        )

        value_name = object_store_operation_result.value_name

        if (
            ObjectStoreOperationType(object_store_operation_result.op)
            == ObjectStoreOperationType.SET_OBJECT
        ):
            message = (
                "Stored intermediate object for output {value_name} in "
                "{object_store_name}object store{serialization_strategy_modifier}."
            ).format(
                value_name=value_name,
                object_store_name=object_store_name,
                serialization_strategy_modifier=serialization_strategy_modifier,
            )
        elif (
            ObjectStoreOperationType(object_store_operation_result.op)
            == ObjectStoreOperationType.GET_OBJECT
        ):
            message = (
                "Retrieved intermediate object for input {value_name} in "
                "{object_store_name}object store{serialization_strategy_modifier}."
            ).format(
                value_name=value_name,
                object_store_name=object_store_name,
                serialization_strategy_modifier=serialization_strategy_modifier,
            )
        elif (
            ObjectStoreOperationType(object_store_operation_result.op)
            == ObjectStoreOperationType.CP_OBJECT
        ):
            message = (
                "Copied intermediate object for input {value_name} from {key} to {dest_key}"
            ).format(
                value_name=value_name,
                key=object_store_operation_result.key,
                dest_key=object_store_operation_result.dest_key,
            )
        else:
            message = ""

        return DagsterEvent.from_step(
            DagsterEventType.OBJECT_STORE_OPERATION,
            step_context,
            event_specific_data=ObjectStoreOperationResultData(
                op=object_store_operation_result.op,
                value_name=value_name,
                address=object_store_operation_result.key,
                metadata={"key": MetadataValue.path(object_store_operation_result.key)},
                version=object_store_operation_result.version,
                mapping_key=object_store_operation_result.mapping_key,
            ),
            message=message,
        )

    @staticmethod
    def handled_output(
        step_context: IStepContext,
        output_name: str,
        manager_key: str,
        message_override: Optional[str] = None,
        metadata: Optional[Mapping[str, MetadataValue]] = None,
    ) -> "DagsterEvent":
        message = f'Handled output "{output_name}" using IO manager "{manager_key}"'
        return DagsterEvent.from_step(
            event_type=DagsterEventType.HANDLED_OUTPUT,
            step_context=step_context,
            event_specific_data=HandledOutputData(
                output_name=output_name,
                manager_key=manager_key,
                metadata=metadata if metadata else {},
            ),
            message=message_override or message,
        )

    @staticmethod
    def loaded_input(
        step_context: IStepContext,
        input_name: str,
        manager_key: str,
        upstream_output_name: Optional[str] = None,
        upstream_step_key: Optional[str] = None,
        message_override: Optional[str] = None,
        metadata: Optional[Mapping[str, MetadataValue]] = None,
    ) -> "DagsterEvent":
        message = f'Loaded input "{input_name}" using input manager "{manager_key}"'
        if upstream_output_name:
            message += f', from output "{upstream_output_name}" of step "{upstream_step_key}"'

        return DagsterEvent.from_step(
            event_type=DagsterEventType.LOADED_INPUT,
            step_context=step_context,
            event_specific_data=LoadedInputData(
                input_name=input_name,
                manager_key=manager_key,
                upstream_output_name=upstream_output_name,
                upstream_step_key=upstream_step_key,
                metadata=metadata if metadata else {},
            ),
            message=message_override or message,
        )

    @staticmethod
    def hook_completed(
        step_context: StepExecutionContext, hook_def: HookDefinition
    ) -> "DagsterEvent":
        event_type = DagsterEventType.HOOK_COMPLETED
        event = DagsterEvent(
            event_type_value=event_type.value,
            job_name=step_context.job_name,
            step_handle=step_context.step.handle,
            node_handle=step_context.step.node_handle,
            step_kind_value=step_context.step.kind.value,
            logging_tags=step_context.event_tags,
            message=(
                f'Finished the execution of hook "{hook_def.name}" triggered for'
                f' "{step_context.op.name}".'
            ),
        )

        step_context.log.log_dagster_event(
            level=logging.DEBUG, msg=event.message or "", dagster_event=event
        )

        return event

    @staticmethod
    def hook_errored(
        step_context: StepExecutionContext, error: HookExecutionError
    ) -> "DagsterEvent":
        event_type = DagsterEventType.HOOK_ERRORED
        event = DagsterEvent(
            event_type_value=event_type.value,
            job_name=step_context.job_name,
            step_handle=step_context.step.handle,
            node_handle=step_context.step.node_handle,
            step_kind_value=step_context.step.kind.value,
            logging_tags=step_context.event_tags,
            event_specific_data=_validate_event_specific_data(
                event_type,
                HookErroredData(
                    error=serializable_error_info_from_exc_info(error.original_exc_info)
                ),
            ),
        )

        step_context.log.log_dagster_event(
            level=logging.ERROR, msg=str(error), dagster_event=event
        )

        return event

    @staticmethod
    def hook_skipped(
        step_context: StepExecutionContext, hook_def: HookDefinition
    ) -> "DagsterEvent":
        event_type = DagsterEventType.HOOK_SKIPPED
        event = DagsterEvent(
            event_type_value=event_type.value,
            job_name=step_context.job_name,
            step_handle=step_context.step.handle,
            node_handle=step_context.step.node_handle,
            step_kind_value=step_context.step.kind.value,
            logging_tags=step_context.event_tags,
            message=(
                'Skipped the execution of hook "{hook_name}". It did not meet its triggering '
                'condition during the execution of "{solid_name}".'
            ).format(hook_name=hook_def.name, solid_name=step_context.op.name),
        )

        step_context.log.log_dagster_event(
            level=logging.DEBUG, msg=event.message or "", dagster_event=event
        )

        return event

    @staticmethod
    def legacy_compute_log_step_event(step_context: StepExecutionContext):
        step_key = step_context.step.key
        return DagsterEvent.from_step(
            DagsterEventType.LOGS_CAPTURED,
            step_context,
            message=f"Started capturing logs for step: {step_key}.",
            event_specific_data=ComputeLogsCaptureData(
                step_keys=[step_key],
                file_key=step_key,
            ),
        )

    @staticmethod
    def capture_logs(
        job_context: IPlanContext,
        step_keys: Sequence[str],
        log_key: Sequence[str],
        log_context: CapturedLogContext,
    ):
        file_key = log_key[-1]
        return DagsterEvent.from_job(
            DagsterEventType.LOGS_CAPTURED,
            job_context,
            message=f"Started capturing logs in process (pid: {os.getpid()}).",
            event_specific_data=ComputeLogsCaptureData(
                step_keys=step_keys,
                file_key=file_key,
                external_stdout_url=log_context.external_stdout_url,
                external_stderr_url=log_context.external_stderr_url,
                external_url=log_context.external_url,
            ),
        )
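
# Illustrative sketch (not part of the dagster source): user code typically meets
# DagsterEvent through execution results rather than by constructing events.
# Assuming a job defined elsewhere as `my_job`:
#
#     result = my_job.execute_in_process(raise_on_error=False)
#     for event in result.all_events:
#         if event.is_step_failure:
#             print(event.step_key, event.step_failure_data.error)
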
def get_step_output_event(
    events: Sequence[DagsterEvent], step_key: str, output_name: Optional[str] = "result"
) -> Optional["DagsterEvent"]:
    check.sequence_param(events, "events", of_type=DagsterEvent)
    check.str_param(step_key, "step_key")
    check.str_param(output_name, "output_name")
    for event in events:
        if (
            event.event_type == DagsterEventType.STEP_OUTPUT
            and event.step_key == step_key
            and event.step_output_data.output_name == output_name
        ):
            return event
    return None


@whitelist_for_serdes
class AssetObservationData(
    NamedTuple("_AssetObservation", [("asset_observation", AssetObservation)])
):
    def __new__(cls, asset_observation: AssetObservation):
        return super(AssetObservationData, cls).__new__(
            cls,
            asset_observation=check.inst_param(
                asset_observation, "asset_observation", AssetObservation
            ),
        )


@whitelist_for_serdes
class StepMaterializationData(
    NamedTuple(
        "_StepMaterializationData",
        [
            ("materialization", AssetMaterialization),
            ("asset_lineage", Sequence[AssetLineageInfo]),
        ],
    )
):
    def __new__(
        cls,
        materialization: AssetMaterialization,
        asset_lineage: Optional[Sequence[AssetLineageInfo]] = None,
    ):
        return super(StepMaterializationData, cls).__new__(
            cls,
            materialization=check.inst_param(
                materialization, "materialization", AssetMaterialization
            ),
            asset_lineage=check.opt_sequence_param(
                asset_lineage, "asset_lineage", of_type=AssetLineageInfo
            ),
        )


@whitelist_for_serdes
class AssetMaterializationPlannedData(
    NamedTuple(
        "_AssetMaterializationPlannedData",
        [("asset_key", AssetKey), ("partition", Optional[str])],
    )
):
    def __new__(cls, asset_key: AssetKey, partition: Optional[str] = None):
        return super(AssetMaterializationPlannedData, cls).__new__(
            cls,
            asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
            partition=check.opt_str_param(partition, "partition"),
        )


@whitelist_for_serdes
class StepExpectationResultData(
    NamedTuple(
        "_StepExpectationResultData",
        [
            ("expectation_result", ExpectationResult),
        ],
    )
):
    def __new__(cls, expectation_result: ExpectationResult):
        return super(StepExpectationResultData, cls).__new__(
            cls,
            expectation_result=check.inst_param(
                expectation_result, "expectation_result", ExpectationResult
            ),
        )


@whitelist_for_serdes(
    storage_field_names={"metadata": "metadata_entries"},
    field_serializers={"metadata": MetadataFieldSerializer},
)
class ObjectStoreOperationResultData(
    NamedTuple(
        "_ObjectStoreOperationResultData",
        [
            ("op", ObjectStoreOperationType),
            ("value_name", Optional[str]),
            ("metadata", Mapping[str, MetadataValue]),
            ("address", Optional[str]),
            ("version", Optional[str]),
            ("mapping_key", Optional[str]),
        ],
    )
):
    def __new__(
        cls,
        op: ObjectStoreOperationType,
        value_name: Optional[str] = None,
        metadata: Optional[Mapping[str, MetadataValue]] = None,
        address: Optional[str] = None,
        version: Optional[str] = None,
        mapping_key: Optional[str] = None,
    ):
        return super(ObjectStoreOperationResultData, cls).__new__(
            cls,
            op=cast(ObjectStoreOperationType, check.str_param(op, "op")),
            value_name=check.opt_str_param(value_name, "value_name"),
            metadata=normalize_metadata(
                check.opt_mapping_param(metadata, "metadata", key_type=str)
            ),
            address=check.opt_str_param(address, "address"),
            version=check.opt_str_param(version, "version"),
            mapping_key=check.opt_str_param(mapping_key, "mapping_key"),
        )


@whitelist_for_serdes(
    storage_field_names={"metadata": "metadata_entries"},
    field_serializers={"metadata": MetadataFieldSerializer},
)
class EngineEventData(
    NamedTuple(
        "_EngineEventData",
        [
            ("metadata", Mapping[str, MetadataValue]),
            ("error", Optional[SerializableErrorInfo]),
            ("marker_start", Optional[str]),
            ("marker_end", Optional[str]),
        ],
    )
):
    # serdes log
    # * added optional error
    # * added marker_start / marker_end
    #
    def __new__(
        cls,
        metadata: Optional[Mapping[str, RawMetadataValue]] = None,
        error: Optional[SerializableErrorInfo] = None,
        marker_start: Optional[str] = None,
        marker_end: Optional[str] = None,
    ):
        return super(EngineEventData, cls).__new__(
            cls,
            metadata=normalize_metadata(
                check.opt_mapping_param(metadata, "metadata", key_type=str)
            ),
            error=check.opt_inst_param(error, "error", SerializableErrorInfo),
            marker_start=check.opt_str_param(marker_start, "marker_start"),
            marker_end=check.opt_str_param(marker_end, "marker_end"),
        )

    @staticmethod
    def in_process(
        pid: int, step_keys_to_execute: Optional[Sequence[str]] = None
    ) -> "EngineEventData":
        return EngineEventData(
            metadata={
                "pid": MetadataValue.text(str(pid)),
                **(
                    {"step_keys": MetadataValue.text(str(step_keys_to_execute))}
                    if step_keys_to_execute
                    else {}
                ),
            }
        )

    @staticmethod
    def multiprocess(
        pid: int, step_keys_to_execute: Optional[Sequence[str]] = None
    ) -> "EngineEventData":
        return EngineEventData(
            metadata={
                "pid": MetadataValue.text(str(pid)),
                **(
                    {"step_keys": MetadataValue.text(str(step_keys_to_execute))}
                    if step_keys_to_execute
                    else {}
                ),
            }
        )

    @staticmethod
    def interrupted(steps_interrupted: Sequence[str]) -> "EngineEventData":
        return EngineEventData(
            metadata={"steps_interrupted": MetadataValue.text(str(steps_interrupted))}
        )

    @staticmethod
    def engine_error(error: SerializableErrorInfo) -> "EngineEventData":
        return EngineEventData(metadata={}, error=error)


@whitelist_for_serdes(storage_name="PipelineFailureData")
class JobFailureData(
    NamedTuple(
        "_JobFailureData",
        [
            ("error", Optional[SerializableErrorInfo]),
        ],
    )
):
    def __new__(cls, error: Optional[SerializableErrorInfo]):
        return super(JobFailureData, cls).__new__(
            cls, error=check.opt_inst_param(error, "error", SerializableErrorInfo)
        )


@whitelist_for_serdes(storage_name="PipelineCanceledData")
class JobCanceledData(
    NamedTuple(
        "_JobCanceledData",
        [
            ("error", Optional[SerializableErrorInfo]),
        ],
    )
):
    def __new__(cls, error: Optional[SerializableErrorInfo]):
        return super(JobCanceledData, cls).__new__(
            cls, error=check.opt_inst_param(error, "error", SerializableErrorInfo)
        )


@whitelist_for_serdes
class HookErroredData(
    NamedTuple(
        "_HookErroredData",
        [
            ("error", SerializableErrorInfo),
        ],
    )
):
    def __new__(cls, error: SerializableErrorInfo):
        return super(HookErroredData, cls).__new__(
            cls, error=check.inst_param(error, "error", SerializableErrorInfo)
        )


@whitelist_for_serdes(
    storage_field_names={"metadata": "metadata_entries"},
    field_serializers={"metadata": MetadataFieldSerializer},
)
class HandledOutputData(
    NamedTuple(
        "_HandledOutputData",
        [
            ("output_name", str),
            ("manager_key", str),
            ("metadata", Mapping[str, MetadataValue]),
        ],
    )
):
    def __new__(
        cls,
        output_name: str,
        manager_key: str,
        metadata: Optional[Mapping[str, MetadataValue]] = None,
    ):
        return super(HandledOutputData, cls).__new__(
            cls,
            output_name=check.str_param(output_name, "output_name"),
            manager_key=check.str_param(manager_key, "manager_key"),
            metadata=normalize_metadata(
                check.opt_mapping_param(metadata, "metadata", key_type=str)
            ),
        )


@whitelist_for_serdes(
    storage_field_names={"metadata": "metadata_entries"},
    field_serializers={"metadata": MetadataFieldSerializer},
)
class LoadedInputData(
    NamedTuple(
        "_LoadedInputData",
        [
            ("input_name", str),
            ("manager_key", str),
            ("upstream_output_name", Optional[str]),
            ("upstream_step_key", Optional[str]),
            ("metadata", Mapping[str, MetadataValue]),
        ],
    )
):
    def __new__(
        cls,
        input_name: str,
        manager_key: str,
        upstream_output_name: Optional[str] = None,
        upstream_step_key: Optional[str] = None,
        metadata: Optional[Mapping[str, MetadataValue]] = None,
    ):
        return super(LoadedInputData, cls).__new__(
            cls,
            input_name=check.str_param(input_name, "input_name"),
            manager_key=check.str_param(manager_key, "manager_key"),
            upstream_output_name=check.opt_str_param(upstream_output_name, "upstream_output_name"),
            upstream_step_key=check.opt_str_param(upstream_step_key, "upstream_step_key"),
            metadata=normalize_metadata(
                check.opt_mapping_param(metadata, "metadata", key_type=str)
            ),
        )


@whitelist_for_serdes(storage_field_names={"file_key": "log_key"})
class ComputeLogsCaptureData(
    NamedTuple(
        "_ComputeLogsCaptureData",
        [
            ("file_key", str),  # renamed log_key => file_key to avoid confusion
            ("step_keys", Sequence[str]),
            ("external_url", Optional[str]),
            ("external_stdout_url", Optional[str]),
            ("external_stderr_url", Optional[str]),
        ],
    )
):
    def __new__(
        cls,
        file_key: str,
        step_keys: Sequence[str],
        external_url: Optional[str] = None,
        external_stdout_url: Optional[str] = None,
        external_stderr_url: Optional[str] = None,
    ):
        return super(ComputeLogsCaptureData, cls).__new__(
            cls,
            file_key=check.str_param(file_key, "file_key"),
            step_keys=check.opt_list_param(step_keys, "step_keys", of_type=str),
            external_url=check.opt_str_param(external_url, "external_url"),
            external_stdout_url=check.opt_str_param(external_stdout_url, "external_stdout_url"),
            external_stderr_url=check.opt_str_param(external_stderr_url, "external_stderr_url"),
        )


###################################################################################################
# THE GRAVEYARD
#
#            -|-                    -|-                    -|-
#             |                      |                      |
#        _-'~~~~~`-_            _-'~~~~~`-_            _-'~~~~~`-_
#      .'           '.        .'           '.        .'           '.
#      |    R I P    |        |    R I P    |        |    R I P    |
#      |             |        |             |        |             |
#      |  Synthetic  |        |    Asset    |        |  Pipeline   |
#      |   Process   |        |    Store    |        |    Init     |
#      |   Events    |        |  Operations |        |  Failures   |
#      |             |        |             |        |             |
###################################################################################################

# Old data structures referenced below
# class AssetStoreOperationData(NamedTuple):
#     op: str
#     step_key: str
#     output_name: str
#     asset_store_key: str
#
#
# class AssetStoreOperationType(Enum):
#     SET_ASSET = "SET_ASSET"
#     GET_ASSET = "GET_ASSET"
#
#
# class PipelineInitFailureData(NamedTuple):
#     error: SerializableErrorInfo


def _handle_back_compat(
    event_type_value: str,
    event_specific_data: Optional[Dict[str, Any]],
) -> Tuple[str, Optional[Dict[str, Any]]]:
    # transform old specific process events into engine events
    if event_type_value in [
        "PIPELINE_PROCESS_START",
        "PIPELINE_PROCESS_STARTED",
        "PIPELINE_PROCESS_EXITED",
    ]:
        return "ENGINE_EVENT", {"__class__": "EngineEventData"}

    # changes asset store ops into get/set asset
    elif event_type_value == "ASSET_STORE_OPERATION":
        assert (
            event_specific_data is not None
        ), "ASSET_STORE_OPERATION event must have specific data"
        if event_specific_data["op"] in (
            "GET_ASSET",
            '{"__enum__": "AssetStoreOperationType.GET_ASSET"}',
        ):
            return (
                "LOADED_INPUT",
                {
                    "__class__": "LoadedInputData",
                    "input_name": event_specific_data["output_name"],
                    "manager_key": event_specific_data["asset_store_key"],
                },
            )
        if event_specific_data["op"] in (
            "SET_ASSET",
            '{"__enum__": "AssetStoreOperationType.SET_ASSET"}',
        ):
            return (
                "HANDLED_OUTPUT",
                {
                    "__class__": "HandledOutputData",
                    "output_name": event_specific_data["output_name"],
                    "manager_key": event_specific_data["asset_store_key"],
                },
            )

    # previous name for ASSET_MATERIALIZATION was STEP_MATERIALIZATION
    if event_type_value == "STEP_MATERIALIZATION":
        assert event_specific_data is not None, "STEP_MATERIALIZATION event must have specific data"
        return "ASSET_MATERIALIZATION", event_specific_data

    # transform PIPELINE_INIT_FAILURE to PIPELINE_FAILURE
    if event_type_value == "PIPELINE_INIT_FAILURE":
        assert (
            event_specific_data is not None
        ), "PIPELINE_INIT_FAILURE event must have specific data"
        return "PIPELINE_FAILURE", {
            "__class__": "PipelineFailureData",
            "error": event_specific_data.get("error"),
        }

    return event_type_value, event_specific_data
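
# Illustrative sketch (not part of the dagster source): the back-compat mapping
# above rewrites legacy storage values before unpacking, and passes current
# values through unchanged. For example:
#
#     assert _handle_back_compat("PIPELINE_PROCESS_START", None) == (
#         "ENGINE_EVENT",
#         {"__class__": "EngineEventData"},
#     )
#     assert _handle_back_compat("STEP_OUTPUT", None) == ("STEP_OUTPUT", None)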