Class for sensors. You almost never want to use initialize this class directly. Instead, you should use the @sensor decorator, which returns a SensorDefinition
The decorator used to define a run failure sensor. The run failure sensor, is a special case of a run status sensor specifically to detect run failures.
Class for run status sensors. You almost never want to initialize this class directly. Instead, you should use the @run_status_sensor or @run_failure_sensor
To define a sensor, use the @sensor decorator. The decorated function can optionally have a context as the first argument. The context is a SensorEvaluationContext.
Let's say you have a job that logs a filename that is specified in the op configuration of the process_file op:
You can write a sensor that watches for new files in a specific directory and yields a RunRequest for each new file in the directory. By default, this sensor runs every 30 seconds.
import os
from dagster import sensor, RunRequest, RunConfig
@sensor(job=log_file_job)defmy_directory_sensor():for filename in os.listdir(MY_DIRECTORY):
filepath = os.path.join(MY_DIRECTORY, filename)if os.path.isfile(filepath):yield RunRequest(
run_key=filename,
run_config=RunConfig(
ops={"process_file": FileConfig(filename=filename)}),)
This sensor iterates through all the files in MY_DIRECTORY and yields a RunRequest for each file. Note that despite the yield syntax, the function will run to completion before any runs are submitted.
Once a sensor is added to a Definitions object with the job it yields a RunRequest for, it can be started and will start creating runs. You can start or stop sensors in the Dagster UI, or by setting the default status to DefaultSensorStatus.RUNNING in code:
If you manually start or stop a sensor in the UI, that will override any default status that is set in code.
Once your sensor is started, if you're running a Dagster daemon as part of your deployment, the sensor will begin executing immediately without needing to restart the dagster-daemon process.
When instigating runs based on external events, you usually want to run exactly one job run for each event. There are two ways to define your sensors to avoid creating duplicate runs for your events:
Dagster guarantees that for a given sensor, at most one run is created for each RunRequest with a unique run_key. If a sensor yields a new run request with a previously used run_key, Dagster skips processing the new run request.
In the example, a RunRequest is requested for each file during every sensor evaluation. Therefore, for a given sensor evaluation, there already exists a RunRequest with a run_key for any file that existed during the previous sensor evaluation. Dagster skips processing duplicate run requests, so Dagster launches runs for only the files added since the last sensor evaluation. The result is exactly one run per file.
Run keys allow you to write sensor evaluation functions that declaratively describe what job runs should exist, and helps you avoid the need for more complex logic that manages state. However, when dealing with high-volume external events, some state-tracking optimizations might be necessary.
When writing a sensor that deals with high-volume events, it might not be feasible to yield a RunRequest during every sensor evaluation. For example, you may have an s3 storage bucket that contains thousands of files.
When writing a sensor for such event sources, you can maintain a cursor that limits the number of yielded run requests for previously processed events. The sensor context, provided to every sensor evaluation function, has a cursor property and a update_cursor method for sensors to track state across evaluations:
cursor - A cursor field on SensorEvaluationContext that returns the last persisted cursor value from a previous evaluation
update_cursor - A method on SensorEvaluationContext that takes a string to persist and make available to future evaluations
Here is a somewhat contrived example of our directory file sensor using a cursor for updated files:
@sensor(job=log_file_job)defmy_directory_sensor_cursor(context):
last_mtime =float(context.cursor)if context.cursor else0
max_mtime = last_mtime
for filename in os.listdir(MY_DIRECTORY):
filepath = os.path.join(MY_DIRECTORY, filename)if os.path.isfile(filepath):
fstats = os.stat(filepath)
file_mtime = fstats.st_mtime
if file_mtime <= last_mtime:continue# the run key should include mtime if we want to kick off new runs based on file modifications
run_key =f"{filename}:{file_mtime}"
run_config ={"ops":{"process_file":{"config":{"filename": filename}}}}yield RunRequest(run_key=run_key, run_config=run_config)
max_mtime =max(max_mtime, file_mtime)
context.update_cursor(str(max_mtime))
For sensors that consume multiple event streams, you may need to serialize and deserialize a more complex data structure in and out of the cursor string to keep track of the sensor's progress over the multiple streams.
By default, the Dagster daemon runs a sensor 30 seconds after that sensor's previous evaluation finishes executing. You can configure the interval using the minimum_interval_seconds argument on the @sensor decorator.
It's important to note that this interval represents a minimum interval between runs of the sensor and not the exact frequency the sensor runs. If you have a sensor that takes 30 seconds to complete, but the minimum_interval_seconds is 5 seconds, the fastest Dagster daemon will run the sensor is every 35 seconds. The minimum_interval_seconds only guarantees that the sensor is not evaluated more frequently than the given interval.
For example, here are two sensors that specify two different minimum intervals:
These sensor definitions are short, so they run in less than a second. Therefore, you can expect these sensors to run consistently around every 30 and 45 seconds, respectively.
If a sensor evaluation function takes more than 60 seconds to return its results, the sensor evaluation will time out and the Dagster daemon will move on to the next sensor without submitting any runs. This 60 second timeout only applies to the time it takes to run the sensor function, not to the execution time of the runs submitted by the sensor. To avoid timeouts, slower sensors can break up their work into chunks, using cursors to let subsequent sensor calls pick up where the previous call left off.
For debugging purposes, it's often useful to describe why a sensor might not yield any runs for a given evaluation. The sensor evaluation function can yield a SkipReason with a string description that will be displayed in the UI.
For example, here is our directory sensor that now provides a SkipReason when no files are encountered:
@sensor(job=log_file_job)defmy_directory_sensor_with_skip_reasons():
has_files =Falsefor filename in os.listdir(MY_DIRECTORY):
filepath = os.path.join(MY_DIRECTORY, filename)if os.path.isfile(filepath):yield RunRequest(
run_key=filename,
run_config={"ops":{"process_file":{"config":{"filename": filename}}}},)
has_files =Trueifnot has_files:yield SkipReason(f"No files found in {MY_DIRECTORY}.")
Dagster's resources system can be used with sensors to make it easier to call out to external systems and to make components of a sensor easier to plug in for testing purposes.
To specify resource dependencies, annotate the resource as a parameter to the sensor's function. Resources are provided by attaching them to your Definitions call.
Here, a resource is provided which provides access to an external API. The same resource could be used in the job that the sensor triggers.
Before you test: Test evaluations of sensors run the sensor's underlying Python function, meaning that any side effects contained within that sensor's function may be executed.
In the UI, you can manually trigger a test evaluation of a sensor and view the results.
Click Overview > Sensors.
Click the sensor you want to test.
Click the Test Sensor button, located near the top right corner of the page.
You'll be prompted to provide a cursor value. You can use the existing cursor for the sensor (which will be prepopulated) or enter a different value. If you're not using cursors, leave this field blank.
Click Evaluate to fire the sensor. A window containing the result of the evaluation will display, whether it's run requests, a skip reason, or a Python error:
If the run was successful, then for each produced run request, you can open the launchpad pre-scaffolded with the config produced by that run request. You'll also see a new computed cursor value from the evaluation, with the option to persist the value.
To unit test sensors, you can directly invoke the sensor's Python function. This will return all the run requests yielded by the sensor. The config obtained from the returned run requests can be validated using the validate_run_config function:
from dagster import validate_run_config
@sensor(job=log_file_job)defsensor_to_test():yield RunRequest(
run_key="foo",
run_config={"ops":{"process_file":{"config":{"filename":"foo"}}}},)deftest_sensor():for run_request in sensor_to_test():assert validate_run_config(log_file_job, run_request.run_config)
Notice that since the context argument wasn't used in the sensor, a context object doesn't have to be provided. However, if the context object is needed, it can be provided via build_sensor_context. Consider again the my_directory_sensor_cursor example:
@sensor(job=log_file_job)defmy_directory_sensor_cursor(context):
last_mtime =float(context.cursor)if context.cursor else0
max_mtime = last_mtime
for filename in os.listdir(MY_DIRECTORY):
filepath = os.path.join(MY_DIRECTORY, filename)if os.path.isfile(filepath):
fstats = os.stat(filepath)
file_mtime = fstats.st_mtime
if file_mtime <= last_mtime:continue# the run key should include mtime if we want to kick off new runs based on file modifications
run_key =f"{filename}:{file_mtime}"
run_config ={"ops":{"process_file":{"config":{"filename": filename}}}}yield RunRequest(run_key=run_key, run_config=run_config)
max_mtime =max(max_mtime, file_mtime)
context.update_cursor(str(max_mtime))
This sensor uses the context argument. To invoke it, we need to provide one:
from dagster import build_sensor_context
deftest_my_directory_sensor_cursor():
context = build_sensor_context(cursor="0")for run_request in my_directory_sensor_cursor(context):assert validate_run_config(log_file_job, run_request.run_config)
Any sensor can emit log messages during its evaluation function:
@sensor(job=the_job)deflogs_then_skips(context):
context.log.info("Logging from a sensor!")return SkipReason("Nothing to do")
These logs can be viewed in the UI on the corresponding sensor page, after first enabling the Experimental schedule/sensor logging view option in User Settings.
Using the UI, you can monitor and operate sensors. The UI provides multiple views that help with observing sensor evaluations, skip reasons, and errors.
To view all sensors, navigate to Overview > Sensors. Here, you can start and stop sensors, and view their frequency, last tick, and last run:
Click on any sensor to test the sensor, monitor all sensor evaluations on a timeline, and view a table of runs launched by the sensor.
If you want to act on the status of a job run, Dagster provides a way to create a sensor that reacts to run statuses. You can use run_status_sensor with a specified DagsterRunStatus to decorate a function that will run when the given status occurs. This can be used to launch runs of other jobs, send alerts to a monitoring service on run failure, or report a run success.
Here is an example of a run status sensor that launches a run of status_reporting_job if a run is successful:
@run_status_sensor(
run_status=DagsterRunStatus.SUCCESS,
request_job=status_reporting_job,)defreport_status_sensor(context):# this condition prevents the sensor from triggering status_reporting_job again after it succeedsif context.dagster_run.job_name != status_reporting_job.name:
run_config ={"ops":{"status_report":{"config":{"job_name": context.dagster_run.job_name}}}}return RunRequest(run_key=None, run_config=run_config)else:return SkipReason("Don't report status of status_reporting_job")
request_job is the job that will be run when the RunRequest is returned.
Note that in report_status_sensor we conditionally return a RunRequest. This ensures that when report_status_sensor runs status_reporting_job it doesn't enter an infinite loop where the success of status_reporting_job triggers another run of status_reporting_job, which triggers another run, and so on.
Here is an example of a sensor that reports job success in a Slack message:
When a run status sensor is triggered by a job run but doesn't return anything, Dagster will report an event back to the run to indicate that the sensor ran.
Once you have written your sensor, you can add the sensor to a Definitions object so it can be enabled and used the same as other sensors:
from dagster import Definitions
defs = Definitions(jobs=[my_sensor_job], sensors=[my_slack_on_run_success])
Dagster provides a set of special run status sensor decorators for defining sensors that monitor run failure events. You can use run_failure_sensor to decorate a function that will run when a run fails:
Sometimes, you may want to monitor jobs in a code location other than the one where the sensor is defined. You can use special identifiers CodeLocationSelector and JobSelector to tell a run status sensor to monitor jobs in another code location:
@run_status_sensor(
monitored_jobs=[CodeLocationSelector(location_name="defs")],
run_status=DagsterRunStatus.SUCCESS,)defcode_location_a_sensor():# when any job in code_location_a succeeds, this sensor will trigger
send_slack_alert()@run_failure_sensor(
monitored_jobs=[
JobSelector(
location_name="defs",
repository_name="code_location_a",
job_name="data_update",)],)defcode_location_a_data_update_failure_sensor():# when the data_update job in code_location_a fails, this sensor will trigger
send_slack_alert()
You can also monitor every job in your Dagster instance by specifying monitor_all_repositories=True on the sensor decorator. Note that monitor_all_repositories cannot be used along with jobs specified via monitored_jobs.
@run_status_sensor(
monitor_all_repositories=True,
run_status=DagsterRunStatus.SUCCESS,)definstance_sensor():# when any job in the Dagster instance succeeds, this sensor will trigger
send_slack_alert()
As with other sensors, you can directly invoke run status sensors. However, the context provided via run_status_sensor and run_failure_sensor contain objects that are typically only available during run time. Below you'll find code snippets that demonstrate how to build the context so that you can directly invoke your function in unit tests.
If you had written a status sensor like this (assuming you implemented the function email_alert elsewhere):
Then we can execute this job and pull the attributes we need to build the context. We provide a function build_run_status_sensor_context that will return the correct context object:
# execute the job
instance = DagsterInstance.ephemeral()
result = my_job_succeeds.execute_in_process(instance=instance)# retrieve the DagsterRun
dagster_run = result.dagster_run
# retrieve a success event from the completed execution
dagster_event = result.get_job_success_event()# create the context
run_status_sensor_context = build_run_status_sensor_context(
sensor_name="my_email_sensor",
dagster_instance=instance,
dagster_run=dagster_run,
dagster_event=dagster_event,)# run the sensor
my_email_sensor(run_status_sensor_context)
We have provided convenience functions ExecuteInProcessResult.get_job_success_event and ExecuteInProcessResult.get_job_failure_event for retrieving DagsterRunStatus.SUCCESS and DagsterRunStatus.FAILURE events, respectively. If you have a run status sensor triggered on another status, you can retrieve all events from result and filter based on your event type.
We can use the same pattern to build the context for run_failure_sensor. If we wanted to test this run failure sensor:
We first need to make a simple job that will fail:
from dagster import op, job
@opdeffails():raise Exception("failure!")@jobdefmy_job_fails():
fails()
Then we can execute the job and create our context:
from dagster import DagsterInstance, build_run_status_sensor_context
# execute the job
instance = DagsterInstance.ephemeral()
result = my_job_fails.execute_in_process(instance=instance, raise_on_error=False)# retrieve the DagsterRun
dagster_run = result.dagster_run
# retrieve a failure event from the completed job execution
dagster_event = result.get_job_failure_event()# create the context
run_failure_sensor_context = build_run_status_sensor_context(
sensor_name="my_email_failure_sensor",
dagster_instance=instance,
dagster_run=dagster_run,
dagster_event=dagster_event,).for_run_failure()# run the sensor
my_email_failure_sensor(run_failure_sensor_context)