You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view this page for our latest release below.

Source code for dagster._core.storage.compute_log_manager

from abc import ABC, abstractmethod
from contextlib import contextmanager
from enum import Enum
from typing import Callable, Iterator, NamedTuple, Optional

from typing_extensions import Self

import dagster._check as check
from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance
from dagster._core.storage.dagster_run import DagsterRun

# Byte caps on log reads: MAX_BYTES_FILE_READ is the default limit for a single
# ComputeLogManager.read_logs_file call; MAX_BYTES_CHUNK_READ is the per-chunk
# limit used when streaming log data to a subscription.
MAX_BYTES_FILE_READ = 33554432  # 32 MB
MAX_BYTES_CHUNK_READ = 4194304  # 4 MB


class ComputeIOType(Enum):
    """The two output streams that compute-log capture distinguishes."""

    # Captured standard output of a compute step.
    STDOUT = "stdout"
    # Captured standard error of a compute step.
    STDERR = "stderr"


class ComputeLogFileData(
    NamedTuple(
        "ComputeLogFileData",
        [
            ("path", str),
            ("data", Optional[str]),
            ("cursor", int),
            ("size", int),
            ("download_url", Optional[str]),
        ],
    )
):
    """A chunk of compute execution log data.

    Fields:
        path: Path of the log file the chunk was read from.
        data: The log text of this chunk, or ``None`` if nothing was read.
        cursor: Byte offset into the log file after this chunk.
        size: Size of the log file, in bytes.
        download_url: Optional URL from which the full log can be downloaded.
    """

    def __new__(
        cls, path: str, data: Optional[str], cursor: int, size: int, download_url: Optional[str]
    ):
        # Validate every field eagerly so a malformed tuple fails at construction
        # time rather than at first use.
        return super().__new__(
            cls,
            path=check.str_param(path, "path"),
            data=check.opt_str_param(data, "data"),
            cursor=check.int_param(cursor, "cursor"),
            size=check.int_param(size, "size"),
            download_url=check.opt_str_param(download_url, "download_url"),
        )


class ComputeLogManager(ABC, MayHaveInstanceWeakref[T_DagsterInstance]):
    """Abstract base class for storing unstructured compute logs (stdout/stderr) from the compute
    steps of pipeline solids.
    """

    @contextmanager
    def watch(self, dagster_run: DagsterRun, step_key: Optional[str] = None) -> Iterator[None]:
        """Watch the stdout/stderr for a given execution for a given run_id / step_key and
        persist it.

        Args:
            dagster_run (DagsterRun): The run config
            step_key (Optional[String]): The step_key for a compute step
        """
        check.inst_param(dagster_run, "dagster_run", DagsterRun)
        check.opt_str_param(step_key, "step_key")

        # A context manager must yield exactly once, even when capture is disabled.
        if not self.enabled(dagster_run, step_key):
            yield
            return

        self.on_watch_start(dagster_run, step_key)
        with self._watch_logs(dagster_run, step_key):
            yield
        self.on_watch_finish(dagster_run, step_key)

    @contextmanager
    @abstractmethod
    def _watch_logs(
        self, dagster_run: DagsterRun, step_key: Optional[str] = None
    ) -> Iterator[None]:
        """Method to watch the stdout/stderr logs for a given run_id / step_key.  Kept separate
        from blessed `watch` method, which triggers all the start/finish hooks that are necessary
        to implement the different remote implementations.

        Args:
            dagster_run (DagsterRun): The run config
            step_key (Optional[String]): The step_key for a compute step
        """

    @abstractmethod
    def get_local_path(self, run_id: str, key: str, io_type: ComputeIOType) -> str:
        """Get the local path of the logfile for a given execution step.  This determines the
        location on the local filesystem to which stdout/stderr will be rerouted.

        Args:
            run_id (str): The id of the pipeline run.
            key (str): The unique descriptor of the execution step (e.g.
                `solid_invocation.compute`)
            io_type (ComputeIOType): Flag indicating the I/O type, either ComputeIOType.STDOUT or
                ComputeIOType.STDERR

        Returns:
            str
        """
        ...

    @abstractmethod
    def is_watch_completed(self, run_id: str, key: str) -> bool:
        """Flag indicating when computation for a given execution step has completed.

        Args:
            run_id (str): The id of the pipeline run.
            key (str): The unique descriptor of the execution step (e.g.
                `solid_invocation.compute`)

        Returns:
            Boolean
        """

    @abstractmethod
    def on_watch_start(self, dagster_run: DagsterRun, step_key: Optional[str]) -> None:
        """Hook called when starting to watch compute logs.

        Args:
            pipeline_run (PipelineRun): The pipeline run config
            step_key (Optional[String]): The step_key for a compute step
        """

    @abstractmethod
    def on_watch_finish(self, dagster_run: DagsterRun, step_key: Optional[str]) -> None:
        """Hook called when computation for a given execution step is finished.

        Args:
            pipeline_run (PipelineRun): The pipeline run config
            step_key (Optional[String]): The step_key for a compute step
        """

    @abstractmethod
    def download_url(self, run_id: str, key: str, io_type: ComputeIOType) -> str:
        """Get a URL where the logs can be downloaded.

        Args:
            run_id (str): The id of the pipeline run.
            key (str): The unique descriptor of the execution step (e.g.
                `solid_invocation.compute`)
            io_type (ComputeIOType): Flag indicating the I/O type, either stdout or stderr

        Returns:
            String
        """

    @abstractmethod
    def read_logs_file(
        self,
        run_id: str,
        key: str,
        io_type: ComputeIOType,
        cursor: int = 0,
        max_bytes: int = MAX_BYTES_FILE_READ,
    ) -> ComputeLogFileData:
        """Get compute log data for a given compute step.

        Args:
            run_id (str): The id of the pipeline run.
            key (str): The unique descriptor of the execution step (e.g.
                `solid_invocation.compute`)
            io_type (ComputeIOType): Flag indicating the I/O type, either stdout or stderr
            cursor (Optional[Int]): Starting cursor (byte) of log file
            max_bytes (Optional[Int]): Maximum number of bytes to be read and returned

        Returns:
            ComputeLogFileData
        """

    def enabled(self, _dagster_run: DagsterRun, _step_key: Optional[str]) -> bool:
        """Hook for disabling compute log capture.

        Args:
            _step_key (Optional[String]): The step_key for a compute step

        Returns:
            Boolean
        """
        return True

    @abstractmethod
    def on_subscribe(self, subscription: "ComputeLogSubscription") -> None:
        """Hook for managing streaming subscriptions for log data from `dagster-webserver`.

        Args:
            subscription (ComputeLogSubscription): subscription object which manages when to send
                back data to the subscriber
        """

    def on_unsubscribe(self, subscription: "ComputeLogSubscription") -> None:
        # Default no-op; remote implementations may override to release resources.
        pass

    def observable(
        self, run_id: str, key: str, io_type: ComputeIOType, cursor: Optional[str] = None
    ) -> "ComputeLogSubscription":
        """Return a ComputeLogSubscription which streams back log data from the execution logs for
        a given compute step.

        Args:
            run_id (str): The id of the pipeline run.
            key (str): The unique descriptor of the execution step (e.g.
                `solid_invocation.compute`)
            io_type (ComputeIOType): Flag indicating the I/O type, either stdout or stderr
            cursor (Optional[Int]): Starting cursor (byte) of log file

        Returns:
            Observable
        """
        check.str_param(run_id, "run_id")
        check.str_param(key, "key")
        check.inst_param(io_type, "io_type", ComputeIOType)
        check.opt_str_param(cursor, "cursor")

        # Normalize the string cursor to a byte offset (0 when absent/empty).
        if cursor:
            cursor = int(cursor)  # type: ignore  # (var reassigned diff type)
        else:
            cursor = 0  # type: ignore  # (var reassigned diff type)

        subscription = ComputeLogSubscription(self, run_id, key, io_type, cursor)  # type: ignore  # (var reassigned diff type)
        self.on_subscribe(subscription)
        return subscription

    def dispose(self):
        # Default no-op; implementations may override to clean up resources.
        pass
