This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.
from typing import Any, Sequence
import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions import JobDefinition, NodeHandle
from dagster._core.definitions.utils import DEFAULT_OUTPUT
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.events import DagsterEvent
from dagster._core.execution.plan.utils import build_resources_for_manager
from dagster._core.storage.dagster_run import DagsterRun
from .execution_result import ExecutionResult
[docs]class JobExecutionResult(ExecutionResult):
    """Result object returned by :py:func:`dagster.execute_job`.
    Used for retrieving run success, events, and outputs from `execute_job`.
    Users should not directly instantiate this class.
    Events and run information can be retrieved off of the object directly. In
    order to access outputs, the `ExecuteJobResult` object needs to be opened
    as a context manager, which will re-initialize the resources from
    execution.
    """
    def __init__(self, job_def, reconstruct_context, event_list, dagster_run):
        self._job_def = job_def
        self._reconstruct_context = reconstruct_context
        self._context = None
        self._event_list = event_list
        self._dagster_run = dagster_run
    def __enter__(self) -> "JobExecutionResult":
        context = self._reconstruct_context.__enter__()
        self._context = context
        return self
    def __exit__(self, *exc):
        exit_result = self._reconstruct_context.__exit__(*exc)
        self._context = None
        return exit_result
    @public
    @property
    def job_def(self) -> JobDefinition:
        """JobDefinition: The job definition that was executed."""
        return self._job_def
    @public
    @property
    def dagster_run(self) -> DagsterRun:
        """DagsterRun: The Dagster run that was executed."""
        return self._dagster_run
    @public
    @property
    def all_events(self) -> Sequence[DagsterEvent]:
        """Sequence[DagsterEvent]: List of all events yielded by the job execution."""
        return self._event_list
    @public
    @property
    def run_id(self) -> str:
        """str: The id of the Dagster run that was executed."""
        return self.dagster_run.run_id
[docs]    @public
    def output_value(self, output_name: str = DEFAULT_OUTPUT) -> Any:
        """Retrieves output of top-level job, if an output is returned.
        In order to use this method, the `ExecuteJobResult` object must be opened as a context manager. If this method is used without opening the context manager, it will result in a :py:class:`DagsterInvariantViolationError`. If the top-level job has no output, calling this method will also result in a :py:class:`DagsterInvariantViolationError`.
        Args:
            output_name (Optional[str]): The name of the output to retrieve. Defaults to `result`,
                the default output name in dagster.
        Returns:
            Any: The value of the retrieved output.
        """
        return super(JobExecutionResult, self).output_value(output_name=output_name)
[docs]    @public
    def output_for_node(self, node_str: str, output_name: str = DEFAULT_OUTPUT) -> Any:
        """Retrieves output value with a particular name from the run of the job.
        In order to use this method, the `ExecuteJobResult` object must be opened as a context manager. If this method is used without opening the context manager, it will result in a :py:class:`DagsterInvariantViolationError`.
        Args:
            node_str (str): Name of the op/graph whose output should be retrieved. If the intended
                graph/op is nested within another graph, the syntax is `outer_graph.inner_node`.
            output_name (Optional[str]): Name of the output on the op/graph to retrieve. Defaults to
                `result`, the default output name in dagster.
        Returns:
            Any: The value of the retrieved output.
        """
        return super(JobExecutionResult, self).output_for_node(node_str, output_name=output_name)
    def _get_output_for_handle(self, handle: NodeHandle, output_name: str) -> Any:
        if not self._context:
            raise DagsterInvariantViolationError(
                "In order to access output objects, the result of `execute_job` must be opened as a"
                " context manager: 'with execute_job(...) as result:"
            )
        found = False
        result = None
        for compute_step_event in self.compute_events_for_handle(handle):
            if (
                compute_step_event.is_successful_output
                and compute_step_event.step_output_data.output_name == output_name
            ):
                found = True
                output = compute_step_event.step_output_data
                step = self._context.execution_plan.get_step_by_key(compute_step_event.step_key)
                dagster_type = (
                    self.job_def.get_node(handle).output_def_named(output_name).dagster_type
                )
                value = self._get_value(self._context.for_step(step), output, dagster_type)
                check.invariant(
                    not (output.mapping_key and step.get_mapping_key()),
                    "Not set up to handle mapped outputs downstream of mapped steps",
                )
                mapping_key = output.mapping_key or step.get_mapping_key()
                if mapping_key:
                    if result is None:
                        result = {mapping_key: value}
                    else:
                        result[mapping_key] = (
                            value  # pylint:disable=unsupported-assignment-operation
                        )
                else:
                    result = value
        if found:
            return result
        node = self.job_def.get_node(handle)
        raise DagsterInvariantViolationError(
            f"Did not find result {output_name} in {node.describe_node()}"
        )
    def _get_value(self, context, step_output_data, dagster_type):
        step_output_handle = step_output_data.step_output_handle
        manager = context.get_io_manager(step_output_handle)
        manager_key = context.execution_plan.get_manager_key(step_output_handle, self.job_def)
        res = manager.load_input(
            context.for_input_manager(
                name=None,
                config=None,
                metadata=None,
                dagster_type=dagster_type,
                source_handle=step_output_handle,
                resource_config=context.resolved_run_config.resources[manager_key].config,
                resources=build_resources_for_manager(manager_key, context),
            )
        )
        return res