You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster._core.execution.job_execution_result

from typing import Any, Sequence

import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions import JobDefinition, NodeHandle
from dagster._core.definitions.utils import DEFAULT_OUTPUT
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.events import DagsterEvent
from dagster._core.execution.plan.utils import build_resources_for_manager
from dagster._core.storage.dagster_run import DagsterRun

from .execution_result import ExecutionResult


[docs]class JobExecutionResult(ExecutionResult): """Result object returned by :py:func:`dagster.execute_job`. Used for retrieving run success, events, and outputs from `execute_job`. Users should not directly instantiate this class. Events and run information can be retrieved off of the object directly. In order to access outputs, the `ExecuteJobResult` object needs to be opened as a context manager, which will re-initialize the resources from execution. """ def __init__(self, job_def, reconstruct_context, event_list, dagster_run): self._job_def = job_def self._reconstruct_context = reconstruct_context self._context = None self._event_list = event_list self._dagster_run = dagster_run def __enter__(self) -> "JobExecutionResult": context = self._reconstruct_context.__enter__() self._context = context return self def __exit__(self, *exc): exit_result = self._reconstruct_context.__exit__(*exc) self._context = None return exit_result @public @property def job_def(self) -> JobDefinition: """JobDefinition: The job definition that was executed.""" return self._job_def @public @property def dagster_run(self) -> DagsterRun: """DagsterRun: The Dagster run that was executed.""" return self._dagster_run @public @property def all_events(self) -> Sequence[DagsterEvent]: """Sequence[DagsterEvent]: List of all events yielded by the job execution.""" return self._event_list @public @property def run_id(self) -> str: """str: The id of the Dagster run that was executed.""" return self.dagster_run.run_id
[docs] @public def output_value(self, output_name: str = DEFAULT_OUTPUT) -> Any: """Retrieves output of top-level job, if an output is returned. In order to use this method, the `ExecuteJobResult` object must be opened as a context manager. If this method is used without opening the context manager, it will result in a :py:class:`DagsterInvariantViolationError`. If the top-level job has no output, calling this method will also result in a :py:class:`DagsterInvariantViolationError`. Args: output_name (Optional[str]): The name of the output to retrieve. Defaults to `result`, the default output name in dagster. Returns: Any: The value of the retrieved output. """ return super(JobExecutionResult, self).output_value(output_name=output_name)
[docs] @public def output_for_node(self, node_str: str, output_name: str = DEFAULT_OUTPUT) -> Any: """Retrieves output value with a particular name from the run of the job. In order to use this method, the `ExecuteJobResult` object must be opened as a context manager. If this method is used without opening the context manager, it will result in a :py:class:`DagsterInvariantViolationError`. Args: node_str (str): Name of the op/graph whose output should be retrieved. If the intended graph/op is nested within another graph, the syntax is `outer_graph.inner_node`. output_name (Optional[str]): Name of the output on the op/graph to retrieve. Defaults to `result`, the default output name in dagster. Returns: Any: The value of the retrieved output. """ return super(JobExecutionResult, self).output_for_node(node_str, output_name=output_name)
def _get_output_for_handle(self, handle: NodeHandle, output_name: str) -> Any: if not self._context: raise DagsterInvariantViolationError( "In order to access output objects, the result of `execute_job` must be opened as a" " context manager: 'with execute_job(...) as result:" ) found = False result = None for compute_step_event in self.compute_events_for_handle(handle): if ( compute_step_event.is_successful_output and compute_step_event.step_output_data.output_name == output_name ): found = True output = compute_step_event.step_output_data step = self._context.execution_plan.get_step_by_key(compute_step_event.step_key) dagster_type = ( self.job_def.get_node(handle).output_def_named(output_name).dagster_type ) value = self._get_value(self._context.for_step(step), output, dagster_type) check.invariant( not (output.mapping_key and step.get_mapping_key()), "Not set up to handle mapped outputs downstream of mapped steps", ) mapping_key = output.mapping_key or step.get_mapping_key() if mapping_key: if result is None: result = {mapping_key: value} else: result[mapping_key] = ( value # pylint:disable=unsupported-assignment-operation ) else: result = value if found: return result node = self.job_def.get_node(handle) raise DagsterInvariantViolationError( f"Did not find result {output_name} in {node.describe_node()}" ) def _get_value(self, context, step_output_data, dagster_type): step_output_handle = step_output_data.step_output_handle manager = context.get_io_manager(step_output_handle) manager_key = context.execution_plan.get_manager_key(step_output_handle, self.job_def) res = manager.load_input( context.for_input_manager( name=None, config=None, metadata=None, dagster_type=dagster_type, source_handle=step_output_handle, resource_config=context.resolved_run_config.resources[manager_key].config, resources=build_resources_for_manager(manager_key, context), ) ) return res