You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster_azure.adls2.resources

from typing import Any, Dict, Union

from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeLeaseClient
from dagster import (
    Config,
    ConfigurableResource,
    Field as DagsterField,
    Permissive,
    Selector,
    StringSource,
    resource,
)
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._utils.cached_method import cached_method
from dagster._utils.merger import merge_dicts
from pydantic import Field
from typing_extensions import Literal

from dagster_azure.blob.utils import BlobServiceClient, create_blob_client

from .file_manager import ADLS2FileManager
from .utils import DataLakeServiceClient, create_adls2_client


class ADLS2SASToken(Config):
    credential_type: Literal["sas"] = "sas"
    token: str


class ADLS2Key(Config):
    credential_type: Literal["key"] = "key"
    key: str


class ADLS2DefaultAzureCredential(Config):
    credential_type: Literal["default_azure_credential"] = "default_azure_credential"
    kwargs: Dict[str, Any]


class ADLS2BaseResource(ConfigurableResource):
    storage_account: str = Field(description="The storage account name.")
    credential: Union[ADLS2SASToken, ADLS2Key, ADLS2DefaultAzureCredential] = Field(
        discriminator="credential_type", description="The credentials with which to authenticate."
    )


DEFAULT_AZURE_CREDENTIAL_CONFIG = DagsterField(
    Permissive(
        description="Uses DefaultAzureCredential to authenticate and passed as keyword arguments",
    )
)

ADLS2_CLIENT_CONFIG = {
    "storage_account": DagsterField(StringSource, description="The storage account name."),
    "credential": DagsterField(
        Selector(
            {
                "sas": DagsterField(StringSource, description="SAS token for the account."),
                "key": DagsterField(StringSource, description="Shared Access Key for the account."),
                "DefaultAzureCredential": DEFAULT_AZURE_CREDENTIAL_CONFIG,
            }
        ),
        description="The credentials with which to authenticate.",
    ),
}


class ADLS2Resource(ADLS2BaseResource):
    """Resource containing clients to access Azure Data Lake Storage Gen2.

    Contains a client for both the Data Lake and Blob APIs, to work around the limitations
    of each.
    """

    @classmethod
    def _is_dagster_maintained(cls) -> bool:
        return True

    @property
    @cached_method
    def _raw_credential(self) -> Any:
        if isinstance(self.credential, ADLS2Key):
            return self.credential.key
        elif isinstance(self.credential, ADLS2SASToken):
            return self.credential.token
        else:
            return DefaultAzureCredential(**self.credential.kwargs)

    @property
    @cached_method
    def adls2_client(self) -> DataLakeServiceClient:
        return create_adls2_client(self.storage_account, self._raw_credential)

    @property
    @cached_method
    def blob_client(self) -> BlobServiceClient:
        return create_blob_client(self.storage_account, self._raw_credential)

    @property
    def lease_client_constructor(self) -> Any:
        return DataLakeLeaseClient


# Due to a limitation of the discriminated union type, we can't directly mirror these old
# config fields in the new resource config. Instead, we'll just use the old config fields
# to construct the new config and then use that to construct the resource.
[docs]@dagster_maintained_resource @resource(ADLS2_CLIENT_CONFIG) def adls2_resource(context): """Resource that gives ops access to Azure Data Lake Storage Gen2. The underlying client is a :py:class:`~azure.storage.filedatalake.DataLakeServiceClient`. Attach this resource definition to a :py:class:`~dagster.JobDefinition` in order to make it available to your ops. Example: .. code-block:: python from dagster import job, op from dagster_azure.adls2 import adls2_resource @op(required_resource_keys={'adls2'}) def example_adls2_op(context): return list(context.resources.adls2.adls2_client.list_file_systems()) @job(resource_defs={"adls2": adls2_resource}) def my_job(): example_adls2_op() Note that your ops must also declare that they require this resource with `required_resource_keys`, or it will not be initialized for the execution of their compute functions. You may pass credentials to this resource using either a SAS token, a key or by passing the `DefaultAzureCredential` object. .. code-block:: YAML resources: adls2: config: storage_account: my_storage_account # str: The storage account name. credential: sas: my_sas_token # str: the SAS token for the account. key: env: AZURE_DATA_LAKE_STORAGE_KEY # str: The shared access key for the account. DefaultAzureCredential: {} # dict: The keyword arguments used for DefaultAzureCredential # or leave the object empty for no arguments DefaultAzureCredential: exclude_environment_credential: true """ return _adls2_resource_from_config(context.resource_config)
[docs]@dagster_maintained_resource @resource( merge_dicts( ADLS2_CLIENT_CONFIG, { "adls2_file_system": DagsterField( StringSource, description="ADLS Gen2 file system name" ), "adls2_prefix": DagsterField(StringSource, is_required=False, default_value="dagster"), }, ) ) def adls2_file_manager(context): """FileManager that provides abstract access to ADLS2. Implements the :py:class:`~dagster._core.storage.file_manager.FileManager` API. """ adls2_client = _adls2_resource_from_config(context.resource_config).adls2_client return ADLS2FileManager( adls2_client=adls2_client, file_system=context.resource_config["adls2_file_system"], prefix=context.resource_config["adls2_prefix"], )
def _adls2_resource_from_config(config) -> ADLS2Resource: """Args: config: A configuration containing the fields in ADLS2_CLIENT_CONFIG. Returns: An adls2 client. """ storage_account = config["storage_account"] if "DefaultAzureCredential" in config["credential"]: credential = ADLS2DefaultAzureCredential( kwargs=config["credential"]["DefaultAzureCredential"] ) elif "sas" in config["credential"]: credential = ADLS2SASToken(token=config["credential"]["sas"]) else: credential = ADLS2Key(key=config["credential"]["key"]) return ADLS2Resource(storage_account=storage_account, credential=credential)