You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster_datahub.resources

from typing import Any, Dict, List, Optional

from dagster import InitResourceContext, resource
from dagster._config.pythonic_config import Config, ConfigurableResource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from datahub.emitter.kafka_emitter import (
    DEFAULT_MCE_KAFKA_TOPIC,
    DEFAULT_MCP_KAFKA_TOPIC,
    MCE_KEY,
    MCP_KEY,
    DatahubKafkaEmitter,
    KafkaEmitterConfig,
)
from datahub.emitter.rest_emitter import DatahubRestEmitter
from pydantic import Field


[docs]class DatahubRESTEmitterResource(ConfigurableResource): connection: str = Field(description="Datahub GMS Server") token: Optional[str] = Field(default=None, description="Personal Access Token") connect_timeout_sec: Optional[float] = None read_timeout_sec: Optional[float] = None retry_status_codes: Optional[List[int]] = None retry_methods: Optional[List[str]] = None retry_max_times: Optional[int] = None extra_headers: Optional[Dict[str, str]] = None ca_certificate_path: Optional[str] = None server_telemetry_id: Optional[str] = None disable_ssl_verification: bool = False @classmethod def _is_dagster_maintained(cls) -> bool: return True def get_emitter(self) -> DatahubRestEmitter: return DatahubRestEmitter( gms_server=self.connection, token=self.token, connect_timeout_sec=self.connect_timeout_sec, read_timeout_sec=self.read_timeout_sec, retry_status_codes=self.retry_status_codes, retry_methods=self.retry_methods, retry_max_times=self.retry_max_times, extra_headers=self.extra_headers, ca_certificate_path=self.ca_certificate_path, server_telemetry_id=self.server_telemetry_id, disable_ssl_verification=self.disable_ssl_verification, )
[docs]@dagster_maintained_resource @resource(config_schema=DatahubRESTEmitterResource.to_config_schema()) def datahub_rest_emitter(init_context: InitResourceContext) -> DatahubRestEmitter: emitter = DatahubRestEmitter( gms_server=init_context.resource_config.get("connection"), token=init_context.resource_config.get("token"), connect_timeout_sec=init_context.resource_config.get("connect_timeout_sec"), read_timeout_sec=init_context.resource_config.get("read_timeout_sec"), retry_status_codes=init_context.resource_config.get("retry_status_codes"), retry_methods=init_context.resource_config.get("retry_methods"), retry_max_times=init_context.resource_config.get("retry_max_times"), extra_headers=init_context.resource_config.get("extra_headers"), ca_certificate_path=init_context.resource_config.get("ca_certificate_path"), server_telemetry_id=init_context.resource_config.get("server_telemetry_id"), disable_ssl_verification=init_context.resource_config.get("disable_ssl_verification"), ) # Attempt to hit the server to ensure the resource is properly configured emitter.test_connection() return emitter
class DatahubConnection(Config): bootstrap: str = Field(description="Kafka Boostrap Servers. Comma delimited") schema_registry_url: str = Field(description="Schema Registry Location.") schema_registry_config: Dict[str, Any] = Field( default={}, description="Extra Schema Registry Config." )
[docs]class DatahubKafkaEmitterResource(ConfigurableResource): connection: DatahubConnection topic: Optional[str] = None topic_routes: Dict[str, str] = Field( default={ MCE_KEY: DEFAULT_MCE_KAFKA_TOPIC, MCP_KEY: DEFAULT_MCP_KAFKA_TOPIC, } ) @classmethod def _is_dagster_maintained(cls) -> bool: return True def get_emitter(self) -> DatahubKafkaEmitter: return DatahubKafkaEmitter( KafkaEmitterConfig.parse_obj(self._convert_to_config_dictionary()) )
[docs]@dagster_maintained_resource @resource(config_schema=DatahubKafkaEmitterResource.to_config_schema()) def datahub_kafka_emitter(init_context: InitResourceContext) -> DatahubKafkaEmitter: return DatahubKafkaEmitter(KafkaEmitterConfig.parse_obj(init_context.resource_config))