You've added upstream assets to your data pipeline, but nothing downstream - until now. In this step, you'll define a Dagster asset called order_count_chart that uses the data in the customers dbt model to computes a plotly chart of the number of orders per customer.
Like the raw_customers asset that we added in the previous section, we'll put this asset in our definitions.py file, inside the jaffle_dagster directory.
To add the order_count_chart asset:
Replace the imports section with the following:
import os
import duckdb
import pandas as pd
import plotly.express as px
from dagster import MetadataValue, OpExecutionContext, asset
from dagster_dbt import DbtCliResource, dbt_assets, get_asset_key_for_model
from.constants import dbt_manifest_path, dbt_project_dir
After your definition of jaffle_shop_dbt_assets, add the definition for the order_count_chart asset:
@asset(
compute_kind="python",
deps=get_asset_key_for_model([jaffle_shop_dbt_assets],"customers"),)deforder_count_chart(context):# read the contents of the customers table into a Pandas DataFrame
connection = duckdb.connect(os.fspath(duckdb_database_path))
customers = connection.sql("select * from customers").df()# create a plot of number of orders by customer and write it out to an HTML file
fig = px.histogram(customers, x="number_of_orders")
fig.update_layout(bargap=0.2)
save_chart_path = duckdb_database_path.parent.joinpath("order_count_chart.html")
fig.write_html(save_chart_path, auto_open=True)# tell Dagster about the location of the HTML file,# so it's easy to access from the Dagster UI
context.add_output_metadata({"plot_url": MetadataValue.url("file://"+ os.fspath(save_chart_path))})
This asset definition looks similar the asset we defined in the previous section. In this case, instead of fetching data from an external source and writing it to DuckDB, it reads data from DuckDB, and then uses it to make a plot.
The line deps=get_asset_key_for_model([jaffle_shop_dbt_assets], "customers") tells Dagster that this asset is downstream of the customers dbt model. This dependency will be displayed as such in Dagster's UI. If you launch a run to materialize both of them, Dagster won't run order_count_chart until customers completes.
Add the order_count_chart to the Definitions:
import os
from dagster import Definitions
from dagster_dbt import DbtCliResource
from.assets import jaffle_shop_dbt_assets, order_count_chart, raw_customers
from.constants import dbt_project_dir
from.schedules import schedules
defs = Definitions(
assets=[raw_customers, jaffle_shop_dbt_assets, order_count_chart],
schedules=schedules,
resources={"dbt": DbtCliResource(project_dir=os.fspath(dbt_project_dir)),},)
If the Dagster UI is still running from the previous section, click the "Reload Definitions" button in the upper right corner. If you shut it down, then you can launch it again with the same command from the previous section:
DAGSTER_DBT_PARSE_PROJECT_ON_LOAD=1 dagster dev
The UI will look like this:
A new asset named order_count_chart is at the bottom, downstream of the customers asset. Click on order_count_chart and click Materialize selected.
That's it! When the run successfully completes, the following chart will automatically open in your browser:
That's the end of this tutorial - congratulations! By now, you should have a working dbt and Dagster integration and a handful of materialized Dagster assets.