This documentation is for an older version (1.4.7) of Dagster.

Tutorial, part four: Building an asset graph

In the previous portion of the tutorial, you wrote your first Software-defined asset (SDA), looked at Dagster's UI, and manually materialized your asset.

Continuing from there, you will:

  • Add more assets to your Dagster project
  • Connect them to finish creating the pipeline
  • Give users more knowledge about the assets by adding metadata and logging

Step 1: Adding the DataFrame asset

Using the top Hacker News story IDs, you'll now look up each story by its ID, ingest that data, and make a DataFrame out of it. You'll connect your current asset with this new asset to establish dependencies and make an asset graph.

Modify your assets.py to add the pandas import and a new asset called topstories:

import json
import os

import pandas as pd  # Add new imports to the top of `assets.py`
import requests

from dagster import asset

# ... Keep the `topstory_ids` asset from the previous section


@asset(deps=[topstory_ids])  # this asset is dependent on topstory_ids
def topstories() -> None:
    with open("data/topstory_ids.json", "r") as f:
        topstory_ids = json.load(f)

    results = []
    for item_id in topstory_ids:
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        ).json()
        results.append(item)

        if len(results) % 20 == 0:
            print(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)
    df.to_csv("data/topstories.csv")

Dependencies between assets are defined using the deps parameter of the @asset decorator. In this case, topstory_ids (the list of IDs) is a dependency of topstories (the CSV file).
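
To make the idea of dependency ordering concrete, here is a minimal sketch that uses only Python's standard library (`graphlib`, available in Python 3.9+). It is an illustration of how a `deps` declaration implies a materialization order, not a description of how Dagster resolves the graph internally. The `asset_deps` dictionary is a hand-written stand-in for the `deps=[...]` arguments above.

```python
from graphlib import TopologicalSorter

# Map each asset name to the set of assets it depends on, mirroring
# the `deps=[...]` arguments used in this tutorial (hand-written stand-in).
asset_deps = {
    "topstory_ids": set(),
    "topstories": {"topstory_ids"},
}

# static_order() yields every node only after all of its dependencies.
materialization_order = list(TopologicalSorter(asset_deps).static_order())
print(materialization_order)
```

Because topstories lists topstory_ids as a dependency, topstory_ids always comes first in the resulting order.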

In your browser, navigate to Dagster's UI (localhost:3000) and look at the asset graph to see the relationship between your assets.

Logging during asset materialization

In the code above, a print statement is used to show progress while fetching stories from the Hacker News API. Dagster has a built-in logger that goes beyond print and other Python logging methods: it records exactly where each message originated in your Dagster project, such as which asset and materialization the log came from.

To access the logger, use the get_dagster_logger function. The code below replaces the print statement with the Dagster logger:

from dagster import asset, get_dagster_logger


@asset(deps=[topstory_ids])
def topstories() -> None:
    logger = get_dagster_logger()

    with open("data/topstory_ids.json", "r") as f:
        topstory_ids = json.load(f)

    results = []
    for item_id in topstory_ids:
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        ).json()
        results.append(item)

        if len(results) % 20 == 0:
            logger.info(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)
    df.to_csv("data/topstories.csv")

For more information about loggers, such as the different types of logging, refer to the Logging documentation.
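
If the same "log every N items" pattern shows up in several assets, it could be factored into a small helper. The sketch below is one possible shape, not part of Dagster: `log_progress` is a hypothetical name, and a standard-library logger stands in for the Dagster logger so the snippet runs on its own. It assumes the object returned by get_dagster_logger behaves like a standard `logging.Logger`.

```python
import logging


def log_progress(logger: logging.Logger, count: int, every: int = 20) -> bool:
    """Log a progress message each time `count` reaches a multiple of `every`.

    Returns True when a message was logged, which makes the helper easy to test.
    """
    if count > 0 and count % every == 0:
        logger.info("Got %d items so far.", count)
        return True
    return False


# Stand-in logger for running this sketch outside of Dagster.
logging.basicConfig(level=logging.INFO)
demo_logger = logging.getLogger("progress_demo")

# Simulate fetching 60 items and record the counts at which a message was logged.
logged_at = [n for n in range(1, 61) if log_progress(demo_logger, n)]
```

Inside an asset, the call would replace the `if len(results) % 20 == 0:` block, passing the logger obtained from get_dagster_logger and `len(results)` as the count.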


Step 2: Creating an unstructured data asset

Along with structured data like tables, Dagster's assets can also be unstructured data, such as images. Your next and final asset will take the DataFrame of stories and create a dictionary of the most frequent words in the titles.

Below is the finished code for a most_frequent_words asset. Copy and paste the code into assets.py:

@asset(deps=[topstories])
def most_frequent_words() -> None:
    stopwords = ["a", "the", "an", "of", "to", "in", "for", "and", "with", "on", "is"]

    topstories = pd.read_csv("data/topstories.csv")

    # loop through the titles and count the frequency of each word
    word_counts = {}
    for raw_title in topstories["title"]:
        title = raw_title.lower()
        for word in title.split():
            cleaned_word = word.strip(".,-!?:;()[]'\"-")
            if cleaned_word not in stopwords and len(cleaned_word) > 0:
                word_counts[cleaned_word] = word_counts.get(cleaned_word, 0) + 1

    # Get the top 25 most frequent words
    top_words = {
        pair[0]: pair[1]
        for pair in sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:25]
    }

    with open("data/most_frequent_words.json", "w") as f:
        json.dump(top_words, f)
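
The counting loop above can also be written with `collections.Counter` from the standard library. The following is a sketch of that alternative, not a replacement the tutorial requires. The function name `count_title_words` and the sample titles are made up for illustration, and skipping non-string values is an added assumption to guard against missing titles, which pandas represents as NaN.

```python
from collections import Counter

STOPWORDS = {"a", "the", "an", "of", "to", "in", "for", "and", "with", "on", "is"}
PUNCTUATION = ".,-!?:;()[]'\""


def count_title_words(titles, top_n=25):
    """Return the `top_n` most frequent non-stopword words across `titles`."""
    counts = Counter()
    for raw_title in titles:
        # Assumption: skip missing values (for example, float NaN or None).
        if not isinstance(raw_title, str):
            continue
        for word in raw_title.lower().split():
            cleaned = word.strip(PUNCTUATION)
            if cleaned and cleaned not in STOPWORDS:
                counts[cleaned] += 1
    # most_common() returns (word, count) pairs sorted by descending count.
    return dict(counts.most_common(top_n))


# Made-up titles standing in for the "title" column of the DataFrame.
sample_titles = ["Show HN: A tiny Rust kernel", "The Rust book, revisited", None]
top_words = count_title_words(sample_titles)
```

In the asset, `count_title_words(topstories["title"])` would produce the same kind of dictionary that is written to data/most_frequent_words.json.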

Step 3: Educating users with metadata

Software-defined assets can be enriched with different types of metadata. Anything can be used as metadata for an SDA. Common details to add are:

  • Statistics about the data, such as row counts or other data profiling
  • Test results or assertions about the data
  • Images or tabular previews of the asset
  • Information about who owns the asset, where it's stored, and links to external documentation

The following code adds a row count and a Markdown preview to the topstories asset. It attaches the metadata through the context object, which Dagster provides automatically during asset materialization and which contains some useful methods. In this case, you'll use the add_output_metadata method. Update your code for the topstories asset to match the changes below:

import base64
from io import BytesIO

import matplotlib.pyplot as plt

from dagster import (
    AssetExecutionContext,
    MetadataValue,
    asset,
    get_dagster_logger,
)

# Add the imports above to the top of `assets.py`


@asset(deps=[topstory_ids])
def topstories(context: AssetExecutionContext) -> None:
    logger = get_dagster_logger()

    with open("data/topstory_ids.json", "r") as f:
        topstory_ids = json.load(f)

    results = []
    for item_id in topstory_ids:
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        ).json()
        results.append(item)

        if len(results) % 20 == 0:
            logger.info(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)
    df.to_csv("data/topstories.csv")

    context.add_output_metadata(
        metadata={
            "num_records": len(df),  # Metadata can be any key-value pair
            "preview": MetadataValue.md(df.head().to_markdown()),
            # The `MetadataValue` class has useful static methods to build Metadata
        }
    )
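
Note that pandas' `DataFrame.to_markdown` relies on the optional `tabulate` package, so it needs to be installed for the preview above to work. To show what kind of string ends up in the preview, here is a standard-library sketch that builds a small pipe-delimited Markdown table by hand. `rows_to_markdown` and the sample records are hypothetical and only illustrate the shape of the metadata. The sketch does not escape pipe characters inside values.

```python
def rows_to_markdown(rows, columns, limit=5):
    """Render up to `limit` dict rows as a pipe-delimited Markdown table."""
    header = "| " + " | ".join(columns) + " |"
    divider = "| " + " | ".join("---" for _ in columns) + " |"
    body = [
        "| " + " | ".join(str(row.get(col, "")) for col in columns) + " |"
        for row in rows[:limit]
    ]
    return "\n".join([header, divider, *body])


# Made-up records standing in for Hacker News items.
sample_rows = [
    {"id": 1, "title": "First story", "score": 10},
    {"id": 2, "title": "Second story", "score": 7},
]
preview = rows_to_markdown(sample_rows, ["id", "title", "score"])

# Same key-value shape as the metadata dictionary in the asset above.
metadata = {"num_records": len(sample_rows), "preview": preview}
print(preview)
```

In the asset itself, the preview string is wrapped with `MetadataValue.md(...)` so that the Dagster UI renders it as Markdown.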

Reload the definitions and re-materialize your assets. The metadata can then be seen in the following places:

  • In the Asset graph page, click on an asset and its metadata will be shown in the right sidebar:

    [Image: the asset's materialization metadata]
  • In the Asset Catalog's page for the topstories asset:

    [Image: the asset's materialization metadata]

Metadata and Markdown

The DataFrame was embedded into the asset's metadata with Markdown. Any valid Markdown snippet can be stored and rendered in the Dagster UI, including images. By embedding a bar chart of the most frequently used words as metadata, you and your team can visualize and analyze the most_frequent_words asset without leaving the Dagster UI.

The code below shows how to add an image of a bar chart to asset metadata. Replace your most_frequent_words asset with the following:

@asset(deps=[topstories])
def most_frequent_words(context: AssetExecutionContext) -> None:
    stopwords = ["a", "the", "an", "of", "to", "in", "for", "and", "with", "on", "is"]

    topstories = pd.read_csv("data/topstories.csv")

    # loop through the titles and count the frequency of each word
    word_counts = {}
    for raw_title in topstories["title"]:
        title = raw_title.lower()
        for word in title.split():
            cleaned_word = word.strip(".,-!?:;()[]'\"-")
            if cleaned_word not in stopwords and len(cleaned_word) > 0:
                word_counts[cleaned_word] = word_counts.get(cleaned_word, 0) + 1

    # Get the top 25 most frequent words
    top_words = {
        pair[0]: pair[1]
        for pair in sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:25]
    }

    # Make a bar chart of the top 25 words
    plt.figure(figsize=(10, 6))
    plt.bar(top_words.keys(), top_words.values())
    plt.xticks(rotation=45, ha="right")
    plt.title("Top 25 Words in Hacker News Titles")
    plt.tight_layout()

    # Convert the image to a saveable format
    buffer = BytesIO()
    plt.savefig(buffer, format="png")
    image_data = base64.b64encode(buffer.getvalue())

    # Convert the image to Markdown to preview it within Dagster
    md_content = f"![img](data:image/png;base64,{image_data.decode()})"

    with open("data/most_frequent_words.json", "w") as f:
        json.dump(top_words, f)

    # Attach the Markdown content as metadata to the asset
    context.add_output_metadata(metadata={"plot": MetadataValue.md(md_content)})
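
The image-embedding step can be looked at in isolation. The sketch below, using only the standard library, wraps raw PNG bytes in a Markdown image tag backed by a base64 data URI, which is the same string format built in the asset above. `png_bytes_to_markdown` is a hypothetical helper name, and the 8-byte PNG file signature is used as a stand-in for a real chart image.

```python
import base64


def png_bytes_to_markdown(png_bytes: bytes, alt_text: str = "img") -> str:
    """Wrap raw PNG bytes in a Markdown image tag backed by a base64 data URI."""
    encoded = base64.b64encode(png_bytes).decode("ascii")
    return f"![{alt_text}](data:image/png;base64,{encoded})"


# The 8-byte PNG file signature stands in for a real image here.
fake_png = b"\x89PNG\r\n\x1a\n"
md_content = png_bytes_to_markdown(fake_png)

# Round-trip check: the embedded payload decodes back to the original bytes.
payload = md_content.split("base64,", 1)[1].rstrip(")")
decoded = base64.b64decode(payload)
print(decoded == fake_png)
```

In the asset, `buffer.getvalue()` plays the role of `fake_png`. Keep in mind that the whole encoded image is stored in the metadata, so large figures produce large metadata values.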

Reload your definitions and re-materialize your assets. The bar chart will be visible with the rest of your materialization metadata for the most_frequent_words asset. The plot key in the metadata will contain a link that says [Show Markdown]. Clicking the link opens the preview in the Dagster UI. The bar chart will change throughout the day as the top stories change. Here's an example of what most_frequent_words looked like at the time we wrote this tutorial:

[Image: the asset's materialization metadata]

If your data is sensitive, such as PHI or PII, be careful and follow your organization's policies for surfacing data. You should practice due diligence before showing your data in metadata or logs.


Next steps

By now, you've:

  • Written and materialized three assets
  • Previewed the data in Dagster's UI
  • Empowered stakeholders and your future self with metadata and logging

In the next section, you'll learn how to schedule your pipeline to automatically update your assets regularly.