This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.
import os
import sys
import tempfile
from contextlib import ExitStack, contextmanager
from typing import Any, Iterator, Mapping, Optional
import yaml
from dagster._utils.error import serializable_error_info_from_exc_info
from .._utils.env import environ
from .._utils.merger import merge_dicts
from .instance import DagsterInstance
[docs]@contextmanager
def instance_for_test(
overrides: Optional[Mapping[str, Any]] = None,
set_dagster_home: bool = True,
temp_dir: Optional[str] = None,
) -> Iterator[DagsterInstance]:
"""Creates a persistent :py:class:`~dagster.DagsterInstance` available within a context manager.
When a context manager is opened, if no `temp_dir` parameter is set, a new
temporary directory will be created for the duration of the context
manager's opening. If the `set_dagster_home` parameter is set to True
(True by default), the `$DAGSTER_HOME` environment variable will be
overridden to be this directory (or the directory passed in by `temp_dir`)
for the duration of the context manager being open.
Args:
overrides (Optional[Mapping[str, Any]]):
Config to provide to instance (config format follows that typically found in an `instance.yaml` file).
set_dagster_home (Optional[bool]):
If set to True, the `$DAGSTER_HOME` environment variable will be
overridden to be the directory used by this instance for the
duration that the context manager is open. Upon the context
manager closing, the `$DAGSTER_HOME` variable will be re-set to the original value. (Defaults to True).
temp_dir (Optional[str]):
The directory to use for storing local artifacts produced by the
instance. If not set, a temporary directory will be created for
the duration of the context manager being open, and all artifacts
will be torn down afterward.
"""
with ExitStack() as stack:
if not temp_dir:
temp_dir = stack.enter_context(tempfile.TemporaryDirectory())
# wait for any grpc processes that created runs during test disposal to finish,
# since they might also be using this instance's tempdir (and to keep each test
# isolated / avoid race conditions in newer versions of grpcio when servers are
# shutting down and spinning up at the same time)
instance_overrides = merge_dicts(
{
"telemetry": {"enabled": False},
"code_servers": {"wait_for_local_processes_on_shutdown": True},
},
(overrides if overrides else {}),
)
if set_dagster_home:
stack.enter_context(
environ({"DAGSTER_HOME": temp_dir, "DAGSTER_DISABLE_TELEMETRY": "yes"})
)
with open(os.path.join(temp_dir, "dagster.yaml"), "w", encoding="utf8") as fd:
yaml.dump(instance_overrides, fd, default_flow_style=False)
with DagsterInstance.from_config(temp_dir) as instance:
try:
yield instance
except:
sys.stderr.write(
"Test raised an exception, attempting to clean up instance:"
+ serializable_error_info_from_exc_info(sys.exc_info()).to_string()
+ "\n"
)
raise
finally:
cleanup_test_instance(instance)
def cleanup_test_instance(instance: DagsterInstance) -> None:
# To avoid filesystem contention when we close the temporary directory, wait for
# all runs to reach a terminal state, and close any subprocesses or threads
# that might be accessing the run history DB.
# Since launcher is lazy loaded, we don't need to do anyting if it's None
if instance._run_launcher: # noqa: SLF001
instance._run_launcher.join() # noqa: SLF001