You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. The version of this page for our latest release is available below.

Source code for dagstermill.io_managers

import os
from pathlib import Path
from typing import Any, List, Optional, Sequence

import dagster._check as check
from dagster import (
    AssetKey,
    AssetMaterialization,
    ConfigurableIOManagerFactory,
    InitResourceContext,
    IOManager,
)
from dagster._core.definitions.metadata import MetadataValue
from dagster._core.execution.context.input import InputContext
from dagster._core.execution.context.output import OutputContext
from dagster._core.storage.io_manager import dagster_maintained_io_manager, io_manager
from dagster._utils import mkdir_p
from pydantic import Field

from dagstermill.factory import _clean_path_for_windows


class OutputNotebookIOManager(IOManager):
    """Base class for IO managers that persist executed output notebooks.

    Both IO hooks raise ``NotImplementedError``; concrete subclasses are
    expected to override ``handle_output`` and ``load_input``.

    Args:
        asset_key_prefix: Optional asset key prefix, stored on the instance as
            ``self.asset_key_prefix`` for subclasses to use. ``None`` or an
            empty sequence is normalised to ``[]``.
    """

    def __init__(self, asset_key_prefix: Optional[Sequence[str]] = None):
        # `or` sends both None and an empty sequence to a fresh empty list.
        self.asset_key_prefix = asset_key_prefix or []

    def handle_output(self, context: OutputContext, obj: bytes):
        """Persist ``obj`` for the given output. Must be overridden."""
        raise NotImplementedError

    def load_input(self, context: InputContext) -> Any:
        """Load a previously persisted notebook. Must be overridden."""
        raise NotImplementedError


class LocalOutputNotebookIOManager(OutputNotebookIOManager):
    """Stores executed output notebooks as ``.ipynb`` files on the local filesystem.

    Args:
        base_dir: Root directory under which notebook files are written.
        asset_key_prefix: Prefix prepended to the asset key of the
            ``AssetMaterialization`` logged for outputs that have no asset key.
    """

    def __init__(self, base_dir: str, asset_key_prefix: Optional[Sequence[str]] = None):
        super().__init__(asset_key_prefix=asset_key_prefix)
        self.base_dir = base_dir
        # Notebook payloads are handled as raw bytes, so files use binary mode.
        self.write_mode = "wb"
        self.read_mode = "rb"

    def _get_path(self, context: OutputContext) -> str:
        """Automatically construct filepath.

        Outputs with an asset key are located by their asset identifier; all
        other outputs by their run-scoped output identifier. Each identifier
        component becomes one path segment under ``base_dir``, and ``.ipynb``
        is appended to the final segment.

        Args:
            context: The output context whose storage location is requested.

        Returns:
            The notebook file path as a string.
        """
        if context.has_asset_key:
            keys = context.get_asset_identifier()
        else:
            keys = context.get_run_scoped_output_identifier()
        path = Path(self.base_dir, *keys)
        # Append the extension instead of using ``with_suffix``. ``with_suffix``
        # replaces whatever follows the last dot of the final segment, so a key
        # such as "v1.2" would become "v1.ipynb" and collide with "v1.3".
        return str(path.with_name(f"{path.name}.ipynb"))

    def handle_output(self, context: OutputContext, obj: bytes):
        """Write the executed notebook to disk and record its location as metadata.

        Args:
            context: The output context for the notebook output.
            obj: The notebook contents as bytes.
        """
        check.inst_param(context, "context", OutputContext)

        # the output notebook itself is stored at output_notebook_path
        output_notebook_path = self._get_path(context)
        mkdir_p(os.path.dirname(output_notebook_path))
        with open(output_notebook_path, self.write_mode) as dest_file_obj:
            dest_file_obj.write(obj)

        metadata = {
            "Executed notebook": MetadataValue.notebook(
                _clean_path_for_windows(output_notebook_path)
            )
        }

        if context.has_asset_key:
            # Asset outputs: attach the notebook location to the output itself.
            context.add_output_metadata(metadata)
        else:
            # Non-asset outputs: log a standalone materialization keyed as
            # [*asset_key_prefix, "<step_key>_output_notebook"].
            context.log_event(
                AssetMaterialization(
                    asset_key=AssetKey(
                        [*self.asset_key_prefix, f"{context.step_key}_output_notebook"]
                    ),
                    metadata=metadata,
                )
            )

    def load_input(self, context: InputContext) -> bytes:
        """Read back the notebook bytes written for the upstream output.

        Args:
            context: The input context; its ``upstream_output`` must be set
                (enforced via ``check.not_none``).

        Returns:
            The raw notebook file contents.
        """
        check.inst_param(context, "context", InputContext)
        # pass output notebook to downstream ops as File Object
        output_context = check.not_none(context.upstream_output)
        with open(self._get_path(output_context), self.read_mode) as file_obj:
            return file_obj.read()


class ConfigurableLocalOutputNotebookIOManager(ConfigurableIOManagerFactory):
    """Built-in IO manager for handling output notebooks.

    Creates a ``LocalOutputNotebookIOManager`` that writes executed notebooks
    to ``base_dir``, or to the Dagster instance storage directory when
    ``base_dir`` is not set.
    """

    base_dir: Optional[str] = Field(
        default=None,
        description=(
            "Base directory to use for output notebooks. Defaults to the Dagster instance storage"
            " directory if not provided."
        ),
    )
    asset_key_prefix: List[str] = Field(
        default=[],
        description=(
            "Asset key prefix to apply to assets materialized for output notebooks. Defaults to no"
            " prefix."
        ),
    )

    @classmethod
    def _is_dagster_maintained(cls) -> bool:
        return True

    def create_io_manager(self, context: InitResourceContext) -> "LocalOutputNotebookIOManager":
        """Build the local output-notebook IO manager from this configuration.

        Args:
            context: Resource init context; its ``instance`` must be set when
                ``base_dir`` is falsy (enforced via ``check.not_none``).

        Returns:
            A ``LocalOutputNotebookIOManager`` using the resolved base directory.
        """
        return LocalOutputNotebookIOManager(
            # A falsy base_dir (None or "") falls back to instance storage.
            base_dir=self.base_dir or check.not_none(context.instance).storage_directory(),
            asset_key_prefix=self.asset_key_prefix,
        )


@dagster_maintained_io_manager
@io_manager(config_schema=ConfigurableLocalOutputNotebookIOManager.to_config_schema())
def local_output_notebook_io_manager(
    init_context: InitResourceContext,
) -> LocalOutputNotebookIOManager:
    """Built-in IO Manager that handles output notebooks.

    Function-style counterpart of ``ConfigurableLocalOutputNotebookIOManager``:
    it accepts that class's config schema and delegates construction to
    ``ConfigurableLocalOutputNotebookIOManager.from_resource_context``.
    """
    return ConfigurableLocalOutputNotebookIOManager.from_resource_context(init_context)