You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster._core.definitions.utils

import keyword
import os
import re
from glob import glob
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, cast

import yaml

import dagster._check as check
import dagster._seven as seven
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError
from dagster._core.storage.tags import check_reserved_tags
from dagster._utils.yaml_utils import merge_yaml_strings, merge_yamls

DEFAULT_OUTPUT = "result"
DEFAULT_GROUP_NAME = "default"  # asset group_name used when none is provided
DEFAULT_IO_MANAGER_KEY = "io_manager"

DISALLOWED_NAMES = set(
    [
        "context",
        "conf",
        "config",
        "meta",
        "arg_dict",
        "dict",
        "input_arg_dict",
        "output_arg_dict",
        "int",
        "str",
        "float",
        "bool",
        "input",
        "output",
        "type",
    ]
    + list(keyword.kwlist)  # just disallow all python keywords
)

VALID_NAME_REGEX_STR = r"^[A-Za-z0-9_]+$"
VALID_NAME_REGEX = re.compile(VALID_NAME_REGEX_STR)


class NoValueSentinel:
    """Sentinel value to distinguish unset from None."""


def has_valid_name_chars(name: str) -> bool:
    return bool(VALID_NAME_REGEX.match(name))


def check_valid_name(name: str, allow_list: Optional[List[str]] = None) -> str:
    check.str_param(name, "name")

    if allow_list and name in allow_list:
        return name

    if name in DISALLOWED_NAMES:
        raise DagsterInvalidDefinitionError(
            f'"{name}" is not a valid name in Dagster. It conflicts with a Dagster or python'
            " reserved keyword."
        )

    check_valid_chars(name)

    check.invariant(is_valid_name(name))
    return name


def check_valid_chars(name: str):
    if not has_valid_name_chars(name):
        raise DagsterInvalidDefinitionError(
            f'"{name}" is not a valid name in Dagster. Names must be in regex'
            f" {VALID_NAME_REGEX_STR}."
        )


def is_valid_name(name: str) -> bool:
    check.str_param(name, "name")

    return name not in DISALLOWED_NAMES and has_valid_name_chars(name)


def _kv_str(key: object, value: object) -> str:
    return f'{key}="{value!r}"'


def struct_to_string(name: str, **kwargs: object) -> str:
    # Sort the kwargs to ensure consistent representations across Python versions
    props_str = ", ".join([_kv_str(key, value) for key, value in sorted(kwargs.items())])
    return f"{name}({props_str})"


def validate_tags(
    tags: Optional[Mapping[str, Any]], allow_reserved_tags: bool = True
) -> Mapping[str, str]:
    valid_tags: Dict[str, str] = {}
    for key, value in check.opt_mapping_param(tags, "tags", key_type=str).items():
        if not isinstance(value, str):
            valid = False
            err_reason = f'Could not JSON encode value "{value}"'
            str_val = None
            try:
                str_val = seven.json.dumps(value)
                err_reason = (
                    'JSON encoding "{json}" of value "{val}" is not equivalent to original value'
                    .format(json=str_val, val=value)
                )

                valid = seven.json.loads(str_val) == value
            except Exception:
                pass

            if not valid:
                raise DagsterInvalidDefinitionError(
                    'Invalid value for tag "{key}", {err_reason}. Tag values must be strings '
                    "or meet the constraint that json.loads(json.dumps(value)) == value.".format(
                        key=key, err_reason=err_reason
                    )
                )

            valid_tags[key] = str_val  # type: ignore  # (possible none)
        else:
            valid_tags[key] = value

    if not allow_reserved_tags:
        check_reserved_tags(valid_tags)

    return valid_tags


def validate_group_name(group_name: Optional[str]) -> str:
    """Ensures a string name is valid and returns a default if no name provided."""
    if group_name:
        check_valid_chars(group_name)
        return group_name
    return DEFAULT_GROUP_NAME


[docs]def config_from_files(config_files: Sequence[str]) -> Mapping[str, Any]: """Constructs run config from YAML files. Args: config_files (List[str]): List of paths or glob patterns for yaml files to load and parse as the run config. Returns: Dict[str, Any]: A run config dictionary constructed from provided YAML files. Raises: FileNotFoundError: When a config file produces no results DagsterInvariantViolationError: When one of the YAML files is invalid and has a parse error. """ config_files = check.opt_sequence_param(config_files, "config_files") filenames = [] for file_glob in config_files or []: globbed_files = glob(file_glob) if not globbed_files: raise DagsterInvariantViolationError( 'File or glob pattern "{file_glob}" for "config_files" produced no results.'.format( file_glob=file_glob ) ) filenames += [os.path.realpath(globbed_file) for globbed_file in globbed_files] try: run_config = merge_yamls(filenames) except yaml.YAMLError as err: raise DagsterInvariantViolationError( f"Encountered error attempting to parse yaml. Parsing files {filenames} " f"loaded by file/patterns {config_files}." ) from err return check.is_dict(cast(Dict[str, object], run_config), key_type=str)
[docs]def config_from_yaml_strings(yaml_strings: Sequence[str]) -> Mapping[str, Any]: """Static constructor for run configs from YAML strings. Args: yaml_strings (List[str]): List of yaml strings to parse as the run config. Returns: Dict[Str, Any]: A run config dictionary constructed from the provided yaml strings Raises: DagsterInvariantViolationError: When one of the YAML documents is invalid and has a parse error. """ yaml_strings = check.sequence_param(yaml_strings, "yaml_strings", of_type=str) try: run_config = merge_yaml_strings(yaml_strings) except yaml.YAMLError as err: raise DagsterInvariantViolationError( f"Encountered error attempting to parse yaml. Parsing YAMLs {yaml_strings} " ) from err return check.is_dict(cast(Dict[str, object], run_config), key_type=str)
[docs]def config_from_pkg_resources(pkg_resource_defs: Sequence[Tuple[str, str]]) -> Mapping[str, Any]: """Load a run config from a package resource, using :py:func:`pkg_resources.resource_string`. Example: .. code-block:: python config_from_pkg_resources( pkg_resource_defs=[ ('dagster_examples.airline_demo.environments', 'local_base.yaml'), ('dagster_examples.airline_demo.environments', 'local_warehouse.yaml'), ], ) Args: pkg_resource_defs (List[(str, str)]): List of pkg_resource modules/files to load as the run config. Returns: Dict[Str, Any]: A run config dictionary constructed from the provided yaml strings Raises: DagsterInvariantViolationError: When one of the YAML documents is invalid and has a parse error. """ import pkg_resources # expensive, import only on use pkg_resource_defs = check.sequence_param(pkg_resource_defs, "pkg_resource_defs", of_type=tuple) try: yaml_strings = [ pkg_resources.resource_string(*pkg_resource_def).decode("utf-8") for pkg_resource_def in pkg_resource_defs ] except (ModuleNotFoundError, FileNotFoundError, UnicodeDecodeError) as err: raise DagsterInvariantViolationError( "Encountered error attempting to parse yaml. Loading YAMLs from " f"package resources {pkg_resource_defs}." ) from err return config_from_yaml_strings(yaml_strings=yaml_strings)