You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagstermill.context

from typing import AbstractSet, Any, Mapping, Optional, cast

from dagster import (
    DagsterRun,
    JobDefinition,
    OpDefinition,
    _check as check,
)
from dagster._annotations import public
from dagster._core.definitions.dependency import Node, NodeHandle
from dagster._core.execution.context.compute import AbstractComputeExecutionContext
from dagster._core.execution.context.system import PlanExecutionContext, StepExecutionContext
from dagster._core.log_manager import DagsterLogManager
from dagster._core.system_config.objects import ResolvedRunConfig


class DagstermillExecutionContext(AbstractComputeExecutionContext):
    """Execution context made available inside Dagstermill notebooks.

    This class is not meant to be instantiated by hand: obtain an instance via
    :func:`dagstermill.get_context`.
    """

    def __init__(
        self,
        job_context: PlanExecutionContext,
        job_def: JobDefinition,
        resource_keys_to_init: AbstractSet[str],
        op_name: str,
        node_handle: NodeHandle,
        op_config: Any = None,
    ):
        # Every argument except ``op_config`` is validated on the way in;
        # ``op_config`` is stored exactly as received.
        self._job_context = check.inst_param(job_context, "job_context", PlanExecutionContext)
        self._job_def = check.inst_param(job_def, "job_def", JobDefinition)
        self._resource_keys_to_init = check.set_param(
            resource_keys_to_init,
            "resource_keys_to_init",
            of_type=str,
        )
        self.op_name = check.str_param(op_name, "op_name")
        self.node_handle = check.inst_param(node_handle, "node_handle", NodeHandle)
        self._op_config = op_config

    def has_tag(self, key: str) -> bool:
        """Report whether a logging tag is defined on the context.

        Args:
            key (str): The tag key to look for.

        Returns:
            bool
        """
        tag_key = check.str_param(key, "key")
        return self._job_context.has_tag(tag_key)

    def get_tag(self, key: str) -> Optional[str]:
        """Fetch a logging tag defined on the context.

        Args:
            key (str): The tag key to fetch.

        Returns:
            str
        """
        tag_key = check.str_param(key, "key")
        return self._job_context.get_tag(tag_key)

    @public
    @property
    def run_id(self) -> str:
        """str: The run_id of the underlying job context."""
        job_context = self._job_context
        return job_context.run_id

    @public
    @property
    def run_config(self) -> Mapping[str, Any]:
        """dict: The run_config of the underlying job context."""
        job_context = self._job_context
        return job_context.run_config

    @property
    def resolved_run_config(self) -> ResolvedRunConfig:
        """:class:`dagster.ResolvedRunConfig`: The resolved_run_config of the underlying
        job context.
        """
        job_context = self._job_context
        return job_context.resolved_run_config

    @public
    @property
    def logging_tags(self) -> Mapping[str, str]:
        """dict: The logging tags of the underlying job context."""
        job_context = self._job_context
        return job_context.logging_tags

    @public
    @property
    def job_name(self) -> str:
        """str: The name of the executing job."""
        job_context = self._job_context
        return job_context.job_name

    @public
    @property
    def job_def(self) -> JobDefinition:
        """:class:`dagster.JobDefinition`: The job definition supplied at construction.

        This will be a dagstermill-specific shim.
        """
        return self._job_def

    @property
    def resources(self) -> Any:
        """collections.namedtuple: A dynamically-created type whose properties allow access
        to resources.
        """
        # Each access asks the job context's builder for the resource keys
        # recorded at construction time.
        builder = self._job_context.scoped_resources_builder
        return builder.build(required_resource_keys=self._resource_keys_to_init)

    @public
    @property
    def run(self) -> DagsterRun:
        """:class:`dagster.DagsterRun`: The job run for the context."""
        dagster_run = self._job_context.dagster_run
        return cast(DagsterRun, dagster_run)

    @property
    def log(self) -> DagsterLogManager:
        """:class:`dagster.DagsterLogManager`: The log manager for the context.

        Call, e.g., ``log.info()`` to log messages through the Dagster machinery.
        """
        job_context = self._job_context
        return job_context.log

    @public
    @property
    def op_def(self) -> OpDefinition:
        """:class:`dagster.OpDefinition`: The op definition for the context.

        In interactive contexts, this may be a dagstermill-specific shim, depending whether an
        op definition was passed to ``dagstermill.get_context``.
        """
        node_def = self._job_def.node_def_named(self.op_name)
        return cast(OpDefinition, node_def)

    @property
    def node(self) -> Node:
        """:class:`dagster.Node`: The node for the context.

        In interactive contexts, this may be a dagstermill-specific shim, depending whether an
        op definition was passed to ``dagstermill.get_context``.
        """
        job = self.job_def
        return job.get_node(self.node_handle)

    @public
    @property
    def op_config(self) -> Any:
        """collections.namedtuple: A dynamically-created type whose properties allow access
        to op-specific config.

        The config handed to the constructor is returned when it is truthy; otherwise the
        entry for this op in the resolved run config is consulted, yielding ``None`` when
        no such entry exists.
        """
        explicit_config = self._op_config
        # NOTE(review): this is a truthiness test, so an explicitly supplied falsy config
        # (0, False, {}) also falls through to the resolved run config -- confirm intended.
        if explicit_config:
            return explicit_config

        resolved_entry = self.resolved_run_config.ops.get(self.op_name)
        if not resolved_entry:
            return None
        return resolved_entry.config
class DagstermillRuntimeExecutionContext(DagstermillExecutionContext):
    """A :class:`DagstermillExecutionContext` that additionally carries a step context."""

    def __init__(
        self,
        job_context: PlanExecutionContext,
        job_def: JobDefinition,
        resource_keys_to_init: AbstractSet[str],
        op_name: str,
        step_context: StepExecutionContext,
        node_handle: NodeHandle,
        op_config: Any = None,
    ):
        # The step context is validated and stored before the parent initializer runs.
        self._step_context = check.inst_param(step_context, "step_context", StepExecutionContext)
        super().__init__(
            job_context=job_context,
            job_def=job_def,
            resource_keys_to_init=resource_keys_to_init,
            op_name=op_name,
            node_handle=node_handle,
            op_config=op_config,
        )

    @property
    def step_context(self) -> StepExecutionContext:
        """StepExecutionContext: The step context supplied at construction."""
        return self._step_context