This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.
from typing import AbstractSet, Any, Mapping, Optional, cast
from dagster import (
DagsterRun,
JobDefinition,
OpDefinition,
_check as check,
)
from dagster._annotations import public
from dagster._core.definitions.dependency import Node, NodeHandle
from dagster._core.execution.context.compute import AbstractComputeExecutionContext
from dagster._core.execution.context.system import PlanExecutionContext, StepExecutionContext
from dagster._core.log_manager import DagsterLogManager
from dagster._core.system_config.objects import ResolvedRunConfig
[docs]class DagstermillExecutionContext(AbstractComputeExecutionContext):
"""Dagstermill-specific execution context.
Do not initialize directly: use :func:`dagstermill.get_context`.
"""
def __init__(
self,
job_context: PlanExecutionContext,
job_def: JobDefinition,
resource_keys_to_init: AbstractSet[str],
op_name: str,
node_handle: NodeHandle,
op_config: Any = None,
):
self._job_context = check.inst_param(job_context, "job_context", PlanExecutionContext)
self._job_def = check.inst_param(job_def, "job_def", JobDefinition)
self._resource_keys_to_init = check.set_param(
resource_keys_to_init, "resource_keys_to_init", of_type=str
)
self.op_name = check.str_param(op_name, "op_name")
self.node_handle = check.inst_param(node_handle, "node_handle", NodeHandle)
self._op_config = op_config
def has_tag(self, key: str) -> bool:
"""Check if a logging tag is defined on the context.
Args:
key (str): The key to check.
Returns:
bool
"""
check.str_param(key, "key")
return self._job_context.has_tag(key)
def get_tag(self, key: str) -> Optional[str]:
"""Get a logging tag defined on the context.
Args:
key (str): The key to get.
Returns:
str
"""
check.str_param(key, "key")
return self._job_context.get_tag(key)
@public
@property
def run_id(self) -> str:
"""str: The run_id for the context."""
return self._job_context.run_id
@public
@property
def run_config(self) -> Mapping[str, Any]:
"""dict: The run_config for the context."""
return self._job_context.run_config
@property
def resolved_run_config(self) -> ResolvedRunConfig:
""":class:`dagster.ResolvedRunConfig`: The resolved_run_config for the context."""
return self._job_context.resolved_run_config
@public
@property
def logging_tags(self) -> Mapping[str, str]:
"""dict: The logging tags for the context."""
return self._job_context.logging_tags
@public
@property
def job_name(self) -> str:
"""str: The name of the executing job."""
return self._job_context.job_name
@public
@property
def job_def(self) -> JobDefinition:
""":class:`dagster.JobDefinition`: The job definition for the context.
This will be a dagstermill-specific shim.
"""
return self._job_def
@property
def resources(self) -> Any:
"""collections.namedtuple: A dynamically-created type whose properties allow access to
resources.
"""
return self._job_context.scoped_resources_builder.build(
required_resource_keys=self._resource_keys_to_init,
)
@public
@property
def run(self) -> DagsterRun:
""":class:`dagster.DagsterRun`: The job run for the context."""
return cast(DagsterRun, self._job_context.dagster_run)
@property
def log(self) -> DagsterLogManager:
""":class:`dagster.DagsterLogManager`: The log manager for the context.
Call, e.g., ``log.info()`` to log messages through the Dagster machinery.
"""
return self._job_context.log
@public
@property
def op_def(self) -> OpDefinition:
""":class:`dagster.OpDefinition`: The op definition for the context.
In interactive contexts, this may be a dagstermill-specific shim, depending whether an
op definition was passed to ``dagstermill.get_context``.
"""
return cast(OpDefinition, self._job_def.node_def_named(self.op_name))
@property
def node(self) -> Node:
""":class:`dagster.Node`: The node for the context.
In interactive contexts, this may be a dagstermill-specific shim, depending whether an
op definition was passed to ``dagstermill.get_context``.
"""
return self.job_def.get_node(self.node_handle)
@public
@property
def op_config(self) -> Any:
"""collections.namedtuple: A dynamically-created type whose properties allow access to
op-specific config.
"""
if self._op_config:
return self._op_config
op_config = self.resolved_run_config.ops.get(self.op_name)
return op_config.config if op_config else None
class DagstermillRuntimeExecutionContext(DagstermillExecutionContext):
def __init__(
self,
job_context: PlanExecutionContext,
job_def: JobDefinition,
resource_keys_to_init: AbstractSet[str],
op_name: str,
step_context: StepExecutionContext,
node_handle: NodeHandle,
op_config: Any = None,
):
self._step_context = check.inst_param(step_context, "step_context", StepExecutionContext)
super().__init__(
job_context,
job_def,
resource_keys_to_init,
op_name,
node_handle,
op_config,
)
@property
def step_context(self) -> StepExecutionContext:
return self._step_context