You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster_aws.secretsmanager.resources

from contextlib import contextmanager
from typing import TYPE_CHECKING, Dict, Generator, List, Optional, cast

from dagster import (
    Field as LegacyDagsterField,
    resource,
)
from dagster._config.field_utils import Shape
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._core.test_utils import environ
from dagster._utils.merger import merge_dicts
from pydantic import Field

from dagster_aws.utils import ResourceWithBoto3Configuration

from .secrets import construct_secretsmanager_client, get_secrets_from_arns, get_tagged_secrets

if TYPE_CHECKING:
    import botocore


class SecretsManagerResource(ResourceWithBoto3Configuration):
    """Resource that gives access to AWS SecretsManager.

    The underlying SecretsManager session is created by calling
    :py:func:`boto3.session.Session(profile_name) <boto3:boto3.session>`.
    The returned resource object is a SecretsManager client, an instance of `botocore.client.SecretsManager`.

    Example:
        .. code-block:: python

            from dagster import build_op_context, job, op
            from dagster_aws.secretsmanager import SecretsManagerResource

            @op
            def example_secretsmanager_op(secretsmanager: SecretsManagerResource):
                return secretsmanager.get_client().get_secret_value(
                    SecretId='arn:aws:secretsmanager:region:aws_account_id:secret:appauthexample-AbCdEf'
                )

            @job
            def example_job():
                example_secretsmanager_op()

            defs = Definitions(
                jobs=[example_job],
                resources={
                    'secretsmanager': SecretsManagerResource(
                        region_name='us-west-1'
                    )
                }
            )
    """

    @classmethod
    def _is_dagster_maintained(cls) -> bool:
        return True

    def get_client(self) -> "botocore.client.SecretsManager":
        return construct_secretsmanager_client(
            max_attempts=self.max_attempts,
            region_name=self.region_name,
            profile_name=self.profile_name,
        )


