This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.
import json
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from dagster_airflow.hooks.dagster_hook import DagsterHook
from dagster_airflow.links.dagster_link import LINK_FMT, DagsterLink
from dagster_airflow.utils import is_airflow_2_loaded_in_environment
[docs]class DagsterOperator(BaseOperator):
"""DagsterOperator.
Uses the dagster graphql api to run and monitor dagster jobs on remote dagster infrastructure
Parameters:
repository_name (str): the name of the repository to use
repostitory_location_name (str): the name of the repostitory location to use
job_name (str): the name of the job to run
run_config (Optional[Dict[str, Any]]): the run config to use for the job run
dagster_conn_id (Optional[str]): the id of the dagster connection, airflow 2.0+ only
organization_id (Optional[str]): the id of the dagster cloud organization
deployment_name (Optional[str]): the name of the dagster cloud deployment
user_token (Optional[str]): the dagster cloud user token to use
"""
template_fields = ["run_config"]
template_ext = (".yaml", ".yml", ".json")
ui_color = "#663399"
ui_fgcolor = "#e0e3fc"
operator_extra_links = (DagsterLink(),)
@apply_defaults
def __init__(
self,
dagster_conn_id="dagster_default",
run_config=None,
repository_name="",
repostitory_location_name="",
job_name="",
# params for airflow < 2.0.0 were custom connections aren't supported
deployment_name="prod",
user_token=None,
organization_id="",
url="https://dagster.cloud/",
*args,
**kwargs,
) -> None:
super().__init__(*args, **kwargs)
self.run_id = None
self.dagster_conn_id = dagster_conn_id if is_airflow_2_loaded_in_environment() else None
self.run_config = run_config or {}
self.repository_name = repository_name
self.repostitory_location_name = repostitory_location_name
self.job_name = job_name
self.user_token = user_token
self.url = url
self.organization_id = organization_id
self.deployment_name = deployment_name
self.hook = DagsterHook(
dagster_conn_id=self.dagster_conn_id,
user_token=self.user_token,
url=f"{self.url}{self.organization_id}/{self.deployment_name}/graphql",
)
def _is_json(self, blob):
try:
json.loads(blob)
except ValueError:
return False
return True
def pre_execute(self, context):
# force re-rendering to ensure run_config renders any templated
# content from run_config that couldn't be accessed on init
setattr(
self,
"run_config",
self.render_template(self.run_config, context),
)
def on_kill(self):
self.log.info("Terminating Run")
self.hook.terminate_run(
run_id=self.run_id,
)
def execute(self, context):
try:
return self._execute(context)
except Exception as e:
raise e
def _execute(self, context):
self.run_id = self.hook.launch_run(
repository_name=self.repository_name,
repostitory_location_name=self.repostitory_location_name,
job_name=self.job_name,
run_config=self.run_config,
)
# save relevant info in xcom for use in links
context["task_instance"].xcom_push(key="run_id", value=self.run_id)
context["task_instance"].xcom_push(
key="organization_id",
value=self.hook.organization_id if self.dagster_conn_id else self.organization_id,
)
context["task_instance"].xcom_push(
key="deployment_name",
value=self.hook.deployment_name if self.dagster_conn_id else self.deployment_name,
)
self.log.info("Run Starting....")
self.log.info(
"Run tracking: %s",
LINK_FMT.format(
organization_id=self.hook.organization_id,
deployment_name=self.hook.deployment_name,
run_id=self.run_id,
),
)
self.hook.wait_for_run(
run_id=self.run_id,
)
[docs]class DagsterCloudOperator(DagsterOperator):
"""DagsterCloudOperator.
Uses the dagster cloud graphql api to run and monitor dagster jobs on dagster cloud
Parameters:
repository_name (str): the name of the repository to use
repostitory_location_name (str): the name of the repostitory location to use
job_name (str): the name of the job to run
run_config (Optional[Dict[str, Any]]): the run config to use for the job run
dagster_conn_id (Optional[str]): the id of the dagster connection, airflow 2.0+ only
organization_id (Optional[str]): the id of the dagster cloud organization
deployment_name (Optional[str]): the name of the dagster cloud deployment
user_token (Optional[str]): the dagster cloud user token to use
"""