class ComputeLogSubscription:
    """Observable object that generates ComputeLogFileData objects as compute step execution logs
    are written.
    """

    def __init__(
        self,
        manager: ComputeLogManager,
        run_id: str,
        key: str,
        io_type: ComputeIOType,
        cursor: int,
    ):
        self.manager = manager
        self.run_id = run_id
        self.key = key
        self.io_type = io_type
        # Byte offset into the log file from which the next fetch starts.
        self.cursor = cursor
        # Callback invoked with each new ComputeLogFileData chunk; None until subscribed.
        self.observer: Optional[Callable[[ComputeLogFileData], None]] = None
        self.is_complete = False

    def __call__(self, observer: Callable[[ComputeLogFileData], None]) -> Self:
        """Register the observer, push any data already available, and mark complete if the
        watched computation has already finished.
        """
        self.observer = observer
        self.fetch()
        if self.manager.is_watch_completed(self.run_id, self.key):
            self.complete()
        return self

    def dispose(self) -> None:
        # called when the connection gets closed, allowing the observer to get GC'ed
        self.observer = None
        self.manager.on_unsubscribe(self)

    def fetch(self) -> None:
        """Read log data in MAX_BYTES_CHUNK_READ-sized chunks from the current cursor and feed
        each new chunk to the observer until the file is exhausted.
        """
        if not self.observer:
            return

        should_fetch = True
        while should_fetch:
            update = self.manager.read_logs_file(
                self.run_id,
                self.key,
                self.io_type,
                self.cursor,
                max_bytes=MAX_BYTES_CHUNK_READ,
            )
            # Only notify when there is new data (cursor advanced or first read).
            if not self.cursor or update.cursor != self.cursor:
                self.observer(update)
                self.cursor = update.cursor
            # A full-sized chunk suggests more data remains; keep reading.
            should_fetch = update.data and len(update.data.encode("utf-8")) >= MAX_BYTES_CHUNK_READ

    def complete(self) -> None:
        self.is_complete = True
        # NOTE(review): trailing guard with no follow-up work — appears vestigial;
        # confirm against upstream before removing.
        if not self.observer:
            return