Source code for dagster._core.definitions.executor_definition

from enum import Enum as PyEnum
from functools import update_wrapper
from typing import TYPE_CHECKING, Any, Callable, Dict, Mapping, Optional, Sequence, Union, overload

from typing_extensions import Self, TypeAlias

import dagster._check as check
from dagster._annotations import public
from dagster._builtins import Int
from dagster._config import Field, Noneable, Selector, UserConfigSchema
from dagster._core.definitions.configurable import (
    ConfiguredDefinitionConfigSchema,
    NamedConfigurableDefinition,
)
from dagster._core.definitions.job_base import IJob
from dagster._core.definitions.reconstruct import ReconstructableJob
from dagster._core.errors import DagsterUnmetExecutorRequirementsError
from dagster._core.execution.retries import RetryMode, get_retries_config
from dagster._core.execution.tags import get_tag_concurrency_limits_config

from .definition_config_schema import (
    IDefinitionConfigSchema,
    convert_user_facing_definition_config_schema,
)

if TYPE_CHECKING:
    from dagster._core.executor.base import Executor
    from dagster._core.executor.in_process import InProcessExecutor
    from dagster._core.executor.init import InitExecutorContext
    from dagster._core.executor.multiprocess import MultiprocessExecutor
    from dagster._core.instance import DagsterInstance


class ExecutorRequirement(PyEnum):
    """An ExecutorDefinition can include a list of requirements that the system uses to
    check whether the executor will be able to work for a particular job execution.
    """

    # The passed in IJob must be reconstructable across process boundaries
    RECONSTRUCTABLE_PIPELINE = (  # This needs to still exist for folks who may have written their own executor
        "RECONSTRUCTABLE_PIPELINE"
    )
    RECONSTRUCTABLE_JOB = "RECONSTRUCTABLE_PIPELINE"

    # The DagsterInstance must be loadable in a different process
    NON_EPHEMERAL_INSTANCE = "NON_EPHEMERAL_INSTANCE"

    # Any op outputs on the job must be persisted
    PERSISTENT_OUTPUTS = "PERSISTENT_OUTPUTS"


def multiple_process_executor_requirements() -> Sequence[ExecutorRequirement]:
    return [
        ExecutorRequirement.RECONSTRUCTABLE_JOB,
        ExecutorRequirement.NON_EPHEMERAL_INSTANCE,
        ExecutorRequirement.PERSISTENT_OUTPUTS,
    ]


ExecutorConfig = Mapping[str, object]
ExecutorCreationFunction: TypeAlias = Callable[["InitExecutorContext"], "Executor"]
ExecutorRequirementsFunction: TypeAlias = Callable[[ExecutorConfig], Sequence[ExecutorRequirement]]


class ExecutorDefinition(NamedConfigurableDefinition):
    """An executor is responsible for executing the steps of a job.

    Args:
        name (str): The name of the executor.
        config_schema (Optional[ConfigSchema]): The schema for the config. Configuration
            data available in `init_context.executor_config`. If not set, Dagster will
            accept any config provided.
        requirements (Optional[List[ExecutorRequirement]]): Any requirements that must
            be met in order for the executor to be usable for a particular job execution.
        executor_creation_fn (Optional[Callable]): Should accept an :py:class:`InitExecutorContext`
            and return an instance of :py:class:`Executor`.
        description (Optional[str]): A description of the executor.
    """

    def __init__(
        self,
        name: str,
        config_schema: Optional[UserConfigSchema] = None,
        requirements: Union[
            ExecutorRequirementsFunction, Optional[Sequence[ExecutorRequirement]]
        ] = None,
        executor_creation_fn: Optional[ExecutorCreationFunction] = None,
        description: Optional[str] = None,
    ):
        self._name = check.str_param(name, "name")
        self._requirements_fn: ExecutorRequirementsFunction
        if callable(requirements):
            self._requirements_fn = requirements
        else:
            requirements_lst = check.opt_list_param(
                requirements, "requirements", of_type=ExecutorRequirement
            )
            self._requirements_fn = lambda _: requirements_lst
        self._config_schema = convert_user_facing_definition_config_schema(config_schema)
        self._executor_creation_fn = check.opt_callable_param(
            executor_creation_fn, "executor_creation_fn"
        )
        self._description = check.opt_str_param(description, "description")

    @public
    @property
    def name(self) -> str:
        """Name of the executor."""
        return self._name

    @public
    @property
    def description(self) -> Optional[str]:
        """Description of executor, if provided."""
        return self._description

    @property
    def config_schema(self) -> IDefinitionConfigSchema:
        return self._config_schema

    def get_requirements(
        self, executor_config: Mapping[str, object]
    ) -> Sequence[ExecutorRequirement]:
        return self._requirements_fn(executor_config)

    @public
    @property
    def executor_creation_fn(self) -> Optional[ExecutorCreationFunction]:
        """Callable that takes an :py:class:`InitExecutorContext` and returns an instance of
        :py:class:`Executor`.
        """
        return self._executor_creation_fn

    def copy_for_configured(self, name, description, config_schema) -> "ExecutorDefinition":
        return ExecutorDefinition(
            name=name,
            config_schema=config_schema,  # type: ignore
            executor_creation_fn=self.executor_creation_fn,
            description=description or self.description,
            requirements=self._requirements_fn,
        )

    @staticmethod
    def hardcoded_executor(executor: "Executor"):
        return ExecutorDefinition(
            # Executor name was only relevant in the pipeline/solid/mode world, so we
            # can put a dummy value
            name="__executor__",
            executor_creation_fn=lambda _init_context: executor,
        )

    # Backcompat: Overrides configured method to provide name as a keyword argument.
    # If no name is provided, the name is pulled off of this ExecutorDefinition.
    @public
    def configured(
        self,
        config_or_config_fn: Any,
        name: Optional[str] = None,
        config_schema: Optional[UserConfigSchema] = None,
        description: Optional[str] = None,
    ) -> Self:
        """Wraps this object in an object of the same type that provides configuration to the
        inner object.

        Using ``configured`` may result in config values being displayed in the Dagster UI,
        so it is not recommended to use this API with sensitive values, such as secrets.

        Args:
            config_or_config_fn (Union[Any, Callable[[Any], Any]]): Either (1) Run configuration
                that fully satisfies this object's config schema or (2) A function that accepts run
                configuration and returns run configuration that fully satisfies this object's
                config schema. In the latter case, config_schema must be specified. When
                passing a function, it's easiest to use :py:func:`configured`.
            name (Optional[str]): Name of the new definition. If not provided, the emitted
                definition will inherit the name of the `ExecutorDefinition` upon which this
                function is called.
            config_schema (Optional[ConfigSchema]): If config_or_config_fn is a function, the config
                schema that its input must satisfy. If not set, Dagster will accept any config
                provided.
            description (Optional[str]): Description of the new definition. If not specified,
                inherits the description of the definition being configured.

        Returns (ConfigurableDefinition): A configured version of this object.
        """
        name = check.opt_str_param(name, "name")

        new_config_schema = ConfiguredDefinitionConfigSchema(
            self, convert_user_facing_definition_config_schema(config_schema), config_or_config_fn
        )

        return self.copy_for_configured(name or self.name, description, new_config_schema)
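

# Illustrative user-code sketch: `configured` is typically called on one of the
# built-in executors exported from `dagster` to bake config into a renamed copy
# of the definition. The name "no_retries_in_process" is hypothetical.
def _example_configured_executor() -> ExecutorDefinition:
    from dagster import in_process_executor  # user code imports from the top-level package

    return in_process_executor.configured(
        {"retries": {"disabled": {}}},
        name="no_retries_in_process",
        description="In-process executor with step retries turned off.",
    )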


@overload
def executor(name: ExecutorCreationFunction) -> ExecutorDefinition: ...


@overload
def executor(
    name: Optional[str] = ...,
    config_schema: Optional[UserConfigSchema] = ...,
    requirements: Optional[
        Union[ExecutorRequirementsFunction, Sequence[ExecutorRequirement]]
    ] = ...,
) -> "_ExecutorDecoratorCallable": ...


def executor(
    name: Union[ExecutorCreationFunction, Optional[str]] = None,
    config_schema: Optional[UserConfigSchema] = None,
    requirements: Optional[
        Union[ExecutorRequirementsFunction, Sequence[ExecutorRequirement]]
    ] = None,
) -> Union[ExecutorDefinition, "_ExecutorDecoratorCallable"]:
    """Define an executor.

    The decorated function should accept an :py:class:`InitExecutorContext` and return an instance
    of :py:class:`Executor`.

    Args:
        name (Optional[str]): The name of the executor.
        config_schema (Optional[ConfigSchema]): The schema for the config. Configuration data
            available in `init_context.executor_config`. If not set, Dagster will accept any
            config provided.
        requirements (Optional[List[ExecutorRequirement]]): Any requirements that must
            be met in order for the executor to be usable for a particular job execution.
    """
    if callable(name):
        check.invariant(config_schema is None)
        check.invariant(requirements is None)
        return _ExecutorDecoratorCallable()(name)

    return _ExecutorDecoratorCallable(
        name=name, config_schema=config_schema, requirements=requirements
    )
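

# Illustrative sketch (hypothetical executor name and config key): a custom
# executor defined with the decorator above. The usage is wrapped in a helper
# so it runs only on call, after _ExecutorDecoratorCallable (defined below) exists.
def _example_executor_via_decorator() -> ExecutorDefinition:
    @executor(name="example_inline_executor", config_schema={"enable_retries": bool})
    def example_inline_executor(init_context) -> "InProcessExecutor":
        from dagster._core.executor.in_process import InProcessExecutor

        # Toggle step retries based on this executor's own config.
        retries = (
            RetryMode.ENABLED
            if init_context.executor_config["enable_retries"]
            else RetryMode.DISABLED
        )
        return InProcessExecutor(retries=retries, marker_to_close=None)

    return example_inline_executor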


class _ExecutorDecoratorCallable:
    def __init__(self, name=None, config_schema=None, requirements=None):
        self.name = check.opt_str_param(name, "name")
        self.config_schema = config_schema  # type check in definition
        self.requirements = requirements

    def __call__(self, fn: ExecutorCreationFunction) -> ExecutorDefinition:
        check.callable_param(fn, "fn")

        if not self.name:
            self.name = fn.__name__

        executor_def = ExecutorDefinition(
            name=self.name,
            config_schema=self.config_schema,
            executor_creation_fn=fn,
            requirements=self.requirements,
        )

        # `update_wrapper` typing cannot currently handle a Union of Callables correctly
        update_wrapper(executor_def, wrapped=fn)  # type: ignore

        return executor_def


def _core_in_process_executor_creation(config: ExecutorConfig) -> "InProcessExecutor":
    from dagster._core.executor.in_process import InProcessExecutor

    return InProcessExecutor(
        # shouldn't need to .get() here - issue with defaults in config setup
        retries=RetryMode.from_config(check.dict_elem(config, "retries")),  # type: ignore  # (possible none)
        marker_to_close=config.get("marker_to_close"),  # type: ignore  # (should be str)
    )


IN_PROC_CONFIG = Field(
    {
        "retries": get_retries_config(),
        "marker_to_close": Field(
            str,
            is_required=False,
            description="[DEPRECATED]",
        ),
    },
    description="Execute all steps in a single process.",
)


@executor(
    name="in_process",
    config_schema=IN_PROC_CONFIG,
)
def in_process_executor(init_context):
    """The in-process executor executes all steps in a single process.

    To select it, include the following top-level fragment in config:

    .. code-block:: yaml

        execution:
          in_process:

    Execution priority can be configured using the ``dagster/priority`` tag via op metadata,
    where the higher the number the higher the priority. 0 is the default and both positive
    and negative numbers can be used.
    """
    return _core_in_process_executor_creation(init_context.executor_config)
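

# Illustrative user-code sketch (hypothetical op and job names): the in-process
# executor can also be selected in code by passing it to `executor_def`.
def _example_in_process_job():
    from dagster import job, op

    @op
    def do_work():
        return 1

    @job(executor_def=in_process_executor)
    def single_process_job():
        do_work()

    return single_process_job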


@executor(name="execute_in_process_executor")
def execute_in_process_executor(_) -> "InProcessExecutor":
    """Executor used by execute_in_process.

    Use of this executor triggers special behavior in the config system that ignores all incoming
    executor config. This is because someone might set executor config on a job, and when we foist
    this executor onto the job for `execute_in_process`, that config becomes nonsensical.
    """
    from dagster._core.executor.in_process import InProcessExecutor

    return InProcessExecutor(
        retries=RetryMode.ENABLED,
        marker_to_close=None,
    )


def _core_multiprocess_executor_creation(config: ExecutorConfig) -> "MultiprocessExecutor":
    from dagster._core.executor.multiprocess import MultiprocessExecutor

    # unpack optional selector
    start_method = None
    start_cfg: Dict[str, object] = {}
    start_selector = check.opt_dict_elem(config, "start_method")
    if start_selector:
        start_method, start_cfg = list(start_selector.items())[0]

    return MultiprocessExecutor(
        max_concurrent=check.opt_int_elem(config, "max_concurrent"),
        tag_concurrency_limits=check.opt_list_elem(config, "tag_concurrency_limits"),
        retries=RetryMode.from_config(check.dict_elem(config, "retries")),  # type: ignore
        start_method=start_method,
        explicit_forkserver_preload=check.opt_list_elem(start_cfg, "preload_modules", of_type=str),
    )


MULTI_PROC_CONFIG = Field(
    {
        "max_concurrent": Field(
            Noneable(Int),
            default_value=None,
            description=(
                "The number of processes that may run concurrently. "
                "By default, this is set to be the return value of `multiprocessing.cpu_count()`."
            ),
        ),
        "tag_concurrency_limits": get_tag_concurrency_limits_config(),
        "start_method": Field(
            Selector(
                fields={
                    "spawn": Field(
                        {},
                        description=(
                            "Configure the multiprocess executor to start subprocesses "
                            "using `spawn`."
                        ),
                    ),
                    "forkserver": Field(
                        {
                            "preload_modules": Field(
                                [str],
                                is_required=False,
                                description=(
                                    "Explicitly specify the modules to preload in the forkserver."
                                    " Otherwise, there are two cases for default values if modules"
                                    " are not specified. If the Dagster job was loaded from a"
                                    " module, the same module will be preloaded. If not, the"
                                    " `dagster` module is preloaded."
                                ),
                            ),
                        },
                        description=(
                            "Configure the multiprocess executor to start subprocesses "
                            "using `forkserver`."
                        ),
                    ),
                    # fork currently unsupported due to threads usage
                }
            ),
            is_required=False,
            description=(
                "Select how subprocesses are created. By default, `spawn` is selected. See "
                "https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods."
            ),
        ),
        "retries": get_retries_config(),
    },
    description="Execute each step in an individual process.",
)
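

# Illustrative config fragment (as a Python dict) satisfying MULTI_PROC_CONFIG
# above: cap concurrency, start subprocesses via forkserver with an explicit
# preload list, and enable retries. The preloaded module name is hypothetical.
_EXAMPLE_MULTIPROCESS_CONFIG = {
    "max_concurrent": 4,
    "start_method": {"forkserver": {"preload_modules": ["my_package.defs"]}},
    "retries": {"enabled": {}},
}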


@executor(
    name="multiprocess",
    config_schema=MULTI_PROC_CONFIG,
    requirements=multiple_process_executor_requirements(),
)
def multiprocess_executor(init_context):
    """The multiprocess executor executes each step in an individual process.

    Any job that does not specify custom executors will use the multiprocess_executor by default.

    To configure the multiprocess executor, include a fragment such as the following in your run
    config:

    .. code-block:: yaml

        execution:
          config:
            multiprocess:
              max_concurrent: 4

    The ``max_concurrent`` arg is optional and tells the execution engine how many processes may
    run concurrently. By default, or if you set ``max_concurrent`` to be None or 0, this is the
    return value of :py:func:`python:multiprocessing.cpu_count`.

    Execution priority can be configured using the ``dagster/priority`` tag via op metadata,
    where the higher the number the higher the priority. 0 is the default and both positive
    and negative numbers can be used.
    """
    return _core_multiprocess_executor_creation(init_context.executor_config)
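

# Illustrative user-code sketch (hypothetical names): baking `max_concurrent`
# into a configured copy of the multiprocess executor instead of supplying it
# through run config on each launch.
def _example_multiprocess_job():
    from dagster import job, op

    @op
    def crunch():
        return 42

    configured_multiprocess = multiprocess_executor.configured(
        {"max_concurrent": 4}, name="four_wide"
    )

    @job(executor_def=configured_multiprocess)
    def parallel_job():
        crunch()

    return parallel_job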


def check_cross_process_constraints(init_context: "InitExecutorContext") -> None:
    from dagster._core.executor.init import InitExecutorContext

    check.inst_param(init_context, "init_context", InitExecutorContext)
    requirements_lst = init_context.executor_def.get_requirements(init_context.executor_config)

    if ExecutorRequirement.RECONSTRUCTABLE_JOB in requirements_lst:
        _check_intra_process_job(init_context.job)

    if ExecutorRequirement.NON_EPHEMERAL_INSTANCE in requirements_lst:
        _check_non_ephemeral_instance(init_context.instance)


def _check_intra_process_job(job: IJob) -> None:
    if not isinstance(job, ReconstructableJob):
        raise DagsterUnmetExecutorRequirementsError(
            "You have attempted to use an executor that uses multiple processes with the job"
            f' "{job.get_definition().name}" that is not reconstructable. Jobs must be loaded in a'
            " way that allows Dagster to reconstruct them in a new process. This means:\n  *"
            " using the file, module, or workspace.yaml arguments of"
            " dagster-webserver/dagster-graphql/dagster\n  * loading the job through the"
            " reconstructable() function\n"
        )


def _check_non_ephemeral_instance(instance: "DagsterInstance") -> None:
    if instance.is_ephemeral:
        raise DagsterUnmetExecutorRequirementsError(
            "You have attempted to use an executor that uses multiple processes with an ephemeral"
            " DagsterInstance. A non-ephemeral instance is needed to coordinate execution between"
            " multiple processes. You can configure your default instance via $DAGSTER_HOME or"
            " ensure a valid one is passed when invoking the python APIs. You can learn more about"
            " setting up a persistent DagsterInstance from the DagsterInstance docs here:"
            " https://docs.dagster.io/deployment/dagster-instance#default-local-behavior"
        )


def _get_default_executor_requirements(
    executor_config: ExecutorConfig,
) -> Sequence[ExecutorRequirement]:
    return multiple_process_executor_requirements() if "multiprocess" in executor_config else []
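

# Illustrative user-code sketch: satisfying the RECONSTRUCTABLE_JOB and
# NON_EPHEMERAL_INSTANCE requirements checked above by launching through
# execute_job with a reconstructable job and a persistent instance. The module
# `my_package.jobs` and the job `parallel_job` are hypothetical, and
# DagsterInstance.get() assumes $DAGSTER_HOME points at a configured instance.
def _example_cross_process_launch():
    from dagster import DagsterInstance, execute_job, reconstructable

    from my_package.jobs import parallel_job  # hypothetical module-scope job

    instance = DagsterInstance.get()
    return execute_job(reconstructable(parallel_job), instance=instance)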


@executor(
    name="multi_or_in_process_executor",
    config_schema=Field(
        Selector(
            {"multiprocess": MULTI_PROC_CONFIG, "in_process": IN_PROC_CONFIG},
        ),
        default_value={"multiprocess": {}},
    ),
    requirements=_get_default_executor_requirements,
)
def multi_or_in_process_executor(init_context: "InitExecutorContext") -> "Executor":
    """The default executor for a job.

    This is the executor available by default on a :py:class:`JobDefinition`
    that does not provide custom executors. This executor has a multiprocessing-enabled mode, and a
    single-process mode. By default, multiprocessing mode is enabled. Switching between multiprocess
    mode and in-process mode can be achieved via config.

    .. code-block:: yaml

        execution:
          config:
            multiprocess:

        execution:
          config:
            in_process:

    When using the multiprocess mode, ``max_concurrent`` and ``retries`` can also be configured.

    .. code-block:: yaml

        execution:
          config:
            multiprocess:
              max_concurrent: 4
              retries:
                enabled:

    The ``max_concurrent`` arg is optional and tells the execution engine how many processes may
    run concurrently. By default, or if you set ``max_concurrent`` to be 0, this is the return
    value of :py:func:`python:multiprocessing.cpu_count`.

    When using the in_process mode, only retries can be configured.

    Execution priority can be configured using the ``dagster/priority`` tag via op metadata,
    where the higher the number the higher the priority. 0 is the default and both positive
    and negative numbers can be used.
    """
    if "multiprocess" in init_context.executor_config:
        return _core_multiprocess_executor_creation(
            check.dict_elem(init_context.executor_config, "multiprocess")
        )
    else:
        return _core_in_process_executor_creation(
            check.dict_elem(init_context.executor_config, "in_process")
        )
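

# Illustrative run-config fragments (as Python dicts) for the default executor
# above; the Selector key picks the mode.
_EXAMPLE_DEFAULT_MULTIPROCESS_RUN_CONFIG = {
    "execution": {"config": {"multiprocess": {"max_concurrent": 4}}}
}
_EXAMPLE_DEFAULT_IN_PROCESS_RUN_CONFIG = {"execution": {"config": {"in_process": {}}}}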