Fivetran connectors and Dagster software-defined assets#
A Fivetran connector defines a series of schemas and tables which are synced between a source and a destination. During a sync, data is retrieved from the source and written to the destination as one or more tables. Dagster represents each of the replicas generated in the destination as a software-defined asset. This enables you to easily:
Visualize the schemas and tables involved in a Fivetran connector and execute a sync from Dagster
Define downstream computations which depend on replicas produced by Fivetran
Track historical metadata and logs for each table
Track data lineage through Fivetran and other tools
The first step in using Fivetran with Dagster is to tell Dagster how to connect to your Fivetran instance using a Fivetran resource. This resource contains the credentials needed to access your Fivetran instance.
We will supply our credentials as environment variables. For more information on setting environment variables in a production setting, see Using environment variables and secrets.
Then, we can instruct Dagster to authorize the Fivetran resource using the environment variables:
from dagster_fivetran import FivetranResource
from dagster import EnvVar
# Pull API key and secret from environment variables
fivetran_instance = FivetranResource(
api_key=EnvVar("FIVETRAN_API_KEY"),
api_secret=EnvVar("FIVETRAN_API_SECRET"),)
For more information on the additional configuration options available for the Fivetran resource, see the API reference.
Step 2: Loading Fivetran asset definitions into Dagster#
The easiest way to get started using Fivetran with Dagster is to have Dagster automatically generate asset defintions from your Fivetran project. You can load asset definitions from a Fivetran instance via API, at initialization time.
Loading Fivetran asset definitions from a Fivetran instance#
To load Fivetran assets into Dagster from your Fivetran instance, you will need to supply the Fivetran resource that we defined above in step 1. Here, the Fivetran instance is treated as the source of truth.
from dagster_fivetran import load_assets_from_fivetran_instance
# Use the fivetran_instance resource we defined in Step 1
fivetran_assets = load_assets_from_fivetran_instance(fivetran_instance)
The load_assets_from_fivetran_instance function retrieves all of the connectors you have defined in the Fivetran interface, creating software-defined assets for each generated table. Each connector has an associated op which triggers a sync of that connector.
Instead of having Dagster automatically create the asset defintions for your Fivetran instance, you can opt to individually build them. First, determine the Connector IDs for each of the connectors you would like to build assets for. The Connector ID can be found on the Setup page for each connector in the Fivetran UI:
Then, supply the Connector ID and the list of tables which the connector creates in the destination to build_fivetran_assets, each in the format {schema_name}.{table_name}:
from dagster_fivetran import build_fivetran_assets
fivetran_assets = build_fivetran_assets(
connector_id="omit_constitutional",
destination_tables=["public.survey_responses","public.surveys"],)
Manually-built Fivetran assets require a fivetran_resource, which defines how to connect and interact with your Fivetran instance.
We can add the Fivetran resource we configured above to our Fivetran assets by doing the following:
from dagster_fivetran import build_fivetran_assets
from dagster import with_resources
fivetran_assets = with_resources(
build_fivetran_assets(
connector_id="omit_constitutional",
destination_tables=["public.survey_responses","public.surveys"],),# Use the fivetran_instance resource we defined in Step 1{"fivetran": fivetran_instance},)
Once you have loaded your Fivetran assets into Dagster, you can create assets which depend on them. These can be other assets pulled in from external sources such as dbt or assets defined in Python code.
In this case, we have a Fivetran connector which is storing data in the public.survey_responses table in our Snowflake warehouse. The corresponding asset key is public/survey_responses. We specify the output IO manager to tell downstream assets how to retreive the data.
import json
from dagster_fivetran import(
FivetranResource,
load_assets_from_fivetran_instance,)from dagster import(
ScheduleDefinition,
define_asset_job,
asset,
AssetIn,
AssetKey,
Definitions,
AssetSelection,
EnvVar,
Definitions,)from dagster_snowflake_pandas import SnowflakePandasIOManager
fivetran_instance = FivetranResource(
api_key=EnvVar("FIVETRAN_API_KEY"),
api_secret=EnvVar("FIVETRAN_API_SECRET"),)
fivetran_assets = load_assets_from_fivetran_instance(
fivetran_instance,
io_manager_key="snowflake_io_manager",)@asset(
ins={"survey_responses": AssetIn(
key=AssetKey(["public","survey_responses"]))})defsurvey_responses_file(survey_responses):withopen("survey_responses.json","w", encoding="utf8")as f:
f.write(json.dumps(survey_responses, indent=2))# only run the airbyte syncs necessary to materialize survey_responses_file
my_upstream_job = define_asset_job("my_upstream_job",
AssetSelection.keys("survey_responses_file").upstream()# all upstream assets (in this case, just the survey_responses Fivetran asset).required_multi_asset_neighbors(),# all Fivetran assets linked to the same connection)
defs = Definitions(
jobs=[my_upstream_job],
assets=[fivetran_assets, survey_responses_file],
resources={"snowflake_io_manager": SnowflakePandasIOManager(...)},)