Dagster can orchestrate your Airbyte Cloud connections, making it easy to chain an Airbyte sync with upstream or downstream steps in your workflow.
This guide focuses on how to work with Airbyte Cloud connections using Dagster's software-defined asset (SDA) framework.
Airbyte Cloud connections and Dagster software-defined assets
An Airbyte Cloud connection defines a series of data streams which are synced between a source and a destination. During a sync, a replica of the data from each data stream is written to the destination, typically as one or more tables. Dagster represents each of the replicas generated in the destination as a software-defined asset. This enables you to easily:
Visualize the streams involved in an Airbyte Cloud connection and execute a sync from Dagster
Define downstream computations which depend on replicas produced by Airbyte
Track data lineage through Airbyte and other tools
Step 1: Connecting to Airbyte Cloud
The first step in using Airbyte Cloud with Dagster is to tell Dagster how to connect to your Airbyte Cloud account using an Airbyte Cloud resource. This resource handles your Airbyte Cloud credentials and any optional configuration.
from dagster import EnvVar
from dagster_airbyte import AirbyteCloudResource

airbyte_instance = AirbyteCloudResource(
    api_key=EnvVar("AIRBYTE_API_KEY"),
)
Step 2: Building Airbyte Cloud assets using Dagster
To create software-defined assets for your Airbyte Cloud connections, you will first need to determine the connection ID for each connection you would like to build assets for. You can find the connection ID in the URL of the connection page in the Airbyte Cloud UI, between /connections/ and /status.
For example, the connection ID for the URL https://cloud.airbyte.com/workspaces/11f3741b-0b54-45f8-9886-937f96f2ba88/connections/43908042-8399-4a58-82f1-71a45099fff7/status is 43908042-8399-4a58-82f1-71a45099fff7.
Then, supply the connection ID and the list of tables the connection creates in the destination to build_airbyte_assets. This utility will generate a software-defined asset for each of those tables.
from dagster_airbyte import build_airbyte_assets

airbyte_assets = build_airbyte_assets(
    connection_id="43908042-8399-4a58-82f1-71a45099fff7",
    destination_tables=["releases", "tags", "teams"],
)
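If you'd rather not hard-code the connection ID, one option is to read it from an environment variable. A minimal sketch, assuming a hypothetical AIRBYTE_CONNECTION_ID variable that you set yourself:

import os

from dagster_airbyte import build_airbyte_assets

# AIRBYTE_CONNECTION_ID is a hypothetical environment variable, not something the
# integration defines -- set it to the ID taken from the connection URL.
airbyte_assets = build_airbyte_assets(
    connection_id=os.environ["AIRBYTE_CONNECTION_ID"],
    destination_tables=["releases", "tags", "teams"],
)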
The Airbyte assets constructed using build_airbyte_assets require an Airbyte resource, which defines how to connect to and interact with your Airbyte Cloud instance.
We can add the Airbyte Cloud resource we configured above to our Airbyte assets by providing it to our Definitions, as shown below.
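A minimal sketch of what this looks like, reusing the resource and assets from the steps above (the airbyte_instance resource key matches the one used in the complete examples below):

from dagster import Definitions, EnvVar
from dagster_airbyte import AirbyteCloudResource, build_airbyte_assets

airbyte_instance = AirbyteCloudResource(
    api_key=EnvVar("AIRBYTE_API_KEY"),
)
airbyte_assets = build_airbyte_assets(
    connection_id="43908042-8399-4a58-82f1-71a45099fff7",
    destination_tables=["releases", "tags", "teams"],
)

defs = Definitions(
    assets=airbyte_assets,
    resources={"airbyte_instance": airbyte_instance},
)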
Once you have loaded your Airbyte Cloud assets into Dagster, you can create assets that depend on them. These can be assets pulled in from external tools such as dbt, or assets defined in Python code.
In this case, we have an Airbyte Cloud connection that stores data in our Snowflake warehouse's stargazers table. We specify the output I/O manager to tell downstream assets how to retrieve the data.
import json

from dagster import (
    AssetSelection,
    EnvVar,
    Definitions,
    asset,
    define_asset_job,
)
from dagster_airbyte import (
    build_airbyte_assets,
    AirbyteCloudResource,
)
from dagster_snowflake_pandas import SnowflakePandasIOManager
import pandas as pd

airbyte_instance = AirbyteCloudResource(
    api_key=EnvVar("AIRBYTE_API_KEY"),
)
airbyte_assets = build_airbyte_assets(
    connection_id="43908042-8399-4a58-82f1-71a45099fff7",
    # include the stargazers table so the downstream asset below has an upstream asset to depend on
    destination_tables=["releases", "tags", "teams", "stargazers"],
)


@asset
def stargazers_file(stargazers: pd.DataFrame):
    with open("stargazers.json", "w", encoding="utf8") as f:
        f.write(json.dumps(stargazers.to_json(), indent=2))


# only run the Airbyte syncs necessary to materialize stargazers_file
my_upstream_job = define_asset_job(
    "my_upstream_job",
    AssetSelection.keys("stargazers_file")
    .upstream()  # all upstream assets (in this case, just the stargazers Airbyte asset)
    .required_multi_asset_neighbors(),  # all Airbyte assets linked to the same connection
)

defs = Definitions(
    jobs=[my_upstream_job],
    assets=[*airbyte_assets, stargazers_file],
    resources={
        "snowflake_io_manager": SnowflakePandasIOManager(...),
        "airbyte_instance": airbyte_instance,
    },
)
In this case, we have the same Airbyte connection that stores data in the stargazers table in our Snowflake warehouse. Since we are not using an I/O manager to fetch the data in downstream assets, we use deps to define the dependency instead. Then, within the downstream asset, we can fetch the data ourselves if necessary, or launch other commands that work with the data in external processes.
import json

from dagster import (
    AssetKey,
    AssetSelection,
    EnvVar,
    Definitions,
    asset,
    define_asset_job,
)
from dagster_airbyte import (
    build_airbyte_assets,
    AirbyteCloudResource,
)
from dagster_snowflake import SnowflakeResource

airbyte_instance = AirbyteCloudResource(
    api_key=EnvVar("AIRBYTE_API_KEY"),
)
airbyte_assets = build_airbyte_assets(
    connection_id="43908042-8399-4a58-82f1-71a45099fff7",
    # include the stargazers table so the downstream asset below has an upstream asset to depend on
    destination_tables=["releases", "tags", "teams", "stargazers"],
)


@asset(deps=[AssetKey("stargazers")])
def stargazers_file(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        stargazers = (
            conn.cursor().execute("SELECT * FROM STARGAZERS").fetch_pandas_all()
        )
    with open("stargazers.json", "w", encoding="utf8") as f:
        f.write(json.dumps(stargazers.to_json(), indent=2))


# only run the Airbyte syncs necessary to materialize stargazers_file
my_upstream_job = define_asset_job(
    "my_upstream_job",
    AssetSelection.keys("stargazers_file")
    .upstream()  # all upstream assets (in this case, just the stargazers Airbyte asset)
    .required_multi_asset_neighbors(),  # all Airbyte assets linked to the same connection
)

defs = Definitions(
    jobs=[my_upstream_job],
    assets=[*airbyte_assets, stargazers_file],
    resources={
        "snowflake": SnowflakeResource(...),
        "airbyte_instance": airbyte_instance,
    },
)
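To sanity-check the wiring locally, one option is to execute the job in process. This is a sketch rather than part of the example above, and running it will actually trigger the Airbyte Cloud sync and the Snowflake query, so it requires valid credentials:

# Minimal local check, assuming the Definitions above live in this module as `defs`.
if __name__ == "__main__":
    result = defs.get_job_def("my_upstream_job").execute_in_process()
    assert result.success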