You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster._core.execution.execute_in_process_result

from typing import Any, Mapping, Optional, Sequence

import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions import JobDefinition, NodeHandle
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.utils import DEFAULT_OUTPUT
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.events import DagsterEvent
from dagster._core.execution.plan.outputs import StepOutputHandle
from dagster._core.storage.dagster_run import DagsterRun

from .execution_result import ExecutionResult


[docs]class ExecuteInProcessResult(ExecutionResult): """Result object returned by in-process testing APIs. Users should not instantiate this object directly. Used for retrieving run success, events, and outputs from execution methods that return this object. This object is returned by: - :py:meth:`dagster.GraphDefinition.execute_in_process` - :py:meth:`dagster.JobDefinition.execute_in_process` - :py:meth:`dagster.materialize_to_memory` - :py:meth:`dagster.materialize` """ _handle: NodeHandle _event_list: Sequence[DagsterEvent] _dagster_run: DagsterRun _output_capture: Mapping[StepOutputHandle, Any] _job_def: JobDefinition def __init__( self, event_list: Sequence[DagsterEvent], dagster_run: DagsterRun, output_capture: Optional[Mapping[StepOutputHandle, Any]], job_def: JobDefinition, ): self._job_def = job_def self._event_list = event_list self._dagster_run = dagster_run self._output_capture = check.opt_mapping_param( output_capture, "output_capture", key_type=StepOutputHandle ) @public @property def job_def(self) -> JobDefinition: """JobDefinition: The job definition that was executed.""" return self._job_def @public @property def dagster_run(self) -> DagsterRun: """DagsterRun: The Dagster run that was executed.""" return self._dagster_run @public @property def all_events(self) -> Sequence[DagsterEvent]: """List[DagsterEvent]: All dagster events emitted during execution.""" return self._event_list @public @property def run_id(self) -> str: """str: The run ID of the executed :py:class:`DagsterRun`.""" return self.dagster_run.run_id def _get_output_for_handle(self, handle: NodeHandle, output_name: str) -> Any: mapped_outputs = {} step_key = str(handle) output_found = False for step_output_handle, value in self._output_capture.items(): # For the mapped output case, where step keys are in the format # "step_key[upstream_mapped_output_name]" within the step output handle. if ( step_output_handle.step_key.startswith(f"{step_key}[") and step_output_handle.output_name == output_name ): output_found = True key_start = step_output_handle.step_key.find("[") key_end = step_output_handle.step_key.find("]") upstream_mapped_output_name = step_output_handle.step_key[key_start + 1 : key_end] mapped_outputs[upstream_mapped_output_name] = value # For all other cases, search for exact match. elif ( step_key == step_output_handle.step_key and step_output_handle.output_name == output_name ): output_found = True if not step_output_handle.mapping_key: return self._output_capture[step_output_handle] mapped_outputs[step_output_handle.mapping_key] = value if not output_found: raise DagsterInvariantViolationError( f"No outputs found for output '{output_name}' from node '{handle}'." ) return mapped_outputs
[docs] @public def output_for_node(self, node_str: str, output_name: str = DEFAULT_OUTPUT) -> Any: """Retrieves output value with a particular name from the in-process run of the job. Args: node_str (str): Name of the op/graph whose output should be retrieved. If the intended graph/op is nested within another graph, the syntax is `outer_graph.inner_node`. output_name (Optional[str]): Name of the output on the op/graph to retrieve. Defaults to `result`, the default output name in dagster. Returns: Any: The value of the retrieved output. """ return super(ExecuteInProcessResult, self).output_for_node( node_str, output_name=output_name )
[docs] @public def asset_value(self, asset_key: CoercibleToAssetKey) -> Any: """Retrieves the value of an asset that was materialized during the execution of the job. Args: asset_key (CoercibleToAssetKey): The key of the asset to retrieve. Returns: Any: The value of the retrieved asset. """ node_output_handle = self._job_def.asset_layer.node_output_handle_for_asset( AssetKey.from_coercible(asset_key) ) return self.output_for_node( node_str=str(node_output_handle.node_handle), output_name=node_output_handle.output_name )
[docs] @public def output_value(self, output_name: str = DEFAULT_OUTPUT) -> Any: """Retrieves output of top-level job, if an output is returned. Args: output_name (Optional[str]): The name of the output to retrieve. Defaults to `result`, the default output name in dagster. Returns: Any: The value of the retrieved output. """ return super(ExecuteInProcessResult, self).output_value(output_name=output_name)