You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster._core.execution.context.init

from typing import Any, Mapping, Optional, Union

import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions.resource_definition import (
    IContainsGenerator,
    ResourceDefinition,
    Resources,
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.instance import DagsterInstance
from dagster._core.log_manager import DagsterLogManager
from dagster._core.storage.dagster_run import DagsterRun


class InitResourceContext:
    """Context passed to the initialization function of a :py:class:`dagster.ResourceDefinition`.

    Users should not instantiate this object directly. To construct an
    `InitResourceContext` for testing purposes, use
    :py:func:`dagster.build_init_resource_context`.

    Example:
        .. code-block:: python

            from dagster import resource, InitResourceContext

            @resource
            def the_resource(init_context: InitResourceContext):
                init_context.log.info("Hello, world!")
    """

    def __init__(
        self,
        resource_config: Any,
        resources: Resources,
        resource_def: Optional[ResourceDefinition] = None,
        instance: Optional[DagsterInstance] = None,
        dagster_run: Optional[DagsterRun] = None,
        log_manager: Optional[DagsterLogManager] = None,
    ):
        # Every argument is stored as given; no validation happens here.
        self._resources = resources
        self._resource_config = resource_config
        self._resource_def = resource_def
        self._instance = instance
        self._dagster_run = dagster_run
        self._log_manager = log_manager

    @public
    @property
    def resource_config(self) -> Any:
        """Config data supplied for this resource by the run config.

        Its schema is defined by the ``config_field`` argument to
        :py:class:`ResourceDefinition`.
        """
        return self._resource_config

    @public
    @property
    def resource_def(self) -> Optional[ResourceDefinition]:
        """The resource definition that is currently being constructed."""
        return self._resource_def

    @public
    @property
    def resources(self) -> Resources:
        """The resources available to the resource that is being initialized."""
        return self._resources

    @public
    @property
    def instance(self) -> Optional[DagsterInstance]:
        """The Dagster instance configured for the current execution context."""
        return self._instance

    @property
    def dagster_run(self) -> Optional[DagsterRun]:
        """The dagster run to use.

        This is None when resources are initialized outside of an execution context.
        """
        return self._dagster_run

    @public
    @property
    def log(self) -> Optional[DagsterLogManager]:
        """The Dagster log manager configured for the current execution context."""
        return self._log_manager

    # backcompat: keep around this property from when InitResourceContext used to be a NamedTuple
    @public
    @property
    def log_manager(self) -> Optional[DagsterLogManager]:
        """The log manager for this run of the job."""
        return self._log_manager

    @public
    @property
    def run_id(self) -> Optional[str]:
        """The id for this run of the job or pipeline.

        This is None when resources are initialized outside of an execution context.
        """
        if not self.dagster_run:
            return None
        return self.dagster_run.run_id

    def replace_config(self, config: Any) -> "InitResourceContext":
        """Return a new ``InitResourceContext`` that uses ``config`` as its resource config.

        All other values are read from this context's properties and passed through unchanged.
        """
        # Dict literals evaluate top to bottom, so the properties are read in a fixed order.
        carried_over = {
            "resources": self.resources,
            "instance": self.instance,
            "resource_def": self.resource_def,
            "dagster_run": self.dagster_run,
            "log_manager": self.log,
        }
        return InitResourceContext(resource_config=config, **carried_over)
class UnboundInitResourceContext(InitResourceContext):
    """Resource initialization context outputted by ``build_init_resource_context``.

    Represents a context whose config has not yet been validated against a resource definition,
    hence the inability to access the `resource_def` attribute. When an instance of
    ``UnboundInitResourceContext`` is passed to a resource invocation, config is validated,
    and it is subsumed into an `InitResourceContext`, which contains the resource_def
    validated against.
    """

    def __init__(
        self,
        resource_config: Any,
        resources: Optional[Union[Resources, Mapping[str, Any]]],
        instance: Optional[DagsterInstance],
    ):
        """Enter the instance and resource context managers and build the base context.

        Args:
            resource_config: Config for the resource; stored without validation.
            resources: Mapping of resources to make available, or None. Passing an
                already-built ``Resources`` object fails a check.
            instance: The Dagster instance to use, or None to let
                ``ephemeral_instance_if_missing`` supply one.
        """
        from dagster._core.execution.api import ephemeral_instance_if_missing
        from dagster._core.execution.build_resources import (
            build_resources,
            wrap_resources_for_execution,
        )
        from dagster._core.execution.context_creation_job import initialize_console_manager

        self._instance_provided = (
            check.opt_inst_param(instance, "instance", DagsterInstance) is not None
        )
        # Construct ephemeral instance if missing
        self._instance_cm = ephemeral_instance_if_missing(instance)
        # Pylint can't infer that the ephemeral_instance context manager has an __enter__ method,
        # so ignore lint error
        instance = self._instance_cm.__enter__()

        if isinstance(resources, Resources):
            check.failed("Should not have a Resources object directly from this initialization")

        self._resource_defs = wrap_resources_for_execution(
            check.opt_mapping_param(resources, "resources")
        )

        self._resources_cm = build_resources(self._resource_defs, instance=instance)
        resources = self._resources_cm.__enter__()
        # Remember whether any resource needs teardown, so that access outside a `with`
        # block can be rejected and __del__ knows whether cleanup is required.
        self._resources_contain_cm = isinstance(resources, IContainsGenerator)

        self._cm_scope_entered = False
        super().__init__(
            resource_config=resource_config,
            resources=resources,
            resource_def=None,
            instance=instance,
            dagster_run=None,
            log_manager=initialize_console_manager(None),
        )

    def __enter__(self):
        """Mark the context manager scope as entered and return this context."""
        self._cm_scope_entered = True
        return self

    def __exit__(self, *exc):
        """Exit the resources context manager, then the instance one when applicable."""
        self._resources_cm.__exit__(*exc)
        # NOTE(review): the instance context manager is only exited when the caller supplied
        # an instance. Confirm this is intended; it may be that the ephemeral (not provided)
        # instance is the one that needs cleanup.
        if self._instance_provided:
            self._instance_cm.__exit__(*exc)

    def __del__(self):
        """Release resources if the context was never used as a context manager."""
        # __del__ also runs when __init__ raised part-way through, in which case some of
        # these attributes were never assigned; read them defensively so finalization does
        # not raise a secondary AttributeError.
        scope_entered = getattr(self, "_cm_scope_entered", False)
        resources_cm = getattr(self, "_resources_cm", None)
        instance_cm = getattr(self, "_instance_cm", None)

        if resources_cm and getattr(self, "_resources_contain_cm", False) and not scope_entered:
            resources_cm.__exit__(None, None, None)
        if (
            instance_cm is not None
            and getattr(self, "_instance_provided", False)
            and not scope_entered
        ):
            instance_cm.__exit__(None, None, None)

    @property
    def resource_config(self) -> Any:
        """The resource config this context was built with."""
        return self._resource_config

    @property
    def resource_def(self) -> Optional[ResourceDefinition]:
        """Always raises: an unbound context has no resource definition.

        Raises:
            DagsterInvariantViolationError: on every access.
        """
        raise DagsterInvariantViolationError(
            "UnboundInitResourceContext has not been validated against a resource definition."
        )

    @property
    def resources(self) -> Resources:
        """The resources that are available to the resource that we are initializing.

        Raises:
            DagsterInvariantViolationError: if a provided resource is a generator and the
                context is accessed outside of a ``with`` block.
        """
        if self._resources_cm and self._resources_contain_cm and not self._cm_scope_entered:
            raise DagsterInvariantViolationError(
                "At least one provided resource is a generator, but attempting to access "
                "resources outside of context manager scope. You can use the following syntax to "
                "open a context manager: `with build_init_resource_context(...) as context:`"
            )
        return self._resources

    @property
    def instance(self) -> Optional[DagsterInstance]:
        """The instance obtained by entering the instance context manager."""
        return self._instance

    @property
    def log(self) -> Optional[DagsterLogManager]:
        """The log manager created for this context."""
        return self._log_manager

    # backcompat: keep around this property from when InitResourceContext used to be a NamedTuple
    @property
    def log_manager(self) -> Optional[DagsterLogManager]:
        """The log manager created for this context (same object as ``log``)."""
        return self._log_manager

    @property
    def run_id(self) -> Optional[str]:
        """Always None: an unbound context is not associated with a run."""
        return None
def build_init_resource_context(
    config: Optional[Mapping[str, Any]] = None,
    resources: Optional[Mapping[str, Any]] = None,
    instance: Optional[DagsterInstance] = None,
) -> InitResourceContext:
    """Builds resource initialization context from provided parameters.

    ``build_init_resource_context`` can be used as either a function or context manager. If
    there is a provided resource to ``build_init_resource_context`` that is a context manager,
    then it must be used as a context manager. This function can be used to provide the context
    argument to the invocation of a resource.

    Args:
        resources (Optional[Dict[str, Any]]): The resources to provide to the context. These can be
            either values or resource definitions.
        config (Optional[Any]): The resource config to provide to the context.
        instance (Optional[DagsterInstance]): The dagster instance configured for the context.
            Defaults to DagsterInstance.ephemeral().

    Examples:
        .. code-block:: python

            context = build_init_resource_context()
            resource_to_init(context)

            with build_init_resource_context(
                resources={"foo": context_manager_resource}
            ) as context:
                resource_to_init(context)

    """
    # Validate the arguments one by one: config first, then instance, then resources.
    checked_config = check.opt_mapping_param(config, "config", key_type=str)
    checked_instance = check.opt_inst_param(instance, "instance", DagsterInstance)
    checked_resources = check.opt_mapping_param(resources, "resources", key_type=str)

    return UnboundInitResourceContext(
        resource_config=checked_config,
        instance=checked_instance,
        resources=checked_resources,
    )