
Source code for dagster_pyspark.resources

from typing import Any, Dict

import dagster._check as check
from dagster import ConfigurableResource, resource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._core.execution.context.init import InitResourceContext
from dagster_spark.configs_spark import spark_config
from dagster_spark.utils import flatten_dict
from pydantic import PrivateAttr
from pyspark.sql import SparkSession


def spark_session_from_config(spark_conf=None):
    spark_conf = check.opt_dict_param(spark_conf, "spark_conf")
    builder = SparkSession.builder
    flat = flatten_dict(spark_conf)
    for key, value in flat:
        builder = builder.config(key, value)

    return builder.getOrCreate()
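
# `flatten_dict` (from dagster_spark.utils) is expected to turn a nested config such as
# {"spark": {"executor": {"memory": "2g"}}} into dotted (key, value) pairs like
# [("spark.executor.memory", "2g")], which is the flat key form that
# SparkSession.builder.config accepts. A minimal usage sketch, assuming a local Spark
# installation is available:
#
#     session = spark_session_from_config({"spark": {"executor": {"memory": "2g"}}})
#     session.createDataFrame([(1, "a")], ["id", "value"]).show()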


class PySparkResource(ConfigurableResource):
    """This resource provides access to a PySpark Session for executing PySpark code within Dagster.

    Example:
        .. code-block:: python

            @op
            def my_op(pyspark: PySparkResource):
                spark_session = pyspark.spark_session
                dataframe = spark_session.read.json("examples/src/main/resources/people.json")


            @job(
                resource_defs={
                    "pyspark": PySparkResource(
                        spark_config={
                            "spark.executor.memory": "2g"
                        }
                    )
                }
            )
            def my_spark_job():
                my_op()
    """

    spark_config: Dict[str, Any]
    _spark_session = PrivateAttr(default=None)

    @classmethod
    def _is_dagster_maintained(cls) -> bool:
        return True

    def setup_for_execution(self, context: InitResourceContext) -> None:
        self._spark_session = spark_session_from_config(self.spark_config)

    @property
    def spark_session(self) -> Any:
        return self._spark_session

    @property
    def spark_context(self) -> Any:
        return self.spark_session.sparkContext


@dagster_maintained_resource
@resource({"spark_conf": spark_config()})
def pyspark_resource(init_context) -> PySparkResource:
    """This resource provides access to a PySpark SparkSession for executing PySpark code within Dagster.

    Example:
        .. code-block:: python

            @op(required_resource_keys={"pyspark"})
            def my_op(context):
                spark_session = context.resources.pyspark.spark_session
                dataframe = spark_session.read.json("examples/src/main/resources/people.json")

            my_pyspark_resource = pyspark_resource.configured(
                {"spark_conf": {"spark.executor.memory": "2g"}}
            )

            @job(resource_defs={"pyspark": my_pyspark_resource})
            def my_spark_job():
                my_op()
    """
    context_updated_config = init_context.replace_config(
        {"spark_config": init_context.resource_config["spark_conf"]}
    )
    return PySparkResource.from_resource_context(context_updated_config)
class LazyPySparkResource(ConfigurableResource):
    """This resource provides access to a lazily-created PySpark SparkSession for executing
    PySpark code within Dagster. Creation of the SparkSession object (and its startup penalty)
    is deferred until the .spark_session attribute of the resource is first accessed by an op
    or IOManager.

    Example:
        .. code-block:: python

            @op
            def my_op(lazy_pyspark: LazyPySparkResource):
                spark_session = lazy_pyspark.spark_session
                dataframe = spark_session.read.json("examples/src/main/resources/people.json")


            @job(
                resource_defs={
                    "lazy_pyspark": LazyPySparkResource(
                        spark_config={
                            "spark.executor.memory": "2g"
                        }
                    )
                }
            )
            def my_spark_job():
                my_op()
    """

    spark_config: Dict[str, Any]
    _spark_session = PrivateAttr(default=None)

    @classmethod
    def _is_dagster_maintained(cls) -> bool:
        return True

    def _init_session(self) -> None:
        if self._spark_session is None:
            self._spark_session = spark_session_from_config(self.spark_config)

    @property
    def spark_session(self) -> Any:
        self._init_session()
        return self._spark_session

    @property
    def spark_context(self) -> Any:
        self._init_session()
        return self._spark_session.sparkContext


@dagster_maintained_resource
@resource({"spark_conf": spark_config()})
def lazy_pyspark_resource(init_context: InitResourceContext) -> LazyPySparkResource:
    """This resource provides access to a lazily-created PySpark SparkSession for executing
    PySpark code within Dagster. Creation of the SparkSession object (and its startup penalty)
    is deferred until the .spark_session attribute of the resource is first accessed by an op
    or IOManager.

    Example:
        .. code-block:: python

            @op(required_resource_keys={"lazy_pyspark"})
            def my_op(context):
                spark_session = context.resources.lazy_pyspark.spark_session
                dataframe = spark_session.read.json("examples/src/main/resources/people.json")

            my_pyspark_resource = lazy_pyspark_resource.configured(
                {"spark_conf": {"spark.executor.memory": "2g"}}
            )

            @job(resource_defs={"lazy_pyspark": my_pyspark_resource})
            def my_spark_job():
                my_op()
    """
    context_updated_config = init_context.replace_config(
        {"spark_config": init_context.resource_config["spark_conf"]}
    )
    return LazyPySparkResource.from_resource_context(context_updated_config)
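
A minimal usage sketch of the Pythonic ``PySparkResource`` defined above, assuming a local Spark installation and that ``PySparkResource`` is exported by ``dagster_pyspark``; the JSON path and asset name are illustrative only:

.. code-block:: python

    from dagster import Definitions, asset
    from dagster_pyspark import PySparkResource


    @asset
    def people_count(pyspark: PySparkResource) -> int:
        # Read an illustrative JSON file and count its rows using the shared SparkSession.
        df = pyspark.spark_session.read.json("examples/src/main/resources/people.json")
        return df.count()


    defs = Definitions(
        assets=[people_count],
        resources={"pyspark": PySparkResource(spark_config={"spark.executor.memory": "2g"})},
    )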