You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster_airflow.resources.airflow_ephemeral_db

import importlib
import os
import tempfile
from typing import List, Optional

import airflow
from airflow.models.connection import Connection
from airflow.utils import db
from dagster import (
    Array,
    DagsterRun,
    Field,
    InitResourceContext,
    Noneable,
    ResourceDefinition,
    _check as check,
)

from dagster_airflow.resources.airflow_db import AirflowDatabase
from dagster_airflow.utils import (
    Locker,
    create_airflow_connections,
    is_airflow_2_loaded_in_environment,
    serialize_connections,
)


class AirflowEphemeralDatabase(AirflowDatabase):
    """A ephemeral Airflow database Dagster resource."""

    def __init__(
        self, airflow_home_path: str, dagster_run: DagsterRun, dag_run_config: Optional[dict] = None
    ):
        self.airflow_home_path = airflow_home_path
        super().__init__(dagster_run=dagster_run, dag_run_config=dag_run_config)

    @staticmethod
    def _initialize_database(
        airflow_home_path: str = os.path.join(tempfile.gettempdir(), "dagster_airflow"),
        connections: List[Connection] = [],
    ):
        os.environ["AIRFLOW_HOME"] = airflow_home_path
        os.makedirs(airflow_home_path, exist_ok=True)
        with Locker(airflow_home_path):
            airflow_initialized = os.path.exists(f"{airflow_home_path}/airflow.db")
            # because AIRFLOW_HOME has been overriden airflow needs to be reloaded
            if is_airflow_2_loaded_in_environment():
                importlib.reload(airflow.configuration)
                importlib.reload(airflow.settings)
                importlib.reload(airflow)
            else:
                importlib.reload(airflow)
            if not airflow_initialized:
                db.initdb()
                create_airflow_connections(connections)

    @staticmethod
    def from_resource_context(context: InitResourceContext) -> "AirflowEphemeralDatabase":
        airflow_home_path = os.path.join(tempfile.gettempdir(), f"dagster_airflow_{context.run_id}")
        AirflowEphemeralDatabase._initialize_database(
            airflow_home_path=airflow_home_path,
            connections=[Connection(**c) for c in context.resource_config["connections"]],
        )
        return AirflowEphemeralDatabase(
            airflow_home_path=airflow_home_path,
            dagster_run=check.not_none(context.dagster_run, "Context must have run"),
            dag_run_config=context.resource_config.get("dag_run_config"),
        )


[docs]def make_ephemeral_airflow_db_resource( connections: List[Connection] = [], dag_run_config: Optional[dict] = None ) -> ResourceDefinition: """Creates a Dagster resource that provides an ephemeral Airflow database. Args: connections (List[Connection]): List of Airflow Connections to be created in the Airflow DB dag_run_config (Optional[dict]): dag_run configuration to be used when creating a DagRun Returns: ResourceDefinition: The ephemeral Airflow DB resource """ serialized_connections = serialize_connections(connections) airflow_db_resource_def = ResourceDefinition( resource_fn=AirflowEphemeralDatabase.from_resource_context, config_schema={ "connections": Field( Array(inner_type=dict), default_value=serialized_connections, is_required=False, ), "dag_run_config": Field( Noneable(dict), default_value=dag_run_config, is_required=False, ), }, description="Ephemeral Airflow DB to be used by dagster-airflow ", ) return airflow_db_resource_def