# This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.
import io
import random
from typing import Any, Dict, Optional
from unittest import mock
from dagster import resource
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._utils.cached_method import cached_method
from dagster_azure.blob import FakeBlobServiceClient
from .utils import ResourceNotFoundError
# NOTE(review): ``dagster_maintained_resource`` is imported by this module but was not
# applied anywhere in the extracted text; it is restored here as the outer decorator —
# confirm against the upstream file.
@dagster_maintained_resource
@resource({"account_name": str})
def fake_adls2_resource(context):
    """Build a ``FakeADLS2Resource`` from the ``account_name`` resource config.

    Args:
        context: Resource initialization context. Only
            ``context.resource_config["account_name"]`` is read.

    Returns:
        FakeADLS2Resource: A fake resource configured with that account name.
    """
    return FakeADLS2Resource(account_name=context.resource_config["account_name"])
class FakeADLS2Resource(ConfigurableResource):
    """Stateful mock of an ADLS2Resource for testing.

    Wraps a ``mock.MagicMock``. Containers are implemented using an in-memory dict.
    """

    # Name handed to the fake ADLS2 and blob service clients.
    account_name: str
    # Accepted as configuration but not read by any method of this class.
    storage_account: Optional[str] = None

    @classmethod
    def _is_dagster_maintained(cls) -> bool:
        """Report this resource as Dagster-maintained (always ``True``)."""
        return True

    # NOTE(review): the ``property`` / ``cached_method`` decorators below were missing
    # from the extracted text and are restored. Caching is what keeps the mock
    # stateful: without it every access would build a new, empty client.
    @property
    @cached_method
    def adls2_client(self) -> "FakeADLS2ServiceClient":
        """Return the fake ADLS2 service client for ``account_name``."""
        return FakeADLS2ServiceClient(self.account_name)

    @property
    @cached_method
    def blob_client(self) -> FakeBlobServiceClient:
        """Return the fake blob service client for ``account_name``."""
        return FakeBlobServiceClient(self.account_name)

    @property
    def lease_client_constructor(self) -> Any:
        """Return the ``FakeLeaseClient`` class itself (not an instance)."""
        return FakeLeaseClient
class FakeLeaseClient:
    """In-memory lease client that attaches itself to the client it guards.

    At most one lease is held at a time; the held lease is identified by ``id``,
    which is ``None`` while no lease is held.
    """

    def __init__(self, client):
        """Bind this lease client to ``client``.

        Args:
            client: The object being leased. Its ``_lease`` attribute is set to
                this instance.
        """
        self.client = client
        self.id = None

        # client needs a ref to self to check if a given lease is valid
        self.client._lease = self  # noqa: SLF001

    def acquire(self, lease_duration=-1):
        """Take the lease by assigning a random integer id.

        Args:
            lease_duration: Accepted for signature compatibility; not used.

        Raises:
            Exception: If a lease is already held.
        """
        if self.id is not None:
            raise Exception("Lease already held")
        self.id = random.randint(0, 2**9)

    def release(self):
        """Drop the current lease, if any."""
        self.id = None

    def is_valid(self, lease):
        """Return whether ``lease`` authorizes an operation on the client.

        Args:
            lease: The lease id presented by the caller.

        Returns:
            bool: ``True`` when no lease is held, otherwise whether ``lease``
            equals the held id.
        """
        if self.id is None:
            # no lease is held so any operation is valid
            return True
        return lease == self.id
class FakeADLS2ServiceClient:
    """Stateful mock of an ADLS2 service client for testing.

    Wraps a ``mock.MagicMock``. Containers are implemented using an in-memory dict.
    """

    def __init__(self, account_name, credential="fake-creds"):
        """Create a service client with no file systems.

        Args:
            account_name: Name reported by ``account_name`` and passed to every
                filesystem client created here.
            credential: Value exposed as ``credential.account_key``.
        """
        self._account_name = account_name
        self._credential = mock.MagicMock()
        self._credential.account_key = credential
        self._file_systems = {}

    # The accessors are properties: ``get_file_system_client`` passes
    # ``self.account_name`` as a value, not as a bound method.
    @property
    def account_name(self):
        """The account name given at construction."""
        return self._account_name

    @property
    def credential(self):
        """The ``MagicMock`` credential whose ``account_key`` holds the given value."""
        return self._credential

    @property
    def file_systems(self):
        """The dict mapping file system name to its filesystem client."""
        return self._file_systems

    def get_file_system_client(self, file_system):
        """Return the filesystem client for ``file_system``, creating it on first use."""
        return self._file_systems.setdefault(
            file_system, FakeADLS2FilesystemClient(self.account_name, file_system)
        )

    def get_file_client(self, file_system, file_path):
        """Return the file client for ``file_path`` inside ``file_system``."""
        return self.get_file_system_client(file_system).get_file_client(file_path)
