from typing import Any, Dict, Union

from azure.identity import DefaultAzureCredential
from import DataLakeLeaseClient
from dagster import (
    Field as DagsterField,
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._utils.cached_method import cached_method
from dagster._utils.merger import merge_dicts
from pydantic import Field
from typing_extensions import Literal

from dagster_azure.blob.utils import BlobServiceClient, create_blob_client

from .file_manager import ADLS2FileManager
from .utils import DataLakeServiceClient, create_adls2_client

# Credential variant that authenticates with a SAS token for the storage account.
class ADLS2SASToken(Config):
    # Tag value identifying this variant; `credential_type` is the discriminator
    # of the credential union declared on ADLS2BaseResource.
    credential_type: Literal["sas"] = "sas"
    # The SAS token for the account.
    token: str

# Credential variant that authenticates with the account's Shared Access Key.
class ADLS2Key(Config):
    # Tag value identifying this variant; `credential_type` is the discriminator
    # of the credential union declared on ADLS2BaseResource.
    credential_type: Literal["key"] = "key"
    # The Shared Access Key for the account.
    key: str

# Credential variant that authenticates through azure.identity's DefaultAzureCredential.
class ADLS2DefaultAzureCredential(Config):
    # Tag value identifying this variant; `credential_type` is the discriminator
    # of the credential union declared on ADLS2BaseResource.
    credential_type: Literal["default_azure_credential"] = "default_azure_credential"
    # Keyword arguments forwarded as `DefaultAzureCredential(**kwargs)` by
    # ADLS2Resource._raw_credential.
    kwargs: Dict[str, Any]

# Shared configuration for ADLS2-backed resources: which storage account to talk
# to and how to authenticate against it.
class ADLS2BaseResource(ConfigurableResource):
    storage_account: str = Field(description="The storage account name.")
    # Discriminated union: the `credential_type` literal on each variant selects
    # which of the three credential models the supplied config is parsed into.
    credential: Union[ADLS2SASToken, ADLS2Key, ADLS2DefaultAzureCredential] = Field(
        discriminator="credential_type", description="The credentials with which to authenticate."
    )

        description="Uses DefaultAzureCredential to authenticate and passed as keyword arguments",

    "storage_account": DagsterField(StringSource, description="The storage account name."),
    "credential": DagsterField(
                "sas": DagsterField(StringSource, description="SAS token for the account."),
                "key": DagsterField(StringSource, description="Shared Access Key for the account."),
                "DefaultAzureCredential": DEFAULT_AZURE_CREDENTIAL_CONFIG,
        description="The credentials with which to authenticate.",

class ADLS2Resource(ADLS2BaseResource):
    """Resource containing clients to access Azure Data Lake Storage Gen2.

    Contains a client for both the Data Lake and Blob APIs, to work around the limitations
    of each.
    """

    @classmethod
    def _is_dagster_maintained(cls) -> bool:
        """Always return ``True``."""
        return True

    @property
    def _raw_credential(self) -> Any:
        """Return the credential value handed to the client factories.

        Returns:
            The shared key string for :class:`ADLS2Key`, the token string for
            :class:`ADLS2SASToken`, and otherwise a ``DefaultAzureCredential``
            built from the configured keyword arguments.
        """
        if isinstance(self.credential, ADLS2Key):
            return self.credential.key
        if isinstance(self.credential, ADLS2SASToken):
            return self.credential.token
        # The only remaining member of the credential union is
        # ADLS2DefaultAzureCredential; its kwargs are forwarded verbatim.
        return DefaultAzureCredential(**self.credential.kwargs)

    # NOTE(review): `cached_method` is imported by this module and is assumed to
    # memoize per instance so repeated property reads reuse one client — confirm.
    @property
    @cached_method
    def adls2_client(self) -> DataLakeServiceClient:
        """Data Lake service client for the configured storage account."""
        return create_adls2_client(self.storage_account, self._raw_credential)

    @property
    @cached_method
    def blob_client(self) -> BlobServiceClient:
        """Blob service client for the configured storage account."""
        return create_blob_client(self.storage_account, self._raw_credential)

    @property
    def lease_client_constructor(self) -> Any:
        """Return the ``DataLakeLeaseClient`` class itself (not an instance)."""
        return DataLakeLeaseClient

# Due to a limitation of the discriminated union type, we can't directly mirror these old
# config fields in the new resource config. Instead, we'll just use the old config fields
# to construct the new config and then use that to construct the resource.
@dagster_maintained_resource
@resource(ADLS2_CLIENT_CONFIG)
def adls2_resource(context):
    """Resource that gives ops access to Azure Data Lake Storage Gen2.

    The underlying client is a :py:class:`~azure.storage.filedatalake.DataLakeServiceClient`.

    Attach this resource definition to a :py:class:`~dagster.JobDefinition` in order to make it
    available to your ops.

    Example:
        .. code-block:: python

            from dagster import job, op
            from dagster_azure.adls2 import adls2_resource

            @op(required_resource_keys={'adls2'})
            def example_adls2_op(context):
                return list(context.resources.adls2.adls2_client.list_file_systems())

            @job(resource_defs={"adls2": adls2_resource})
            def my_job():
                example_adls2_op()

    Note that your ops must also declare that they require this resource with
    `required_resource_keys`, or it will not be initialized for the execution of their compute
    functions.

    You may pass credentials to this resource using either a SAS token, a key or by passing the
    `DefaultAzureCredential` object.

    .. code-block:: YAML

        resources:
          adls2:
            config:
              storage_account: my_storage_account
              # str: The storage account name.
              credential:
                sas: my_sas_token
                # str: the SAS token for the account.
                key:
                  env: AZURE_DATA_LAKE_STORAGE_KEY
                # str: The shared access key for the account.
                DefaultAzureCredential: {}
                # dict: The keyword arguments used for DefaultAzureCredential
                # or leave the object empty for no arguments
                DefaultAzureCredential:
                    exclude_environment_credential: true

    """
    # The legacy dict-shaped config is translated into the pydantic-style resource.
    return _adls2_resource_from_config(context.resource_config)
@dagster_maintained_resource
@resource(
    merge_dicts(
        ADLS2_CLIENT_CONFIG,
        {
            "adls2_file_system": DagsterField(
                StringSource, description="ADLS Gen2 file system name"
            ),
            "adls2_prefix": DagsterField(StringSource, is_required=False, default_value="dagster"),
        },
    )
)
def adls2_file_manager(context):
    """FileManager that provides abstract access to ADLS2.

    Implements the :py:class:`~dagster._core.storage.file_manager.FileManager` API.
    """
    # Reuse the shared config-to-resource translation, then keep only the
    # Data Lake client, which is all the file manager is constructed with.
    adls2_client = _adls2_resource_from_config(context.resource_config).adls2_client

    return ADLS2FileManager(
        adls2_client=adls2_client,
        file_system=context.resource_config["adls2_file_system"],
        prefix=context.resource_config["adls2_prefix"],
    )
def _adls2_resource_from_config(config) -> ADLS2Resource:
    """Build an ADLS2Resource from the legacy dict-style resource config.

    Args:
        config: A configuration containing the fields in ADLS2_CLIENT_CONFIG.

    Returns:
        An ADLS2Resource for the configured storage account and credential.
    """
    storage_account = config["storage_account"]
    credential_config = config["credential"]

    # Checked in this order: DefaultAzureCredential, then SAS token; anything
    # else is treated as a shared key and must therefore carry a "key" entry.
    if "DefaultAzureCredential" in credential_config:
        credential = ADLS2DefaultAzureCredential(
            kwargs=credential_config["DefaultAzureCredential"]
        )
    elif "sas" in credential_config:
        credential = ADLS2SASToken(token=credential_config["sas"])
    else:
        credential = ADLS2Key(key=credential_config["key"])

    return ADLS2Resource(storage_account=storage_account, credential=credential)