You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster_airflow.operators.dagster_operator

import json

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

from dagster_airflow.hooks.dagster_hook import DagsterHook
from dagster_airflow.links.dagster_link import LINK_FMT, DagsterLink
from dagster_airflow.utils import is_airflow_2_loaded_in_environment


class DagsterOperator(BaseOperator):
    """DagsterOperator.

    Uses the dagster graphql api to run and monitor dagster jobs on remote dagster
    infrastructure.

    Parameters:
        repository_name (str): the name of the repository to use
        repostitory_location_name (str): the name of the repository location to use
            (the misspelled parameter name is part of the public interface and is kept)
        job_name (str): the name of the job to run
        run_config (Optional[Dict[str, Any]]): the run config to use for the job run;
            this is the only templated field and is re-rendered in ``pre_execute``
        dagster_conn_id (Optional[str]): the id of the dagster connection, airflow 2.0+ only;
            it is forced to ``None`` when airflow 2 is not loaded
        organization_id (Optional[str]): the id of the dagster cloud organization
        deployment_name (Optional[str]): the name of the dagster cloud deployment
        user_token (Optional[str]): the dagster cloud user token to use
        url (str): base url of the dagster instance; the organization id, deployment name
            and ``graphql`` are appended to it verbatim, so it must end with a ``/``
    """

    template_fields = ["run_config"]
    template_ext = (".yaml", ".yml", ".json")
    ui_color = "#663399"
    ui_fgcolor = "#e0e3fc"
    operator_extra_links = (DagsterLink(),)

    @apply_defaults
    def __init__(
        self,
        dagster_conn_id="dagster_default",
        run_config=None,
        repository_name="",
        repostitory_location_name="",
        job_name="",
        # params for airflow < 2.0.0 where custom connections aren't supported
        deployment_name="prod",
        user_token=None,
        organization_id="",
        url="https://dagster.cloud/",
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        # Populated by ``_execute`` once a run has been launched.
        self.run_id = None
        # Connections are only usable on airflow 2.0+; otherwise rely on the
        # explicit url / token / organization / deployment parameters.
        self.dagster_conn_id = dagster_conn_id if is_airflow_2_loaded_in_environment() else None
        self.run_config = run_config or {}
        self.repository_name = repository_name
        self.repostitory_location_name = repostitory_location_name
        self.job_name = job_name
        self.user_token = user_token
        self.url = url
        self.organization_id = organization_id
        self.deployment_name = deployment_name
        self.hook = DagsterHook(
            dagster_conn_id=self.dagster_conn_id,
            user_token=self.user_token,
            url=f"{self.url}{self.organization_id}/{self.deployment_name}/graphql",
        )

    def _is_json(self, blob):
        """Return True if ``blob`` parses as JSON, False if parsing raises ValueError."""
        try:
            json.loads(blob)
        except ValueError:
            return False
        return True

    def pre_execute(self, context):
        """Re-render ``run_config`` against ``context`` before the task executes."""
        # force re-rendering to ensure run_config renders any templated
        # content from run_config that couldn't be accessed on init
        self.run_config = self.render_template(self.run_config, context)

    def on_kill(self):
        """Terminate the launched dagster run, if any, when the task is killed."""
        if self.run_id is None:
            # Killed before ``_execute`` launched a run: there is no run id to
            # hand to the hook, so skip the terminate call entirely.
            self.log.info("No run has been launched; nothing to terminate")
            return
        self.log.info("Terminating Run")
        self.hook.terminate_run(
            run_id=self.run_id,
        )

    def execute(self, context):
        """Launch the job and block until the run finishes; errors propagate unchanged."""
        return self._execute(context)

    def _execute(self, context):
        """Launch the run, publish link metadata to XCom, and wait for completion."""
        self.run_id = self.hook.launch_run(
            repository_name=self.repository_name,
            repostitory_location_name=self.repostitory_location_name,
            job_name=self.job_name,
            run_config=self.run_config,
        )

        # With a connection id the hook is the source of the organization and
        # deployment; without one, the values passed to this operator are used.
        # Resolve them once so the XCom values and the logged link always agree.
        if self.dagster_conn_id:
            organization_id = self.hook.organization_id
            deployment_name = self.hook.deployment_name
        else:
            organization_id = self.organization_id
            deployment_name = self.deployment_name

        # save relevant info in xcom for use in links
        task_instance = context["task_instance"]
        task_instance.xcom_push(key="run_id", value=self.run_id)
        task_instance.xcom_push(key="organization_id", value=organization_id)
        task_instance.xcom_push(key="deployment_name", value=deployment_name)

        self.log.info("Run Starting....")
        self.log.info(
            "Run tracking: %s",
            LINK_FMT.format(
                organization_id=organization_id,
                deployment_name=deployment_name,
                run_id=self.run_id,
            ),
        )
        self.hook.wait_for_run(
            run_id=self.run_id,
        )
class DagsterCloudOperator(DagsterOperator):
    """DagsterCloudOperator.

    Uses the dagster cloud graphql api to run and monitor dagster jobs on dagster cloud.
    This subclass adds no attributes or methods of its own; all behavior is inherited
    from ``DagsterOperator``.

    Parameters:
        repository_name (str): the name of the repository to use
        repostitory_location_name (str): the name of the repository location to use
            (the parameter name is spelled this way in the signature)
        job_name (str): the name of the job to run
        run_config (Optional[Dict[str, Any]]): the run config to use for the job run
        dagster_conn_id (Optional[str]): the id of the dagster connection, airflow 2.0+ only
        organization_id (Optional[str]): the id of the dagster cloud organization
        deployment_name (Optional[str]): the name of the dagster cloud deployment
        user_token (Optional[str]): the dagster cloud user token to use
    """