class FakeADLS2FilesystemClient:
    """Stateful mock of an ADLS2 filesystem client for testing."""

    def __init__(self, account_name, file_system_name):
        """Create an empty file system.

        Args:
            account_name: Name reported by ``account_name``.
            file_system_name: Name reported by ``file_system_name``.
        """
        # Maps file path -> file client.
        self._file_system: Dict[str, FakeADLS2FileClient] = {}
        self._account_name = account_name
        self._file_system_name = file_system_name

    # Properties, because ``get_file_system_properties`` reads them as plain values.
    @property
    def account_name(self):
        """The account name given at construction."""
        return self._account_name

    @property
    def file_system_name(self):
        """The file system name given at construction."""
        return self._file_system_name

    def keys(self):
        """Return a view of the file paths currently stored."""
        return self._file_system.keys()

    def get_file_system_properties(self):
        """Return a dict with this file system's ``account_name`` and ``file_system_name``."""
        return {"account_name": self.account_name, "file_system_name": self.file_system_name}

    def has_file(self, path):
        """Return whether a truthy entry is stored under ``path``."""
        return bool(self._file_system.get(path))

    def get_file_client(self, file_path):
        """Return the file client for ``file_path``, creating it on first use."""
        # pass fileclient a ref to self and its name so the file can delete itself.
        # Keywords are required here: the constructor order is (name, fs_client),
        # so passing (self, file_path) positionally would swap the two.
        self._file_system.setdefault(
            file_path, FakeADLS2FileClient(name=file_path, fs_client=self)
        )
        return self._file_system[file_path]

    def create_file(self, file):
        """Return the file client for ``file``, creating it if it is not present."""
        # pass fileclient a ref to self and the file's name so the file can delete itself by
        # accessing the self._file_system dict
        self._file_system.setdefault(file, FakeADLS2FileClient(fs_client=self, name=file))
        return self._file_system[file]

    def delete_file(self, file):
        """Remove every stored path that starts with ``file``.

        This is a prefix match, so deleting ``"a"`` also removes ``"ab"`` and ``"a/b"``.
        """
        # Iterate over a snapshot of the keys because entries are removed in the loop.
        for k in list(self._file_system.keys()):
            if k.startswith(file):
                del self._file_system[k]
class FakeADLS2FileClient:
    """Stateful mock of an ADLS2 file client for testing."""

    def __init__(self, name, fs_client):
        """Create a file with no contents and no lease.

        Args:
            name: The file's path; passed back to ``fs_client.delete_file`` on delete.
            fs_client: The owning filesystem client.
        """
        self.name = name
        self.contents = None
        # Set by a lease client when it binds itself to this file.
        self._lease = None
        self.fs_client = fs_client

    @property
    def lease(self):
        """The id of the bound lease client, or ``None`` when no lease client is bound."""
        return self._lease if self._lease is None else self._lease.id

    def get_file_properties(self):
        """Return ``{"lease": <lease id or None>}`` for an existing file.

        Raises:
            ResourceNotFoundError: If nothing has been uploaded to this file.
        """
        if self.contents is None:
            raise ResourceNotFoundError("File does not exist!")
        lease_id = None if self._lease is None else self._lease.id
        return {"lease": lease_id}

    def upload_data(self, contents, overwrite=False, lease=None):
        """Store ``contents`` as the file's data.

        ``str`` and ``io.StringIO`` input is UTF-8 encoded, ``io.BytesIO`` is read,
        and anything else (including ``bytes``) is stored unchanged.

        Args:
            contents: The data to store.
            overwrite: Must be exactly ``True`` for a write to a file that has no
                contents yet.
            lease: Lease id checked against the bound lease client, if any.

        Raises:
            Exception: If a lease client is bound and rejects ``lease``.
        """
        if self._lease is not None and not self._lease.is_valid(lease):
            raise Exception("Invalid lease!")

        # NOTE(review): a file with no contents and overwrite=False is silently left
        # untouched, while an existing file is always replaced — confirm this is the
        # intended condition.
        if self.contents is not None or overwrite is True:
            if isinstance(contents, str):
                self.contents = contents.encode("utf8")
            elif isinstance(contents, io.BytesIO):
                self.contents = contents.read()
            elif isinstance(contents, io.StringIO):
                self.contents = contents.read().encode("utf8")
            else:
                # bytes, and any other type, are stored as given
                self.contents = contents

    def download_file(self):
        """Return a downloader over the file's current contents.

        Raises:
            ResourceNotFoundError: If nothing has been uploaded to this file.
        """
        if self.contents is None:
            raise ResourceNotFoundError("File does not exist!")
        return FakeADLS2FileDownloader(contents=self.contents)

    def delete_file(self, lease=None):
        """Delete this file through its owning filesystem client.

        Args:
            lease: Lease id checked against the bound lease client, if any.

        Raises:
            Exception: If a lease client is bound and rejects ``lease``.
        """
        if self._lease is not None and not self._lease.is_valid(lease):
            raise Exception("Invalid lease!")
        # The filesystem client owns the path -> file mapping, so removal goes through it.
        self.fs_client.delete_file(self.name)
class FakeADLS2FileDownloader:
"""Mock of an ADLS2 file downloader for testing."""
def __init__(self, contents):
self.contents = contents
def readall(self):
return self.contents
def readinto(self, fileobj):