You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster._core.storage.mem_io_manager

from typing import Dict, Tuple

from dagster._core.execution.context.input import InputContext
from dagster._core.execution.context.output import OutputContext
from dagster._core.storage.io_manager import IOManager, dagster_maintained_io_manager, io_manager


class InMemoryIOManager(IOManager):
    """I/O manager that keeps every handled value in a plain in-process dict.

    Values live only on this instance (in ``self.values``); nothing is written
    anywhere else. After execution is complete, the values will be
    garbage-collected, so a run cannot read values produced by a previous run.
    """

    def __init__(self):
        # Storage table: identifier tuple -> the object stored for it.
        self.values: Dict[Tuple[object, ...], object] = {}

    @staticmethod
    def _storage_key(context) -> Tuple[object, ...]:
        """Return the context's identifier as a tuple, usable as a dict key."""
        # NOTE(review): presumably get_identifier() returns a sequence of
        # hashable parts -- confirm against the context classes.
        return tuple(context.get_identifier())

    def handle_output(self, context: OutputContext, obj: object):
        """Store ``obj`` under the identifier derived from ``context``.

        A value already stored under the same identifier is overwritten.
        """
        slot = self._storage_key(context)
        self.values[slot] = obj

    def load_input(self, context: InputContext) -> object:
        """Return the object stored under the identifier derived from ``context``.

        Raises:
            KeyError: if nothing has been stored under that identifier.
        """
        slot = self._storage_key(context)
        return self.values[slot]
@dagster_maintained_io_manager
@io_manager(description="Built-in IO manager that stores and retrieves values in memory.")
def mem_io_manager(_) -> InMemoryIOManager:
    """Create a fresh in-memory IO manager.

    The single positional argument is ignored; every call returns a new
    ``InMemoryIOManager`` whose value store starts out empty.
    """
    manager = InMemoryIOManager()
    return manager