You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster_duckdb.resource

from contextlib import contextmanager

import duckdb
from dagster import ConfigurableResource
from dagster._utils.backoff import backoff
from pydantic import Field


[docs]class DuckDBResource(ConfigurableResource): """Resource for interacting with a DuckDB database. Examples: .. code-block:: python from dagster import Definitions, asset from dagster_duckdb import DuckDBResource @asset def my_table(duckdb: DuckDBResource): with duckdb.get_connection() as conn: conn.execute("SELECT * from MY_SCHEMA.MY_TABLE") defs = Definitions( assets=[my_table], resources={"duckdb": DuckDBResource(database="path/to/db.duckdb")} ) """ database: str = Field( description=( "Path to the DuckDB database. Setting database=':memory:' will use an in-memory" " database " ) ) @classmethod def _is_dagster_maintained(cls) -> bool: return True @contextmanager def get_connection(self): conn = backoff( fn=duckdb.connect, retry_on=(RuntimeError, duckdb.IOException), kwargs={"database": self.database, "read_only": False}, max_retries=10, ) yield conn conn.close()