This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.
import io
import os
import shutil
import uuid
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import BinaryIO, ContextManager, Iterator, Optional, TextIO, Union
from typing_extensions import TypeAlias
import dagster._check as check
from dagster._annotations import public
from dagster._config import Field, StringSource
from dagster._core.definitions.resource_definition import dagster_maintained_resource, resource
from dagster._core.execution.context.init import InitResourceContext
from dagster._core.instance import DagsterInstance
from dagster._utils import mkdir_p
from .temp_file_manager import TempfileManager
IOStream: TypeAlias = Union[TextIO, BinaryIO]
class FileHandle(ABC):
    """Abstract reference to a file that a FileManager can operate on.

    Concrete subclasses may describe a file on the local filesystem, an object in an object
    store, or a file kept in any other location that can hold one.

    The point of the abstraction is that one piece of read/transform/write logic can run
    unchanged in local development and on a cluster whose files live in a globally reachable
    object store such as S3.
    """

    @public
    @property
    @abstractmethod
    def path_desc(self) -> str:
        """Human-readable description of where the file lives; intended for display only."""
        raise NotImplementedError
class LocalFileHandle(FileHandle):
    """FileHandle pointing at a file on the local filesystem."""

    def __init__(self, path: str):
        # check.str_param both validates and returns the value, so it can be stored directly.
        validated_path = check.str_param(path, "path")
        self._path = validated_path

    @public
    @property
    def path(self) -> str:
        """Filesystem path of the referenced file."""
        return self._path

    @public
    @property
    def path_desc(self) -> str:
        """Display-only description of the file; for local files this is simply the path."""
        return self.path
class FileManager(ABC):
    """Base class for all file managers in dagster.

    The file manager is an interface that can be implemented by resources to provide abstract
    access to a file system such as local disk, S3, or other cloud storage.

    For examples of usage, see the documentation of the concrete file manager implementations.
    """

    @public
    @abstractmethod
    def copy_handle_to_local_temp(self, file_handle: FileHandle) -> str:
        """Copy a file represented by a file handle to a temp file.

        In an implementation built around an object store such as S3, this method would be
        expected to download the file from S3 to the local filesystem, in a location assigned
        by the standard library's :py:mod:`python:tempfile` module.

        Temp files returned by this method are *not* guaranteed to be reusable across op
        boundaries. For files that must be available across op boundaries, use the
        :py:meth:`~dagster._core.storage.file_manager.FileManager.read`,
        :py:meth:`~dagster._core.storage.file_manager.FileManager.read_data`,
        :py:meth:`~dagster._core.storage.file_manager.FileManager.write`, and
        :py:meth:`~dagster._core.storage.file_manager.FileManager.write_data` methods.

        Args:
            file_handle (FileHandle): The handle to the file to make available as a local
                temp file.

        Returns:
            str: Path to the local temp file.
        """
        raise NotImplementedError()

    @public
    @abstractmethod
    def delete_local_temp(self) -> None:
        """Delete all local temporary files created by previous calls to
        :py:meth:`~dagster._core.storage.file_manager.FileManager.copy_handle_to_local_temp`.

        Should typically only be called by framework implementors.
        """
        raise NotImplementedError()

    @public
    @abstractmethod
    def read(self, file_handle: FileHandle, mode: str = "rb") -> ContextManager[IOStream]:
        """Open the file behind a file handle as a stream.

        This may incur an expensive network call for file managers backed by object stores
        such as S3.

        Args:
            file_handle (FileHandle): The file handle to make available as a stream.
            mode (str): The mode in which to open the file. Default: ``"rb"``.

        Returns:
            ContextManager[Union[TextIO, BinaryIO]]: A context manager that yields a file-like
            stream. Use it in a ``with`` statement so the stream is closed when done.
        """
        raise NotImplementedError()

    @public
    @abstractmethod
    def read_data(self, file_handle: FileHandle) -> bytes:
        """Return the bytes for a given file handle.

        This may incur an expensive network call for file managers backed by object stores
        such as S3.

        Args:
            file_handle (FileHandle): The file handle for which to return bytes.

        Returns:
            bytes: Bytes for a given file handle.
        """
        raise NotImplementedError()

    @public
    @abstractmethod
    def write(self, file_obj: IOStream, mode: str = "wb", ext: Optional[str] = None) -> FileHandle:
        """Write the contents of the given file object into the file manager.

        Args:
            file_obj (Union[TextIO, BinaryIO]): A file-like object to copy from.
            mode (str): The mode in which to write the file into the file manager.
                Default: ``"wb"``.
            ext (Optional[str]): For file managers that support file extensions, the extension
                with which to write the file. Default: ``None``.

        Returns:
            FileHandle: A handle to the newly created file.
        """
        raise NotImplementedError()

    @public
    @abstractmethod
    def write_data(self, data: bytes, ext: Optional[str] = None) -> FileHandle:
        """Write raw bytes into the file manager.

        Args:
            data (bytes): The bytes to write into the file manager.
            ext (Optional[str]): For file managers that support file extensions, the extension
                with which to write the file. Default: ``None``.

        Returns:
            FileHandle: A handle to the newly created file.
        """
        raise NotImplementedError()
