Source code for dagster_spark.utils

import itertools
import os

import dagster._check as check

from .types import SparkOpError


def flatten_dict(d):
    def _flatten_dict(d, result, key_path=None):
        """Iterates an arbitrarily nested dictionary and yield dot-notation key:value tuples.

        {'foo': {'bar': 3, 'baz': 1}, {'other': {'key': 1}} =>
            [('foo.bar', 3), ('foo.baz', 1), ('other.key', 1)]

        """
        for k, v in d.items():
            new_key_path = (key_path or []) + [k]
            if isinstance(v, dict):
                _flatten_dict(v, result, new_key_path)
            else:
                result.append((".".join(new_key_path), v))

    result = []
    if d is not None:
        _flatten_dict(d, result)
    return result
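

# Usage example (a minimal sketch; the Spark-style dict below is illustrative and
# not part of the module): nested keys are joined with dots, leaf values are kept.
#
#     flatten_dict({"spark": {"app": {"name": "my_job"}, "executor": {"memory": "4g"}}})
#     # => [("spark.app.name", "my_job"), ("spark.executor.memory", "4g")]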


def parse_spark_config(spark_conf):
    """Convert spark conf dict to list of CLI arguments.

    For each key-value pair in spark conf, we need to pass to CLI in format:

    --conf "key=value"
    """
    spark_conf_list = flatten_dict(spark_conf)
    return format_for_cli(spark_conf_list)


def format_for_cli(spark_conf_list):
    return list(
        itertools.chain.from_iterable([("--conf", "{}={}".format(*c)) for c in spark_conf_list])
    )
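

# Usage example (a minimal sketch with a hypothetical config): the nested dict is
# flattened and each entry becomes a "--conf key=value" pair suitable for spark-submit.
#
#     parse_spark_config({"spark": {"executor": {"memory": "4g"}, "app": {"name": "my_job"}}})
#     # => ["--conf", "spark.executor.memory=4g", "--conf", "spark.app.name=my_job"]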


def construct_spark_shell_command(
    application_jar,
    main_class,
    master_url=None,
    spark_conf=None,
    deploy_mode=None,
    application_arguments=None,
    spark_home=None,
):
    """Constructs the spark-submit command for a Spark job."""
    check.opt_str_param(master_url, "master_url")
    check.str_param(application_jar, "application_jar")
    spark_conf = check.opt_dict_param(spark_conf, "spark_conf")
    check.opt_str_param(deploy_mode, "deploy_mode")
    check.opt_str_param(application_arguments, "application_arguments")
    check.opt_str_param(spark_home, "spark_home")

    spark_home = spark_home if spark_home else os.environ.get("SPARK_HOME")

    if spark_home is None:
        raise SparkOpError(
            "No spark home set. You must either pass spark_home in config or "
            "set $SPARK_HOME in your environment (got None)."
        )

    master_url = ["--master", master_url] if master_url else []
    deploy_mode = ["--deploy-mode", deploy_mode] if deploy_mode else []

    spark_shell_cmd = (
        [f"{spark_home}/bin/spark-submit", "--class", main_class]
        + master_url
        + deploy_mode
        + parse_spark_config(spark_conf)
        + [application_jar]
        + [application_arguments]
    )
    return spark_shell_cmd
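

# Usage example (a minimal sketch; the paths, class name, and config values below
# are hypothetical): building a spark-submit invocation for a local job, with
# spark_home supplied explicitly instead of via $SPARK_HOME.
#
#     construct_spark_shell_command(
#         application_jar="/tmp/my_job.jar",
#         main_class="com.example.MyJob",
#         master_url="local[*]",
#         spark_conf={"spark": {"executor": {"memory": "4g"}}},
#         application_arguments="--date 2023-01-01",
#         spark_home="/opt/spark",
#     )
#     # => ["/opt/spark/bin/spark-submit", "--class", "com.example.MyJob",
#     #     "--master", "local[*]", "--conf", "spark.executor.memory=4g",
#     #     "/tmp/my_job.jar", "--date 2023-01-01"]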