Solids, Pipelines, Modes, Presets, Partition Sets, and Composite Solids were a set of Dagster core abstractions that preceded Dagster's current core abstractions: Ops, Jobs, and Graphs. This guide describes how to migrate code that uses the old abstractions to code that uses the present abstractions.
You can also check out the more detailed API reference docs for Jobs, Ops, and Graphs.
Migrating a pipeline to jobs does not require migrating all your other pipelines to jobs. Graphs, jobs, and pipelines can co-exist peacefully in Python.
In Dagit, we display both jobs and pipelines, where pipelines are marked with the Legacy tag in the left nav.
With pipelines, if you wanted to execute a pipeline with a particular set of resources for a test, you needed to include a mode for those resources on the pipeline definition:
from unittest.mock import MagicMock
from dagster import ModeDefinition, ResourceDefinition, execute_pipeline, resource
from dagster._legacy import pipeline, solid
@resource
def external_service():
    ...


@solid(required_resource_keys={"external_service"})
def do_something():
    ...


@pipeline(
    mode_defs=[
        ModeDefinition(resource_defs={"external_service": external_service}),
        ModeDefinition(
            "test",
            resource_defs={
                "external_service": ResourceDefinition.hardcoded_resource(MagicMock())
            },
        ),
    ]
)
def do_it_all():
    do_something()


def test_do_it_all():
    execute_pipeline(do_it_all, mode="test")
This made it difficult to construct mock resources that were specific to a particular test. It also relied on string indirection rather than object references to select the mode (mode="test"), which is error-prone. With the new APIs, you can override a job's resources at the time you execute it:
from unittest.mock import MagicMock
from dagster import job, op, resource
@resource
def external_service():
    ...


@op(required_resource_keys={"external_service"})
def do_something():
    ...


@job(resource_defs={"external_service": external_service})
def do_it_all():
    do_something()


def test_do_it_all():
    result = do_it_all.graph.execute_in_process(
        resources={"external_service": MagicMock()}
    )
    assert result.success
It's common for pipelines to have modes corresponding to different environments - e.g. production and development. Here's a pair of resources: one for production and one for development.
from dagster import resource
@resource
def prod_external_service():
    ...


@resource
def dev_external_service():
    ...
And here's a pipeline that includes these resources inside a pair of modes, along with a repository for loading it from Dagit:
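A minimal sketch of that setup, reusing the prod_external_service and dev_external_service resources defined above (the "prod" and "dev" mode names, the do_something solid, and the prod_dev_repo repository name are illustrative):

from dagster import ModeDefinition, repository
from dagster._legacy import pipeline, solid


@solid(required_resource_keys={"external_service"})
def do_something():
    ...


@pipeline(
    mode_defs=[
        # prod_external_service and dev_external_service are the resources defined above
        ModeDefinition("prod", resource_defs={"external_service": prod_external_service}),
        ModeDefinition("dev", resource_defs={"external_service": dev_external_service}),
    ]
)
def do_it_all():
    do_something()


@repository
def prod_dev_repo():
    return [do_it_all]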
These modes typically correspond to different Dagster instances. For example, the pipeline will be executed using the development mode on a local machine and using the production mode on a production instance deployed in the cloud. To point the instances to your repository, you would typically write a workspace yaml that looks something like this:
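A sketch of such a workspace.yaml, assuming the repository above is defined in a file named repos.py (the file name is an assumption):

load_from:
  - python_file: repos.py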
With this setup, both modes show up on both instances. This is awkward: the production mode shows up on the development instance even though it's not meant to be executed there, and vice versa for the dev mode on the production instance.
With the new APIs, the convention is to include development jobs and production jobs in separate repositories. Then the workspace for the production instance can point to the repository with the production jobs, and the workspace for the development instance can point to the development jobs.
So that we don't have to rewrite the computation graph that underlies each job, we can use the graph decorator to define the computation graph once, and then create two different jobs from it using the GraphDefinition.to_job method.
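Here is a sketch of what that could look like, reusing the prod and dev resources from above; the do_something op, the job names, and the repository names are illustrative:

from dagster import graph, op, repository


@op(required_resource_keys={"external_service"})
def do_something():
    ...


@graph
def do_it_all():
    do_something()


@repository
def prod_repo():
    # prod_external_service is the production resource defined above
    return [
        do_it_all.to_job(
            name="do_it_all_prod",
            resource_defs={"external_service": prod_external_service},
        )
    ]


@repository
def dev_repo():
    # dev_external_service is the development resource defined above
    return [
        do_it_all.to_job(
            name="do_it_all_dev",
            resource_defs={"external_service": dev_external_service},
        )
    ]

The workspace for each instance then points only at the repository it is meant to run.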
With the pipeline APIs, if you want to specify config at definition time (as opposed to at the time you run the pipeline), you use a PresetDefinition.
from dagster import PresetDefinition
from dagster._legacy import pipeline, solid
@solid(config_schema={"param":str})defdo_something(_):...@pipeline(
preset_defs=[
PresetDefinition("my_preset",
run_config={"solids":{"do_something":{"config":{"param":"some_val"}}}},)])defdo_it_all():
do_something()
With the new APIs, you can accomplish this by supplying config to the job.
from dagster import job, op
@op(config_schema={"param":str})defdo_something(_):...@job(
config={"ops":{"do_something":{"config":{"param":"some_val",}}}})defdo_it_all():
do_something()
Unlike with presets, this config is used by default any time the job is launched, not just when it's launched from the Dagit Launchpad. That is, it will also be used when launching the job via a schedule, a sensor, do_it_all.execute_in_process, or the GraphQL API. In all these cases, it can be overridden with config specified at runtime.
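For example, a run-level override might look like this (a sketch reusing the do_it_all job above; the override value is arbitrary):

# Config supplied at runtime takes precedence over the job's default config.
result = do_it_all.execute_in_process(
    run_config={"ops": {"do_something": {"config": {"param": "overridden_val"}}}}
)
assert result.success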
Alternatively, you can provide "partial" config via a ConfigMapping.
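For instance, here is a sketch of a config mapping that accepts a simplified schema and expands it into the full run config; the do_it_all_mapped name and the outer schema are illustrative:

from dagster import ConfigMapping, job, op


@op(config_schema={"param": str})
def do_something(context):
    ...


def _expand(simplified_config: dict):
    # Expand the simplified top-level config into per-op config.
    return {
        "ops": {"do_something": {"config": {"param": simplified_config["param"]}}}
    }


@job(config=ConfigMapping(config_fn=_expand, config_schema={"param": str}))
def do_it_all_mapped():
    do_something()

At launch time, you then supply only the simplified config, e.g. do_it_all_mapped.execute_in_process(run_config={"param": "some_val"}).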
With the pipeline APIs, if you want the same config to be available for manual runs of a pipeline and also used by a schedule, you need to do something like this:
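A sketch of that pattern, assuming a daily cron schedule and a version of the @schedule decorator that still accepts pipeline_name; the key point is that the run config has to be shared (or duplicated) between the preset and the schedule:

from dagster import PresetDefinition, schedule
from dagster._legacy import pipeline, solid

# The same run config is needed in two places: the preset and the schedule.
RUN_CONFIG = {"solids": {"do_something": {"config": {"param": "some_val"}}}}


@solid(config_schema={"param": str})
def do_something(_):
    ...


@pipeline(preset_defs=[PresetDefinition("my_preset", run_config=RUN_CONFIG)])
def do_it_all():
    do_something()


@schedule(cron_schedule="0 0 * * *", pipeline_name="do_it_all")
def do_it_all_schedule():
    return RUN_CONFIG

With the job APIs, the config supplied to the job (as shown earlier) is picked up by default when a schedule targets that job, e.g. ScheduleDefinition(job=do_it_all, cron_schedule="0 0 * * *"), so it only needs to be defined in one place.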
In order to declare a partitioned schedule using the pipeline-centric APIs, you need to use one of the partitioned schedule decorators, such as daily_schedule or hourly_schedule. The created schedule targets a specific pipeline via pipeline_name.
import datetime
from dagster import daily_schedule, repository
from dagster._legacy import pipeline, solid
@solid(config_schema={"date":str})defdo_something_with_config(context):return context.solid_config["date"]@pipelinedefdo_it_all():
do_something_with_config()@daily_schedule(pipeline_name="do_it_all", start_date=datetime.datetime(2020,1,1))defdo_it_all_schedule(date):return{"solids":{"do_something_with_config":{"config":{"date": date.strftime("%Y-%m-%d %H")}}}}@repositorydefdo_it_all_repo():return[do_it_all, do_it_all_schedule]
With the job APIs, you can first define a graph, and then a job with partitioning. These partitions exist independently of any schedule, and can be used for backfills. A schedule can then be derived from that partitioning, while providing execution time information. The resulting schedule can be independently passed to a repository, and the underlying job will be inferred.
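A sketch of that structure, assuming daily partitions; the do_it_all_job, do_it_all_config, and do_it_all_schedule names are illustrative:

import datetime

from dagster import (
    build_schedule_from_partitioned_job,
    daily_partitioned_config,
    graph,
    op,
    repository,
)


@op(config_schema={"date": str})
def do_something_with_config(context):
    return context.op_config["date"]


@graph
def do_it_all():
    do_something_with_config()


@daily_partitioned_config(start_date=datetime.datetime(2020, 1, 1))
def do_it_all_config(start: datetime.datetime, _end: datetime.datetime):
    return {
        "ops": {
            "do_something_with_config": {
                "config": {"date": start.strftime("%Y-%m-%d %H")}
            }
        }
    }


# The partitioned config is attached to the job; the schedule is derived from it.
do_it_all_job = do_it_all.to_job(config=do_it_all_config)
do_it_all_schedule = build_schedule_from_partitioned_job(do_it_all_job)


@repository
def do_it_all_repo():
    # The underlying job is inferred from the schedule.
    return [do_it_all_schedule]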
With the pipeline APIs, a solid declares its inputs and outputs (and any metadata attached to them) with InputDefinition and OutputDefinition:

from dagster import InputDefinition, OutputDefinition
from dagster._legacy import solid


@solid(
    input_defs=[InputDefinition("arg1", metadata={"a": "b"})],
    output_defs=[OutputDefinition(metadata={"c": "d"})],
)
def do_something(arg1: str) -> int:
    return int(arg1)
The op API supports a less verbose way to define inputs and outputs: with the In and Out classes.
from dagster import In, Out, op
@op(ins={"arg1": In(metadata={"a":"b"})}, out=Out(metadata={"c":"d"}))defdo_something(arg1:str)->int:returnint(arg1)
For single outputs, you can just supply an Out object, and for multiple outputs, you can use a dictionary:
from typing import Tuple
from dagster import In, Out, op
@op(
    ins={"arg1": In(metadata={"a": "b"})},
    out={"out1": Out(metadata={"c": "d"}), "out2": Out(metadata={"e": "f"})},
)
def do_something(arg1: str) -> Tuple[int, int]:
    return int(arg1), int(arg1) + 1
With the pipeline APIs, if you want to pass inputs into and return outputs from a composite solid, you define input and output definitions for it to map:
from dagster import OutputDefinition, composite_solid
from dagster._legacy import pipeline, solid
from dagster.core.definitions.input import InputDefinition


@solid
def do_something():
    pass


@solid
def one():
    return 1


@solid(
    input_defs=[InputDefinition("arg1", int)],
    output_defs=[OutputDefinition(int)],
)
def do_something_else(arg1):
    return arg1


@composite_solid(
    input_defs=[InputDefinition("arg1", int)],
    output_defs=[OutputDefinition(int)],
)
def do_two_things(arg1):
    do_something()
    return do_something_else(arg1)


@solid
def do_yet_more(arg1):
    assert arg1 == 1


@pipeline
def do_it_all():
    do_yet_more(do_two_things(one()))
With the graph APIs, you can specify ins and out properties to do so:
from dagster import In, Out, graph, op
from dagster.core.definitions.output import GraphOut
@op
def do_something():
    pass


@op
def one():
    return 1


@op(ins={"arg1": In(int)}, out=Out(int))
def do_something_else(arg1):
    return arg1


@graph(out=GraphOut())
def do_two_things(arg1):
    do_something()
    return do_something_else(arg1)


@op
def do_yet_more(arg1):
    assert arg1 == 1


@graph
def do_it_all():
    do_yet_more(do_two_things(one()))
Note that a graph no longer requires definitions for the inputs it maps, though it still needs to declare the outputs it maps. In addition, the inputs and outputs of a graph now take on the types specified on the underlying ops.
With the pipeline APIs, if you want to have multiple outputs from a composite solid, you define the output definitions it maps and return a dictionary, where the keys are the output names and the values are the output values.
from dagster import Output, OutputDefinition, composite_solid
from dagster._legacy import pipeline, solid
@solid
def do_something():
    pass


@solid(output_defs=[OutputDefinition(int, "one"), OutputDefinition(int, "two")])
def return_multi():
    yield Output(1, "one")
    yield Output(2, "two")


@composite_solid(
    output_defs=[OutputDefinition(int, "one"), OutputDefinition(int, "two")]
)
def do_two_things():
    do_something()
    one, two = return_multi()
    return {"one": one, "two": two}


@solid
def do_yet_more(arg1, arg2):
    assert arg1 == 1
    assert arg2 == 2


@pipeline
def do_it_all():
    one, two = do_two_things()
    do_yet_more(one, two)
With the graph APIs, you instead specify the out argument with a dictionary of GraphOut objects:
from dagster import Out, graph, op
from dagster.core.definitions.output import GraphOut
@op
def do_something():
    pass


@op(out={"one": Out(int), "two": Out(int)})
def return_multi():
    return 1, 2


@graph(out={"one": GraphOut(), "two": GraphOut()})
def do_two_things():
    do_something()
    one, two = return_multi()
    return (one, two)


@op
def do_yet_more(arg1, arg2):
    assert arg1 == 1
    assert arg2 == 2


@graph
def do_it_all():
    one, two = do_two_things()
    do_yet_more(one, two)
Each mode on a pipeline could have multiple executors, but each job has exactly one executor configured. The executor is set via the executor_def argument to to_job or @job. Since there is only one executor per job, there is no longer a need to specify the executor's name in config.
from dagster import job, multiprocess_executor
@job(
    executor_def=multiprocess_executor,
    config={"execution": {"config": {"max_concurrent": 5}}},
)
def do_it_all():
    ...
The default executor for a pipeline was the in-process executor. A job's default executor can switch between multiprocess and in-process execution; multiprocess is enabled by default, and you can switch to in-process via config:
from dagster import job
# This job will run with multiprocessing execution
@job
def do_it_all():
    ...


# This job will run with in-process execution
@job(config={"execution": {"config": {"in_process": {}}}})
def do_it_all_in_proc():
    ...