@dagster_maintained_resource
@resource(config_schema={"base_dir": Field(StringSource, is_required=False)})
def local_file_manager(init_context: InitResourceContext) -> "LocalFileManager":
    """FileManager that provides abstract access to a local filesystem.

    By default, files will be stored in `<local_artifact_storage>/storage/file_manager` where
    `<local_artifact_storage>` can be configured in the ``dagster.yaml`` file in
    ``$DAGSTER_HOME``.

    Implements the :py:class:`~dagster._core.storage.file_manager.FileManager` API.

    Examples:
        .. code-block:: python

            import tempfile

            from dagster import job, local_file_manager, op


            @op(required_resource_keys={"file_manager"})
            def write_files(context):
                fh_1 = context.resources.file_manager.write_data(b"foo")

                with tempfile.NamedTemporaryFile("w+") as fd:
                    fd.write("bar")
                    fd.seek(0)
                    fh_2 = context.resources.file_manager.write(fd, mode="w", ext="txt")

                return (fh_1, fh_2)


            @op(required_resource_keys={"file_manager"})
            def read_files(context, file_handles):
                fh_1, fh_2 = file_handles
                assert context.resources.file_manager.read_data(fh_2) == b"bar"
                with context.resources.file_manager.read(fh_1, mode="r") as fd:
                    assert fd.read() == "foo"


            @job(resource_defs={"file_manager": local_file_manager})
            def files_pipeline():
                read_files(write_files())

        Or to specify the file directory:

        .. code-block:: python

            @job(
                resource_defs={
                    "file_manager": local_file_manager.configured({"base_dir": "/my/base/dir"})
                }
            )
            def files_pipeline():
                read_files(write_files())
    """
    config = init_context.resource_config
    if "base_dir" in config:
        base_dir = config["base_dir"]
    else:
        # Only consult the instance when no base_dir was configured. Evaluating this eagerly
        # (as a dict.get default) fails whenever the instance is unavailable, even if the
        # user supplied base_dir explicitly.
        base_dir = os.path.join(
            init_context.instance.storage_directory(), "file_manager"  # type: ignore # (possible none)
        )
    return LocalFileManager(base_dir=base_dir)
def check_file_like_obj(obj: object) -> None:
    """Fail a ``check.invariant`` unless ``obj`` is truthy and has ``read`` and ``write``."""
    looks_like_file = bool(obj) and all(hasattr(obj, attr) for attr in ("read", "write"))
    check.invariant(looks_like_file)
