IO Managers (Legacy)#

This guide covers using the legacy Dagster resource system. For docs on the new Pythonic resource system introduced in Dagster 1.3, see the updated I/O managers guide. To migrate your code, refer to the migrating to Pythonic resources and config guide.

IO Managers are user-provided objects that store asset and op outputs and load them as inputs to downstream assets and ops.

Diagram of assets and ops with an I/O manager

Relevant APIs#

Name | Description
@io_manager | A decorator used to define IO managers.
IOManager | Base class for user-provided IO managers.
build_input_context | Function for directly constructing an InputContext, to be passed to the IOManager.load_input method. This is designed primarily for testing purposes.
build_output_context | Function for directly constructing an OutputContext, to be passed to the IOManager.handle_output method. This is designed primarily for testing purposes.

Overview#

Functions decorated by @asset, @multi_asset, and @op can have parameters and return values that are loaded from and written to persistent storage. IOManagers let the user control how this data is stored and how it's loaded in downstream ops and assets. For @asset and @multi_asset, the IO manager effectively determines where the physical asset lives.

The IO manager APIs make it easy to separate code that's responsible for logical data transformation from code that's responsible for reading and writing the results. Software-defined assets and ops can focus on business logic, while IO managers handle I/O. This separation makes it easier to test the business logic and run it in different environments.

For non-asset jobs with inputs that aren't connected to upstream outputs, see the Unconnected Inputs overview.

Outputs and downstream inputs#

IOManagers are user-provided objects that are responsible for storing the output of an asset or op and loading it as input to downstream assets or ops. For example, an IO manager might store and load objects from files on a filesystem.

Each software-defined asset can have its own IO manager. In the multi-asset case, where a single function outputs multiple assets, each outputted asset can be handled by a different IO manager:

Multi-asset with each outputted asset being handled by a different I/O manager

For ops, each op output can have its own IO manager, or multiple op outputs can share an IO manager. The IO manager that's used for handling a particular op output is automatically used for loading it in downstream ops.

Consider the following diagram. In this example, a job has two IO managers, each of which is shared across a few inputs and outputs:
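
As a minimal sketch of that structure (op and resource key names here are illustrative), one IO manager can handle the outputs of several ops while a second handles the rest:

from dagster import Out, fs_io_manager, job, mem_io_manager, op


@op(out=Out(io_manager_key="fs"))
def op_a():
    return 1


@op(out=Out(io_manager_key="fs"))
def op_b(a):
    return a + 1


@op(out=Out(io_manager_key="mem"))
def op_c(b):
    # mem_io_manager keeps this output in memory, so it's best suited to in-process execution
    return b * 2


@job(resource_defs={"fs": fs_io_manager, "mem": mem_io_manager})
def two_io_manager_job():
    op_c(op_b(op_a()))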

Built-in IO managers#

The default IO manager, fs_io_manager, stores and retrieves values from pickle files in the local filesystem. If a job is invoked via JobDefinition.execute_in_process, the default IO manager is switched to mem_io_manager, which stores outputs in memory.
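
For example (a minimal sketch), a job that doesn't configure an IO manager pickles its outputs with fs_io_manager when launched normally, but keeps them in memory when invoked with execute_in_process:

from dagster import job, op


@op
def return_one():
    return 1


@job
def default_io_job():
    return_one()


# Outputs are held in memory by mem_io_manager rather than pickled to disk.
result = default_io_job.execute_in_process()
assert result.output_for_node("return_one") == 1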

Dagster provides out-of-the-box IO managers for popular storage systems: AWS S3 (s3_pickle_io_manager), Azure Blob Storage (adls2_pickle_io_manager), GCS (gcs_pickle_io_manager), and Snowflake (build_snowflake_io_manager) - or you can write your own: either from scratch or by extending the UPathIOManager if you want to store data in an fsspec-supported filesystem.

IO managers are resources#

IO managers are resources, which means users can supply different IO managers for the same assets or ops in different situations. For example, you might use an in-memory IO manager for unit-testing and an S3 IO manager in production.
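
For example, a test might materialize assets entirely in memory while production deployments supply an S3-backed IO manager. A minimal sketch, assuming upstream_asset and downstream_asset are defined as in the examples below:

from dagster import materialize, mem_io_manager


def test_assets_in_memory():
    # Swap in the in-memory IO manager so the test doesn't touch disk or S3.
    result = materialize(
        [upstream_asset, downstream_asset],
        resources={"io_manager": mem_io_manager},
    )
    assert result.success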


Using IO managers with software-defined assets#

Applying IO managers to assets#

By default, materializing an asset named my_asset will pickle it to a local file named my_asset. The directory that file lives underneath is determined by the rules in fs_io_manager.
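
If you only want to change where these pickle files are written, you can configure fs_io_manager's base_dir rather than replacing it (a minimal sketch; the path is illustrative):

from dagster import Definitions, asset, fs_io_manager


@asset
def my_asset():
    return [1, 2, 3]


defs = Definitions(
    assets=[my_asset],
    resources={
        # my_asset will be pickled underneath this directory
        "io_manager": fs_io_manager.configured({"base_dir": "/tmp/dagster_assets"})
    },
)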

IO managers enable fully overriding this behavior and storing asset contents in any way you wish - e.g. writing them as tables in a database or as objects in a cloud object store such as S3. You can use one of Dagster's built-in IO managers, or you can write your own.

To apply an IO manager to a set of assets, you can use Definitions:

from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

from dagster import Definitions, asset


@asset
def upstream_asset():
    return [1, 2, 3]


@asset
def downstream_asset(upstream_asset):
    return upstream_asset + [4]


defs = Definitions(
    assets=[upstream_asset, downstream_asset],
    resources={"io_manager": s3_pickle_io_manager, "s3": s3_resource},
)

This example also includes "s3": s3_resource because the s3_pickle_io_manager depends on an S3 resource.

When upstream_asset is materialized, the value [1, 2, 3] will be pickled and stored in an object on S3. When downstream_asset is materialized, the value of upstream_asset will be read from S3 and depickled, and [1, 2, 3, 4] will be pickled and stored in a different object on S3.

Per-asset IO manager#

Different assets can have different IO managers:

from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

from dagster import Definitions, asset, fs_io_manager


@asset(io_manager_key="s3_io_manager")
def upstream_asset():
    return [1, 2, 3]


@asset(io_manager_key="fs_io_manager")
def downstream_asset(upstream_asset):
    return upstream_asset + [4]


defs = Definitions(
    assets=[upstream_asset, downstream_asset],
    resources={
        "s3_io_manager": s3_pickle_io_manager,
        "s3": s3_resource,
        "fs_io_manager": fs_io_manager,
    },
)

When upstream_asset is materialized, the value [1, 2, 3] will be pickled and stored in an object on S3. When downstream_asset is materialized, the value of upstream_asset will be read from S3 and depickled, and [1, 2, 3, 4] will be pickled and stored in a file on the local filesystem.

In the multi-asset case, you can customize how each asset is materialized by specifying an io_manager_key on each output of the multi-asset.

from dagster import AssetOut, multi_asset


@multi_asset(
    outs={
        "s3_asset": AssetOut(io_manager_key="s3_io_manager"),
        "adls_asset": AssetOut(io_manager_key="adls2_io_manager"),
    },
)
def my_assets():
    return "store_me_on_s3", "store_me_on_adls2"

The same assets can be bound to different resources and IO managers in different environments. For example, for local development, you might want to store assets on your local filesystem while in production, you might want to store the assets in S3.

import os

from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

from dagster import Definitions, asset, fs_io_manager


@asset
def upstream_asset():
    return [1, 2, 3]


@asset
def downstream_asset(upstream_asset):
    return upstream_asset + [4]


resources_by_env = {
    "prod": {"io_manager": s3_pickle_io_manager, "s3": s3_resource},
    "local": {"io_manager": fs_io_manager},
}

defs = Definitions(
    assets=[upstream_asset, downstream_asset],
    resources=resources_by_env[os.getenv("ENV", "local")],
)

Asset input IO managers#

In some cases you may need to load the input to an asset with different logic than that specified by the upstream asset's IO manager.

To set an IO manager for a particular input, use the input_manager_key argument on AssetIn.

In this example, first_asset and second_asset will be stored using the default IO manager, but will be loaded as inputs to third_asset using the logic defined in the pandas_series_io_manager (in this case loading as Pandas Series rather than Python lists).

import pandas as pd

from dagster import AssetIn, Definitions, asset


@asset
def first_asset():
    return [1, 2, 3]


@asset
def second_asset():
    return [4, 5, 6]


@asset(
    ins={
        "first_asset": AssetIn(input_manager_key="pandas_series"),
        "second_asset": AssetIn(input_manager_key="pandas_series"),
    }
)
def third_asset(first_asset, second_asset):
    return pd.concat([first_asset, second_asset, pd.Series([7, 8])])


defs = Definitions(
    assets=[first_asset, second_asset, third_asset],
    resources={
        "pandas_series": pandas_series_io_manager,
    },
)

Using IO managers with non-asset jobs#

Job-wide IO manager#

By default, all the inputs and outputs in a job use the same IO manager. This IO manager is determined by the ResourceDefinition provided for the "io_manager" resource key. "io_manager" is a resource key that Dagster reserves specifically for this purpose.

Here’s how to specify that all op outputs are stored using the fs_io_manager, which pickles outputs and stores them on the local filesystem. It stores files in a directory with the run ID in the path, so that outputs from prior runs will never be overwritten.

from dagster import fs_io_manager, job, op


@op
def op_1():
    return 1


@op
def op_2(a):
    return a + 1


@job(resource_defs={"io_manager": fs_io_manager})
def my_job():
    op_2(op_1())

Per-output IO manager#

Not all the outputs in a job should necessarily be stored the same way. Maybe some of the outputs should live on the filesystem so they can be inspected, and others can be transiently stored in memory.

To select the IO manager for a particular output, you can set an io_manager_key on Out, and then refer to that io_manager_key when setting IO managers in your job. In this example, the output of op_1 will go to fs_io_manager and the output of op_2 will go to s3_pickle_io_manager.

from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

from dagster import Out, fs_io_manager, job, op


@op(out=Out(io_manager_key="fs"))
def op_1():
    return 1


@op(out=Out(io_manager_key="s3_io"))
def op_2(a):
    return a + 1


@job(
    resource_defs={
        "fs": fs_io_manager,
        "s3_io": s3_pickle_io_manager,
        "s3": s3_resource,
    }
)
def my_job():
    op_2(op_1())

Per-input IO manager#

Just as with the inputs to assets, the inputs to ops can be loaded using custom logic if you want to override the IO manager of the upstream output. To set an IO manager for a particular input, use the input_manager_key argument on In.

In this example, the output of op_1 will be stored using the default IO manager, but will be loaded in op_2 using the logic defined in the pandas_series_io_manager (in this case loading as a Pandas Series rather than a Python list).

import pandas as pd

from dagster import In, job, op


@op
def op_1():
    return [1, 2, 3]


@op(ins={"a": In(input_manager_key="pandas_series")})
def op_2(a):
    return pd.concat([a, pd.Series([4, 5, 6])])


@job(resource_defs={"pandas_series": pd_series_io_manager})
def a_job():
    op_2(op_1())

Defining an IO manager#

If you have specific requirements for where and how your outputs should be stored and retrieved, you can define your own IO manager. This boils down to implementing two functions: one that stores outputs and one that loads inputs.

To define an IO manager, use the @io_manager decorator.

from dagster import IOManager, io_manager


class MyIOManager(IOManager):
    def _get_path(self, context) -> str:
        return "/".join(context.asset_key.path)

    def handle_output(self, context, obj):
        write_csv(self._get_path(context), obj)

    def load_input(self, context):
        return read_csv(self._get_path(context))


@io_manager
def my_io_manager():
    return MyIOManager()

The io_manager decorator behaves nearly identically to the resource decorator. It yields an IOManagerDefinition, which is a subclass of ResourceDefinition that will produce an IOManager.

The provided context argument for handle_output is an OutputContext. The provided context argument for load_input is an InputContext. The linked API documentation lists all the fields that are available on these objects.
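
Because it behaves like a resource, an IO manager defined with @io_manager can also accept configuration through a config_schema and an init context, just as @resource can. A minimal sketch that extends the example above (write_csv and read_csv are the same placeholder helpers):

from dagster import IOManager, io_manager


class MyConfigurableIOManager(IOManager):
    def __init__(self, base_path: str):
        self._base_path = base_path

    def _get_path(self, context) -> str:
        return self._base_path + "/" + "/".join(context.asset_key.path)

    def handle_output(self, context, obj):
        write_csv(self._get_path(context), obj)

    def load_input(self, context):
        return read_csv(self._get_path(context))


@io_manager(config_schema={"base_path": str})
def my_configurable_io_manager(init_context):
    # Resource config is available on the init context, just like for @resource
    return MyConfigurableIOManager(init_context.resource_config["base_path"])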

Handling partitioned assets#

IO managers can be written to handle partitioned assets. For a partitioned asset, each invocation of handle_output will (over)write a single partition, and each invocation of load_input will load one or more partitions. When the IO manager is backed by a filesystem or object store, then each partition will typically correspond to a file or object. When it's backed by a database, then each partition will typically correspond to a range of values in a table that fall within a particular window.

The default IO Manager has support for loading a partitioned upstream asset for a downstream asset with matching partitions out of the box (see the section below for loading multiple partitions). The UPathIOManager can be used to handle partitions in custom filesystem-based IO Managers.

To handle partitions in a custom IO manager, you'll need to determine which partition you're dealing with when you're storing an output or loading an input. For this, OutputContext and InputContext have an asset_partition_key property:

class MyPartitionedIOManager(IOManager):
    def _get_path(self, context) -> str:
        if context.has_partition_key:
            return "/".join(context.asset_key.path + [context.asset_partition_key])
        else:
            return "/".join(context.asset_key.path)

    def handle_output(self, context, obj):
        write_csv(self._get_path(context), obj)

    def load_input(self, context):
        return read_csv(self._get_path(context))

If you're working with time window partitions, you can also use the asset_partitions_time_window property, which will return a TimeWindow object.
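
For a database-backed IO manager, the time window can be used to overwrite just the rows belonging to the partition being handled. A hedged sketch, assuming placeholder helpers (delete_rows_in_range, write_dataframe_to_table, read_dataframe_from_table) and a table keyed by date:

from dagster import IOManager


class MyTimeWindowPartitionedIOManager(IOManager):
    def handle_output(self, context, obj):
        table_name = "_".join(context.asset_key.path)
        if context.has_asset_partitions:
            start, end = context.asset_partitions_time_window
            # Overwrite only the rows that fall inside this partition's time window
            delete_rows_in_range(table_name, start, end)
        write_dataframe_to_table(name=table_name, dataframe=obj)

    def load_input(self, context):
        table_name = "_".join(context.asset_key.path)
        if context.has_asset_partitions:
            start, end = context.asset_partitions_time_window
            return read_dataframe_from_table(name=table_name, start=start, end=end)
        return read_dataframe_from_table(name=table_name)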

Handling partition mappings#

A single partition of one asset might depend on a range of partitions of an upstream asset.

The default IO Manager has support for loading multiple upstream partitions. In this case, the downstream asset should annotate the upstream input as Dict[str, ...] (or leave the annotation blank). Here is an example of loading multiple upstream partitions using the default partition mapping:

from datetime import datetime
from typing import Dict

import pandas as pd

from dagster import (
    DailyPartitionsDefinition,
    HourlyPartitionsDefinition,
    OpExecutionContext,
    asset,
    materialize,
)

start = datetime(2022, 1, 1)

hourly_partitions = HourlyPartitionsDefinition(start_date=f"{start:%Y-%m-%d-%H:%M}")
daily_partitions = DailyPartitionsDefinition(start_date=f"{start:%Y-%m-%d}")


@asset(partitions_def=hourly_partitions)
def upstream_asset(context: OpExecutionContext) -> pd.DataFrame:
    return pd.DataFrame({"date": [context.partition_key]})


@asset(
    partitions_def=daily_partitions,
)
def downstream_asset(upstream_asset: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    return pd.concat(list(upstream_asset.values()))


result = materialize(
    [*upstream_asset.to_source_assets(), downstream_asset],
    partition_key=start.strftime(daily_partitions.fmt),
)
downstream_asset_data = result.output_for_node("downstream_asset", "result")
assert (
    len(downstream_asset_data) == 24
), "downstream day should map to upstream 24 hours"

The upstream_asset becomes a mapping from partition keys to partition values. This is a property of the default IO manager or any IO manager inheriting from the UPathIOManager.

A PartitionMapping can be provided to AssetIn to configure the mapped upstream partitions.
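
For example, building on the code above, a hedged sketch of overriding the default mapping so that each daily partition also pulls in the previous day's hourly partitions (the offset is illustrative; see TimeWindowPartitionMapping for the exact offset semantics):

from dagster import AssetIn, TimeWindowPartitionMapping, asset


@asset(
    partitions_def=daily_partitions,
    ins={
        "upstream_asset": AssetIn(
            # Widen the mapped window to include the previous day as well
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0)
        )
    },
)
def downstream_asset_with_mapping(upstream_asset: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    return pd.concat(list(upstream_asset.values()))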

When writing a custom IO Manager for loading multiple upstream partitions, the mapped keys can be accessed using InputContext.asset_partition_keys, InputContext.asset_partition_key_range, or InputContext.asset_partitions_time_window.
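
A minimal sketch of such a load_input, subclassing the MyPartitionedIOManager defined earlier (read_csv is the same placeholder helper):

class MyMultiPartitionLoadingIOManager(MyPartitionedIOManager):
    def load_input(self, context):
        if context.has_asset_partitions and len(context.asset_partition_keys) > 1:
            # Return a mapping from partition key to stored value, mirroring the
            # behavior of the default IO manager
            return {
                partition_key: read_csv(
                    "/".join(context.asset_key.path + [partition_key])
                )
                for partition_key in context.asset_partition_keys
            }
        return read_csv(self._get_path(context))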

Writing a per-input IO Manager#

In some cases you may find that you need to load an input in a way other than the load_input function of the corresponding output's IO manager. For example, let's say Team A has an op that returns an output as a Pandas DataFrame and specifies an IO manager that knows how to store and load Pandas DataFrames. Your team is interested in using this output for a new op, but you are required to use PySpark to analyze the data. Unfortunately, you don't have permission to modify Team A's IO manager to support this case. Instead, you can specify an input manager on your op that will override some of the behavior of Team A's IO manager.

Since the method for loading an input is directly affected by the way the corresponding output was stored, we recommend defining your input managers as subclasses of existing IO managers and just updating the load_input method. In this example, we load an input as a NumPy array rather than a Pandas DataFrame by writing the following:

# in this case PandasIOManager is an existing IO Manager
class MyNumpyLoader(PandasIOManager):
    def load_input(self, context):
        file_path = "path/to/dataframe"
        array = np.genfromtxt(file_path, delimiter=",", dtype=None)
        return array


@io_manager
def numpy_io_manager():
    return MyNumpyLoader()


@op(ins={"np_array_input": In(input_manager_key="numpy_manager")})
def analyze_as_numpy(np_array_input: np.ndarray):
    assert isinstance(np_array_input, np.ndarray)


@job(resource_defs={"numpy_manager": numpy_io_manager, "io_manager": pandas_io_manager})
def my_job():
    df = produce_pandas_output()
    analyze_as_numpy(df)

This may quickly run into issues if the owner of PandasIOManager changes the path at which they store outputs. We recommend splitting out path defining logic (or other computations shared by handle_output and load_input) into new methods that are called when needed.

# this IO Manager is owned by a different team
class BetterPandasIOManager(IOManager):
    def _get_path(self, output_context):
        return os.path.join(
            self.base_dir,
            "storage",
            f"{output_context.step_key}_{output_context.name}.csv",
        )

    def handle_output(self, context, obj: pd.DataFrame):
        file_path = self._get_path(context)
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        if obj is not None:
            obj.to_csv(file_path, index=False)

    def load_input(self, context) -> pd.DataFrame:
        return pd.read_csv(self._get_path(context.upstream_output))


# write a subclass that uses _get_path for your custom loading logic
class MyBetterNumpyLoader(BetterPandasIOManager):
    def load_input(self, context):
        file_path = self._get_path(context.upstream_output)
        array = np.genfromtxt(file_path, delimiter=",", dtype=None)
        return array


@io_manager
def better_numpy_io_manager():
    return MyBetterNumpyLoader()


@op(ins={"np_array_input": In(input_manager_key="better_numpy_manager")})
def better_analyze_as_numpy(np_array_input: np.ndarray):
    assert isinstance(np_array_input, np.ndarray)


@job(
    resource_defs={
        "numpy_manager": better_numpy_io_manager,
        "io_manager": pandas_io_manager,
    }
)
def my_better_job():
    df = produce_pandas_output()
    better_analyze_as_numpy(df)

Examples#

A custom IO manager that stores Pandas DataFrames in tables#

If your ops produce Pandas DataFrames that populate tables in a data warehouse, you might write something like the following. This IO manager uses the name assigned to the output as the name of the table to write the output to.

from dagster import IOManager, io_manager


class DataframeTableIOManager(IOManager):
    def handle_output(self, context, obj):
        # name is the name given to the Out that we're storing for
        table_name = context.name
        write_dataframe_to_table(name=table_name, dataframe=obj)

    def load_input(self, context):
        # upstream_output.name is the name given to the Out that we're loading for
        table_name = context.upstream_output.name
        return read_dataframe_from_table(name=table_name)


@io_manager
def df_table_io_manager():
    return DataframeTableIOManager()


@job(resource_defs={"io_manager": df_table_io_manager})
def my_job():
    op_2(op_1())

Custom filesystem-based IO Manager#

Dagster provides a feature-rich base class for filesystem-based IO Managers: UPathIOManager. It's compatible with both local and remote filesystems (like S3 or GCS) by using universal-pathlib and fsspec. The full list of supported filesystems can be found in the fsspec documentation. The UPathIOManager also has other important features:

  • handles partitioned assets
  • handles loading a single upstream partition
  • handles loading multiple upstream partitions (with respect to PartitionMapping)
  • the get_metadata method can be customized to add additional metadata to the output
  • the allow_missing_partitions metadata value can be set to True to skip missing partitions (the default behavior is to raise an error)

The default IO manager inherits from the UPathIOManager and therefore has these features too.
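
For example, allow_missing_partitions can be supplied as metadata on the asset input that loads the partitioned upstream (a hedged sketch; the asset and input names are illustrative):

from dagster import AssetIn, asset


@asset(
    ins={
        "hourly_events": AssetIn(
            # Skip upstream partitions that haven't been materialized yet
            # instead of raising an error
            metadata={"allow_missing_partitions": True}
        )
    }
)
def daily_summary(hourly_events):
    ...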

The UPathIOManager already implements the load_input and handle_output methods. Instead, you implement UPathIOManager.dump_to_path and UPathIOManager.load_from_path for a given universal_pathlib.UPath. Here are some examples:

import pandas as pd
from upath import UPath

from dagster import (
    Field,
    InitResourceContext,
    InputContext,
    OutputContext,
    StringSource,
    UPathIOManager,
    io_manager,
)


class PandasParquetIOManager(UPathIOManager):
    extension: str = ".parquet"

    def dump_to_path(self, context: OutputContext, obj: pd.DataFrame, path: UPath):
        with path.open("wb") as file:
            obj.to_parquet(file)

    def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
        with path.open("rb") as file:
            return pd.read_parquet(file)

The extension attribute defines the suffix that all file paths generated by the IO manager will end with.

The IO managers defined below use this class and will work with partitioned assets on any filesystem:

@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_pandas_parquet_io_manager(
    init_context: InitResourceContext,
) -> PandasParquetIOManager:
    assert init_context.instance is not None  # to please mypy
    base_path = UPath(
        init_context.resource_config.get(
            "base_path", init_context.instance.storage_directory()
        )
    )
    return PandasParquetIOManager(base_path=base_path)


@io_manager(
    config_schema={
        "base_path": Field(str, is_required=True),
        "AWS_ACCESS_KEY_ID": StringSource,
        "AWS_SECRET_ACCESS_KEY": StringSource,
    }
)
def s3_parquet_io_manager(init_context: InitResourceContext) -> PandasParquetIOManager:
    # `UPath` will read boto env vars.
    # The credentials can also be taken from the config and passed to `UPath` directly.
    base_path = UPath(init_context.resource_config.get("base_path"))
    assert str(base_path).startswith("s3://"), base_path
    return PandasParquetIOManager(base_path=base_path)

Notice how the local and S3 IO managers are practically the same - the only difference is in the required resources.

Providing per-output config to an IO manager#

When launching a run, you might want to parameterize how particular outputs are stored.

For example, if your job produces DataFrames to populate tables in a data warehouse, you might want to specify the table that each output goes to at run launch time.

To accomplish this, you can define an output_config_schema on the IO manager definition. The IO manager methods can access this config when storing or loading data, via the OutputContext.

class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        table_name = context.config["table"]
        write_dataframe_to_table(name=table_name, dataframe=obj)

    def load_input(self, context):
        table_name = context.upstream_output.config["table"]
        return read_dataframe_from_table(name=table_name)


@io_manager(output_config_schema={"table": str})
def my_io_manager(_):
    return MyIOManager()

Then, when executing a job, you can pass in this per-output config.

def execute_my_job_with_config():
    @job(resource_defs={"io_manager": my_io_manager})
    def my_job():
        op_2(op_1())

    my_job.execute_in_process(
        run_config={
            "ops": {
                "op_1": {"outputs": {"result": {"table": "table1"}}},
                "op_2": {"outputs": {"result": {"table": "table2"}}},
            }
        },
    )

Providing per-output metadata to an IO manager#

You might want to provide static metadata that controls how particular outputs are stored. You don't plan to change the metadata at runtime, so it makes more sense to attach it to a definition rather than expose it as a configuration option.

For example, if your job produces DataFrames to populate tables in a data warehouse, you might want to specify that each output always goes to a particular table. To accomplish this, you can define metadata on each Out:

@op(out=Out(metadata={"schema": "some_schema", "table": "some_table"}))
def op_1():
    """Return a Pandas DataFrame."""


@op(out=Out(metadata={"schema": "other_schema", "table": "other_table"}))
def op_2(_input_dataframe):
    """Return a Pandas DataFrame."""

The IO manager can then access this metadata when storing or retrieving data, via the OutputContext.

In this case, the table names are encoded in the job definition. If, instead, you want to be able to set them at run time, the previous section describes how.

class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        table_name = context.metadata["table"]
        schema = context.metadata["schema"]
        write_dataframe_to_table(name=table_name, schema=schema, dataframe=obj)

    def load_input(self, context):
        table_name = context.upstream_output.metadata["table"]
        schema = context.upstream_output.metadata["schema"]
        return read_dataframe_from_table(name=table_name, schema=schema)


@io_manager
def my_io_manager(_):
    return MyIOManager()

Per-input loading in assets#

Let's say you have an asset that is set to store and load as a Pandas DataFrame, but you want to write a new asset that processes the first asset as a NumPy array. Rather than update the IO manager of the first asset to be able to load as a Pandas DataFrame and a NumPy array, you can write a new loader for the new asset.

In this example, we store upstream_asset as a Pandas DataFrame, and we write a new IO manager to load it as a NumPy array in downstream_asset:

import os

import pandas as pd

from dagster import AssetIn, Definitions, IOManager, asset, io_manager


class PandasAssetIOManager(IOManager):
    def handle_output(self, context, obj):
        file_path = self._get_path(context)
        store_pandas_dataframe(name=file_path, table=obj)

    def _get_path(self, context):
        return os.path.join(
            "storage",
            f"{context.asset_key.path[-1]}.csv",
        )

    def load_input(self, context):
        file_path = self._get_path(context)
        return load_pandas_dataframe(name=file_path)


@io_manager
def pandas_asset_io_manager():
    return PandasAssetIOManager()


class NumpyAssetIOManager(PandasAssetIOManager):
    def load_input(self, context):
        file_path = self._get_path(context)
        return load_numpy_array(name=file_path)


@io_manager
def numpy_asset_io_manager():
    return NumpyAssetIOManager()


@asset(io_manager_key="pandas_manager")
def upstream_asset():
    return pd.DataFrame([1, 2, 3])


@asset(
    ins={"upstream": AssetIn(key_prefix="public", input_manager_key="numpy_manager")}
)
def downstream_asset(upstream):
    return upstream.shape


defs = Definitions(
    assets=[upstream_asset, downstream_asset],
    resources={
        "pandas_manager": pandas_asset_io_manager,
        "numpy_manager": numpy_asset_io_manager,
    },
)

Testing an IO manager#

The easiest way to test an IO manager is to construct an OutputContext or InputContext and pass it to the handle_output or load_input method of the IO manager. The build_output_context and build_input_context functions allow for easy construction of these contexts.

Here's an example for a simple IO manager that stores outputs in an in-memory dictionary that's keyed on the step and name of the output.

from dagster import IOManager, build_input_context, build_output_context, io_manager


class MyIOManager(IOManager):
    def __init__(self):
        self.storage_dict = {}

    def handle_output(self, context, obj):
        self.storage_dict[(context.step_key, context.name)] = obj

    def load_input(self, context):
        return self.storage_dict[
            (context.upstream_output.step_key, context.upstream_output.name)
        ]


@io_manager
def my_io_manager(_):
    return MyIOManager()


def test_my_io_manager_handle_output():
    manager = my_io_manager(None)
    context = build_output_context(name="abc", step_key="123")
    manager.handle_output(context, 5)
    assert manager.storage_dict[("123", "abc")] == 5


def test_my_io_manager_load_input():
    manager = my_io_manager(None)
    manager.storage_dict[("123", "abc")] = 5

    context = build_input_context(
        upstream_output=build_output_context(name="abc", step_key="123")
    )
    assert manager.load_input(context) == 5

Recording metadata from an IO Manager#

Sometimes, you may want to record some metadata while handling an output in an IO manager. To do this, you can invoke OutputContext.add_output_metadata from within the body of the handle_output function. Using this, we can modify one of the above examples to now include some helpful metadata in the log:

class DataframeTableIOManagerWithMetadata(IOManager):
    def handle_output(self, context, obj):
        table_name = context.name
        write_dataframe_to_table(name=table_name, dataframe=obj)

        context.add_output_metadata({"num_rows": len(obj), "table_name": table_name})

    def load_input(self, context):
        table_name = context.upstream_output.name
        return read_dataframe_from_table(name=table_name)

Any metadata entries added this way will be attached to the Handled Output event for this output.

Additionally, if the handled output is part of a software-defined asset, these metadata entries will also be attached to the materialization event created for that asset and show up on the Asset Details page for the asset.

See it in action#

For more examples of IO managers, check out our Hacker News example.

Our Type and Metadata example also covers writing custom IO managers.