You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster._core.storage.root

import os
from tempfile import TemporaryDirectory
from typing import Optional

from typing_extensions import TypedDict

from dagster import (
    StringSource,
    _check as check,
)
from dagster._config.config_schema import UserConfigSchema
from dagster._serdes import ConfigurableClass, ConfigurableClassData


class LocalArtifactStorageConfig(TypedDict):
    """Typed shape of the config value accepted by ``LocalArtifactStorage.from_config_value``."""

    # Forwarded to ``LocalArtifactStorage.__init__`` as the ``base_dir`` keyword argument.
    base_dir: str


class LocalArtifactStorage(ConfigurableClass):
    """Artifact storage described by a single local base directory.

    Every path exposed by this class is derived from ``base_dir`` with
    ``os.path.join``; the class itself only computes paths.
    """

    def __init__(self, base_dir: str, inst_data: Optional[ConfigurableClassData] = None):
        """Store the base directory and the optional configurable-class data.

        Args:
            base_dir: Directory under which all derived paths are located.
            inst_data: Optional ``ConfigurableClassData`` for this instance; checked
                with ``check.opt_inst_param`` before being stored.
        """
        self._base_dir = base_dir
        checked_inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)
        self._inst_data = checked_inst_data

    @property
    def inst_data(self) -> Optional[ConfigurableClassData]:
        """The ``ConfigurableClassData`` given at construction time, or ``None``."""
        return self._inst_data

    @property
    def base_dir(self) -> str:
        """The base directory given at construction time."""
        return self._base_dir

    def file_manager_dir(self, run_id: str) -> str:
        """Return ``<base_dir>/storage/<run_id>/files`` for the given run.

        Args:
            run_id: Run identifier; validated as a string via ``check.str_param``.
        """
        check.str_param(run_id, "run_id")
        # Read through the ``base_dir`` property (not ``_base_dir``) so that
        # subclasses overriding the property are honoured.
        root = self.base_dir
        return os.path.join(root, "storage", run_id, "files")

    @property
    def storage_dir(self) -> str:
        """Path of the ``storage`` directory under ``base_dir``."""
        root = self.base_dir
        return os.path.join(root, "storage")

    @property
    def schedules_dir(self) -> str:
        """Path of the ``schedules`` directory under ``base_dir``."""
        root = self.base_dir
        return os.path.join(root, "schedules")

    @classmethod
    def from_config_value(
        cls,
        inst_data: Optional[ConfigurableClassData],
        config_value: LocalArtifactStorageConfig,
    ) -> "LocalArtifactStorage":
        """Build a ``LocalArtifactStorage`` from a config mapping.

        The entries of ``config_value`` are passed as keyword arguments to the
        constructor, together with ``inst_data``.
        """
        storage = LocalArtifactStorage(inst_data=inst_data, **config_value)
        return storage

    @classmethod
    def config_type(cls) -> UserConfigSchema:
        """Return the config schema: a single ``base_dir`` entry of type ``StringSource``."""
        schema = {"base_dir": StringSource}
        return schema

    def dispose(self):
        """Do nothing; subclasses may override this to release resources."""
        return None
class TemporaryLocalArtifactStorage(LocalArtifactStorage):
    """Local artifact storage backed by a lazily created temporary directory.

    Used by ephemeral DagsterInstances. Directory creation is deferred until
    ``base_dir`` is first accessed, since many uses of an ephemeral instance do
    not require an artifact directory.
    """

    def __init__(self):
        # The parent initializer is deliberately not invoked: it needs a concrete
        # ``base_dir``, whereas this class creates its directory on demand.
        # ``_inst_data`` is still initialised so that the inherited ``inst_data``
        # property returns ``None`` instead of raising ``AttributeError``.
        self._inst_data = None
        self._temp_dir: Optional[TemporaryDirectory] = None

    @property
    def base_dir(self) -> str:
        """Return the temporary directory path, creating the directory on first access."""
        if self._temp_dir is None:
            self._temp_dir = TemporaryDirectory()
        return self._temp_dir.name

    def dispose(self) -> None:
        """Clean up the temporary directory if one was ever created."""
        if self._temp_dir is not None:
            self._temp_dir.cleanup()