[docs]@dagster_maintained_resource @resource(SecretsManagerResource.to_config_schema()) def secretsmanager_resource(context) -> "botocore.client.SecretsManager": """Resource that gives access to AWS SecretsManager. The underlying SecretsManager session is created by calling :py:func:`boto3.session.Session(profile_name) <boto3:boto3.session>`. The returned resource object is a SecretsManager client, an instance of `botocore.client.SecretsManager`. Example: .. code-block:: python from dagster import build_op_context, job, op from dagster_aws.secretsmanager import secretsmanager_resource @op(required_resource_keys={'secretsmanager'}) def example_secretsmanager_op(context): return context.resources.secretsmanager.get_secret_value( SecretId='arn:aws:secretsmanager:region:aws_account_id:secret:appauthexample-AbCdEf' ) @job(resource_defs={'secretsmanager': secretsmanager_resource}) def example_job(): example_secretsmanager_op() example_job.execute_in_process( run_config={ 'resources': { 'secretsmanager': { 'config': { 'region_name': 'us-west-1', } } } } ) You may configure this resource as follows: .. code-block:: YAML resources: secretsmanager: config: region_name: "us-west-1" # Optional[str]: Specifies a custom region for the SecretsManager session. Default is chosen # through the ordinary boto credential chain. profile_name: "dev" # Optional[str]: Specifies a custom profile for SecretsManager session. Default is default # profile as specified in ~/.aws/credentials file """ return SecretsManagerResource.from_resource_context(context).get_client()
class SecretsManagerSecretsResource(ResourceWithBoto3Configuration): """Resource that provides a dict which maps selected SecretsManager secrets to their string values. Also optionally sets chosen secrets as environment variables. Example: .. code-block:: python import os from dagster import build_op_context, job, op, ResourceParam from dagster_aws.secretsmanager import SecretsManagerSecretsResource @op def example_secretsmanager_secrets_op(secrets: SecretsManagerSecretsResource): return secrets.fetch_secrets().get("my-secret-name") @op def example_secretsmanager_secrets_op_2(secrets: SecretsManagerSecretsResource): with secrets.secrets_in_environment(): return os.getenv("my-other-secret-name") @job def example_job(): example_secretsmanager_secrets_op() example_secretsmanager_secrets_op_2() defs = Definitions( jobs=[example_job], resources={ 'secrets': SecretsManagerSecretsResource( region_name='us-west-1', secrets_tag="dagster", add_to_environment=True, ) } ) Note that your ops must also declare that they require this resource with or it will not be initialized for the execution of their compute functions. """ secrets: List[str] = Field( default=[], description="An array of AWS Secrets Manager secrets arns to fetch." ) secrets_tag: Optional[str] = Field( default=None, description="AWS Secrets Manager secrets with this tag will be fetched and made available.", ) @classmethod def _is_dagster_maintained(cls) -> bool: return True @contextmanager def secrets_in_environment( self, secrets: Optional[List[str]] = None, secrets_tag: Optional[str] = None, ) -> Generator[Dict[str, str], None, None]: """Yields a dict which maps selected SecretsManager secrets to their string values. Also sets chosen secrets as environment variables. Args: secrets (Optional[List[str]]): An array of AWS Secrets Manager secrets arns to fetch. Note that this will override the secrets specified in the resource config. secrets_tag (Optional[str]): AWS Secrets Manager secrets with this tag will be fetched and made available. Note that this will override the secrets_tag specified in the resource config. """ secrets_manager = construct_secretsmanager_client( max_attempts=self.max_attempts, region_name=self.region_name, profile_name=self.profile_name, ) secrets_tag_to_fetch = secrets_tag if secrets_tag is not None else self.secrets_tag secrets_to_fetch = secrets if secrets is not None else self.secrets secret_arns = merge_dicts( ( get_tagged_secrets(secrets_manager, [secrets_tag_to_fetch]) if secrets_tag_to_fetch else {} ), get_secrets_from_arns(secrets_manager, secrets_to_fetch), ) secrets_map = { name: secrets_manager.get_secret_value(SecretId=arn).get("SecretString") for name, arn in secret_arns.items() } with environ(secrets_map): yield secrets_map def fetch_secrets( self, secrets: Optional[List[str]] = None, secrets_tag: Optional[str] = None, ) -> Dict[str, str]: """Fetches secrets from AWS Secrets Manager and returns them as a dict. Args: secrets (Optional[List[str]]): An array of AWS Secrets Manager secrets arns to fetch. Note that this will override the secrets specified in the resource config. secrets_tag (Optional[str]): AWS Secrets Manager secrets with this tag will be fetched and made available. Note that this will override the secrets_tag specified in the resource config. """ with self.secrets_in_environment(secrets=secrets, secrets_tag=secrets_tag) as secret_values: return secret_values LEGACY_SECRETSMANAGER_SECRETS_SCHEMA = { **cast(Shape, SecretsManagerSecretsResource.to_config_schema().as_field().config_type).fields, "add_to_environment": LegacyDagsterField( bool, default_value=False, description="Whether to add the secrets to the environment. Defaults to False.", ), }
[docs]@dagster_maintained_resource @resource(config_schema=LEGACY_SECRETSMANAGER_SECRETS_SCHEMA) @contextmanager def secretsmanager_secrets_resource(context): """Resource that provides a dict which maps selected SecretsManager secrets to their string values. Also optionally sets chosen secrets as environment variables. Example: .. code-block:: python import os from dagster import build_op_context, job, op from dagster_aws.secretsmanager import secretsmanager_secrets_resource @op(required_resource_keys={'secrets'}) def example_secretsmanager_secrets_op(context): return context.resources.secrets.get("my-secret-name") @op(required_resource_keys={'secrets'}) def example_secretsmanager_secrets_op_2(context): return os.getenv("my-other-secret-name") @job(resource_defs={'secrets': secretsmanager_secrets_resource}) def example_job(): example_secretsmanager_secrets_op() example_secretsmanager_secrets_op_2() example_job.execute_in_process( run_config={ 'resources': { 'secrets': { 'config': { 'region_name': 'us-west-1', 'secrets_tag': 'dagster', 'add_to_environment': True, } } } } ) Note that your ops must also declare that they require this resource with `required_resource_keys`, or it will not be initialized for the execution of their compute functions. You may configure this resource as follows: .. code-block:: YAML resources: secretsmanager: config: region_name: "us-west-1" # Optional[str]: Specifies a custom region for the SecretsManager session. Default is chosen # through the ordinary boto credential chain. profile_name: "dev" # Optional[str]: Specifies a custom profile for SecretsManager session. Default is default # profile as specified in ~/.aws/credentials file secrets: ["arn:aws:secretsmanager:region:aws_account_id:secret:appauthexample-AbCdEf"] # Optional[List[str]]: Specifies a list of secret ARNs to pull from SecretsManager. secrets_tag: "dagster" # Optional[str]: Specifies a tag, all secrets which have the tag set will be pulled # from SecretsManager. add_to_environment: true # Optional[bool]: Whether to set the selected secrets as environment variables. Defaults # to false. """ add_to_environment = context.resource_config.get("add_to_environment", False) if add_to_environment: with SecretsManagerSecretsResource.from_resource_context( context ).secrets_in_environment() as secrets: yield secrets else: yield SecretsManagerSecretsResource.from_resource_context(context).fetch_secrets()