class LocalFileManager(FileManager):
    """FileManager that stores files in a directory on the local filesystem.

    Written files are placed directly in ``base_dir`` under a random UUID-based name.
    Temporary copies produced by ``copy_handle_to_local_temp`` are created through an internal
    ``TempfileManager`` and released by ``delete_local_temp``.

    Args:
        base_dir (str): Directory in which written files are stored. It is created on the
            first write rather than at construction time.
    """

    def __init__(self, base_dir: str):
        self.base_dir = base_dir
        # Directory creation is deferred to the first write (see ensure_base_dir_exists).
        self._base_dir_ensured = False
        self._temp_file_manager = TempfileManager()

    @staticmethod
    def for_instance(instance: DagsterInstance, run_id: str) -> "LocalFileManager":
        """Build a manager rooted at ``instance.file_manager_directory(run_id)``."""
        check.inst_param(instance, "instance", DagsterInstance)
        return LocalFileManager(instance.file_manager_directory(run_id))

    def ensure_base_dir_exists(self) -> None:
        """Create ``base_dir`` via ``mkdir_p`` the first time this is called; no-op afterwards."""
        if self._base_dir_ensured:
            return
        mkdir_p(self.base_dir)
        self._base_dir_ensured = True

    def copy_handle_to_local_temp(self, file_handle: FileHandle) -> str:
        """Copy the file behind ``file_handle`` into a temp file and return its path.

        The temp file is obtained from the internal ``TempfileManager``. It is closed before
        returning and is cleaned up by ``delete_local_temp``.
        """
        check.inst_param(file_handle, "file_handle", FileHandle)
        with self.read(file_handle, "rb") as handle_obj:  # type: ignore # read() expects LocalFileHandle
            temp_file_obj = self._temp_file_manager.tempfile()
            try:
                # Stream in chunks instead of loading the whole file into memory.
                shutil.copyfileobj(handle_obj, temp_file_obj)
                temp_name = temp_file_obj.name
            finally:
                temp_file_obj.close()
        return temp_name

    @contextmanager
    def read(self, file_handle: LocalFileHandle, mode: str = "rb") -> Iterator[IOStream]:
        """Open ``file_handle.path`` for reading and yield the file object.

        Args:
            file_handle (LocalFileHandle): Handle of the file to open.
            mode (str): Either ``"rb"`` (bytes) or ``"r"`` (text decoded as UTF-8).
        """
        check.inst_param(file_handle, "file_handle", LocalFileHandle)
        check.str_param(mode, "mode")
        check.param_invariant(mode in {"r", "rb"}, "mode")
        # Decode text explicitly as UTF-8 rather than relying on the platform default.
        encoding = None if mode == "rb" else "utf8"
        with open(file_handle.path, mode, encoding=encoding) as file_obj:
            yield file_obj  # type: ignore

    def read_data(self, file_handle: LocalFileHandle) -> bytes:
        """Return the full contents of ``file_handle`` as bytes."""
        with self.read(file_handle, mode="rb") as file_obj:
            return file_obj.read()  # type: ignore

    def write_data(self, data: bytes, ext: Optional[str] = None) -> LocalFileHandle:
        """Write ``data`` to a new file in ``base_dir`` and return a handle to it."""
        check.inst_param(data, "data", bytes)
        return self.write(io.BytesIO(data), mode="wb", ext=ext)

    def write(
        self, file_obj: IOStream, mode: str = "wb", ext: Optional[str] = None
    ) -> LocalFileHandle:
        """Copy ``file_obj`` into a new UUID-named file in ``base_dir``.

        Args:
            file_obj (Union[TextIO, BinaryIO]): Source object; must have ``read`` and ``write``.
            mode (str): Mode used to open the destination file. Text modes (no ``"b"``) write
                UTF-8.
            ext (Optional[str]): Optional extension, given with or without a leading dot
                (``"txt"`` and ``".txt"`` both yield ``<uuid>.txt``).

        Returns:
            LocalFileHandle: Handle to the newly written file.
        """
        check_file_like_obj(file_obj)
        check.str_param(mode, "mode")
        check.opt_str_param(ext, "ext")
        self.ensure_base_dir_exists()
        # Normalize the extension so ".txt" does not produce "<uuid>..txt" and "" does not
        # produce a trailing dot.
        suffix = ext.lstrip(".") if ext else ""
        file_name = str(uuid.uuid4()) + ("." + suffix if suffix else "")
        dest_file_path = os.path.join(self.base_dir, file_name)
        encoding = None if "b" in mode else "utf8"
        with open(dest_file_path, mode, encoding=encoding) as dest_file_obj:
            shutil.copyfileobj(file_obj, dest_file_obj)  # type: ignore
        return LocalFileHandle(dest_file_path)

    def delete_local_temp(self) -> None:
        """Close the internal TempfileManager, releasing temp files it created."""
        self._temp_file_manager.close()