DuckDB + PySpark (dagster-duckdb-pyspark)

This library provides an integration with the DuckDB database and the PySpark data processing library.

dagster_duckdb_pyspark.DuckDBPySparkIOManager IOManagerDefinition

Config Schema:
database (dagster.StringSource):

Path to the DuckDB database.

schema (Union[dagster.StringSource, None], optional):

Name of the schema to use.

An I/O manager definition that reads inputs from and writes PySpark DataFrames to DuckDB. When using the DuckDBPySparkIOManager, any inputs and outputs without type annotations will be loaded as PySpark DataFrames.

Returns:

IOManagerDefinition

Examples

import pyspark.sql
from dagster import Definitions, asset
from dagster_duckdb_pyspark import DuckDBPySparkIOManager

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in DuckDB
)
def my_table() -> pyspark.sql.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": DuckDBPySparkIOManager(database="my_db.duckdb")}
)
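
If you provide a schema via the resource's “schema” config field, it is used for all tables handled by the I/O manager. A minimal sketch, assuming (as with database above) that the Pythonic resource accepts its config fields as constructor arguments:

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": DuckDBPySparkIOManager(database="my_db.duckdb", schema="my_schema")
    }
)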

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key, as in the example above. For ops, the schema can be specified by including a “schema” entry in the output metadata. If “schema” is not provided via config or on the asset/op, the “public” schema will be used.

import pyspark.sql
from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pyspark.sql.DataFrame:
    # the returned value will be stored at my_schema.my_table
    ...
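
If neither the config nor the asset/op supplies a schema, tables land in the “public” schema. A minimal sketch of that fallback:

import pyspark.sql
from dagster import asset

@asset  # no key_prefix and no "schema" metadata
def my_table() -> pyspark.sql.DataFrame:
    # the returned value will be stored at public.my_table
    ...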

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

import pyspark.sql
from dagster import AssetIn, asset

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    # my_table will just contain the data from column "a"
    ...

dagster_duckdb_pyspark.duckdb_pyspark_io_manager IOManagerDefinition

Config Schema:
database (dagster.StringSource):

Path to the DuckDB database.

schema (Union[dagster.StringSource, None], optional):

Name of the schema to use.

An I/O manager definition that reads inputs from and writes PySpark DataFrames to DuckDB. When using the duckdb_pyspark_io_manager, any inputs and outputs without type annotations will be loaded as PySpark DataFrames.

Returns:

IOManagerDefinition

Examples

import pyspark.sql
from dagster import asset, repository, with_resources
from dagster_duckdb_pyspark import duckdb_pyspark_io_manager

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in DuckDB
)
def my_table() -> pyspark.sql.DataFrame:  # the name of the asset will be the table name
    ...

@repository
def my_repo():
    return with_resources(
        [my_table],
        {"io_manager": duckdb_pyspark_io_manager.configured({"database": "my_db.duckdb"})}
    )
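
With this function-style API, the schema can likewise be set for every table by passing the optional “schema” field when configuring the I/O manager; a minimal sketch:

duckdb_pyspark_io_manager.configured(
    {"database": "my_db.duckdb", "schema": "my_schema"}
)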

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key. For ops, the schema can be specified by including a “schema” entry in the output metadata. If “schema” is not provided via config or on the asset/op, the “public” schema will be used.

import pyspark.sql
from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pyspark.sql.DataFrame:
    # the returned value will be stored at my_schema.my_table
    ...

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

import pyspark.sql
from dagster import AssetIn, asset

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    # my_table will just contain the data from column "a"
    ...

class dagster_duckdb_pyspark.DuckDBPySparkTypeHandler

Stores PySpark DataFrames in DuckDB.

To use this type handler, return it from the type_handlers method of an I/O manager that inherits from DuckDBIOManager.

Example

from typing import Sequence

import pyspark.sql
from dagster import Definitions, asset
from dagster._core.storage.db_io_manager import DbTypeHandler  # private module path in this release
from dagster_duckdb import DuckDBIOManager
from dagster_duckdb_pyspark import DuckDBPySparkTypeHandler

class MyDuckDBIOManager(DuckDBIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [DuckDBPySparkTypeHandler()]

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in DuckDB
)
def my_table() -> pyspark.sql.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": MyDuckDBIOManager(database="my_db.duckdb")}
)
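
Since inputs without type annotations are loaded as PySpark DataFrames, a downstream asset can consume my_table without annotating the parameter. A minimal sketch, assuming the same default applies to a custom DuckDBIOManager built on this type handler (my_table_summary is a hypothetical asset name):

from dagster import asset

@asset
def my_table_summary(my_table):  # loaded as a pyspark.sql.DataFrame
    ...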