You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster_gcp.gcs.resources

from typing import Any, Optional

from dagster import ConfigurableResource, IAttachDifferentObjectToOpContext, resource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from google.cloud import storage
from pydantic import Field

from .file_manager import GCSFileManager


[docs]class GCSResource(ConfigurableResource, IAttachDifferentObjectToOpContext): """Resource for interacting with Google Cloud Storage. Example: .. code-block:: @asset def my_asset(gcs: GCSResource): with gcs.get_client() as client: # client is a google.cloud.storage.Client ... """ project: Optional[str] = Field(default=None, description="Project name") def get_client(self) -> storage.Client: """Creates a GCS Client. Returns: google.cloud.storage.Client """ return _gcs_client_from_config(project=self.project) def get_object_to_set_on_execution_context(self) -> Any: return self.get_client()
[docs]@dagster_maintained_resource @resource( config_schema=GCSResource.to_config_schema(), description="This resource provides a GCS client", ) def gcs_resource(init_context) -> storage.Client: return GCSResource.from_resource_context(init_context).get_client()
[docs]class GCSFileManagerResource(ConfigurableResource, IAttachDifferentObjectToOpContext): """FileManager that provides abstract access to GCS.""" project: Optional[str] = Field(default=None, description="Project name") gcs_bucket: str = Field(description="GCS bucket to store files") gcs_prefix: str = Field(default="dagster", description="Prefix to add to all file paths") def get_client(self) -> GCSFileManager: """Creates a :py:class:`~dagster_gcp.GCSFileManager` object that implements the :py:class:`~dagster._core.storage.file_manager.FileManager` API . Returns: GCSFileManager """ gcs_client = _gcs_client_from_config(project=self.project) return GCSFileManager( client=gcs_client, gcs_bucket=self.gcs_bucket, gcs_base_key=self.gcs_prefix, ) def get_object_to_set_on_execution_context(self) -> Any: return self.get_client()
[docs]@dagster_maintained_resource @resource(config_schema=GCSFileManagerResource.to_config_schema()) def gcs_file_manager(context): """FileManager that provides abstract access to GCS. Implements the :py:class:`~dagster._core.storage.file_manager.FileManager` API. """ return GCSFileManagerResource.from_resource_context(context).get_client()
def _gcs_client_from_config(project: Optional[str]) -> storage.Client: """Creates a GCS Client. Args: project: The GCP project Returns: A GCS client. """ return storage.client.Client(project=project)