Source code for dagster_datadog.resources

from dagster import ConfigurableResource, resource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from datadog import DogStatsd, initialize, statsd
from pydantic import Field

class DatadogClient:
    # Mirroring levels from the dogstatsd library

    def __init__(self, api_key: str, app_key: str):
        self.api_key = api_key
        self.app_key = app_key
        initialize(api_key=api_key, app_key=app_key)

        # Pull in methods from the dogstatsd library
        for method in [
            setattr(self, method, getattr(statsd, method))

[docs]class DatadogResource(ConfigurableResource): """This resource is a thin wrapper over the `dogstatsd library <>`_. As such, we directly mirror the public API methods of DogStatsd here; you can refer to the `DataDog documentation <>`_ for how to use this resource. Examples: .. code-block:: python @op def datadog_op(datadog_client: ResourceParam[DatadogClient]): datadog_client.event('Man down!', 'This server needs assistance.') datadog_client.gauge('', 1001, tags=["protocol:http"]) datadog_client.increment('page.views') datadog_client.decrement('page.views') datadog_client.histogram('', 26, tags=["gender:female"]) datadog_client.distribution('', 26, tags=["color:blue"]) datadog_client.set('visitors.uniques', 999, tags=["browser:ie"]) datadog_client.service_check('svc.check_name', datadog_client.WARNING) datadog_client.timing("query.response.time", 1234) # Use timed decorator @datadog_client.timed('run_fn') def run_fn(): pass run_fn() @job def job_for_datadog_op() -> None: datadog_op() job_for_datadog_op.execute_in_process( resources={"datadog_client": DatadogResource(api_key="FOO", app_key="BAR")} ) """ api_key: str = Field( description=( "Datadog API key. See" ) ) app_key: str = Field( description=( "Datadog application key. See" "" ) ) @classmethod def _is_dagster_maintained(cls) -> bool: return True def get_client(self) -> DatadogClient: return DatadogClient(self.api_key, self.app_key)
[docs]@dagster_maintained_resource @resource( config_schema=DatadogResource.to_config_schema(), description="This resource is for publishing to DataDog", ) def datadog_resource(context) -> DatadogClient: """This legacy resource is a thin wrapper over the `dogstatsd library <>`_. Prefer using :py:class:`DatadogResource`. As such, we directly mirror the public API methods of DogStatsd here; you can refer to the `DataDog documentation <>`_ for how to use this resource. Examples: .. code-block:: python @op(required_resource_keys={'datadog'}) def datadog_op(context): dd = context.resources.datadog dd.event('Man down!', 'This server needs assistance.') dd.gauge('', 1001, tags=["protocol:http"]) dd.increment('page.views') dd.decrement('page.views') dd.histogram('', 26, tags=["gender:female"]) dd.distribution('', 26, tags=["color:blue"]) dd.set('visitors.uniques', 999, tags=["browser:ie"]) dd.service_check('svc.check_name', dd.WARNING) dd.timing("query.response.time", 1234) # Use timed decorator @dd.timed('run_fn') def run_fn(): pass run_fn() @job(resource_defs={'datadog': datadog_resource}) def dd_job(): datadog_op() result = dd_job.execute_in_process( run_config={'resources': {'datadog': {'config': {'api_key': 'YOUR_KEY', 'app_key': 'YOUR_KEY'}}}} ) """ return DatadogResource.from_resource_context(context).get_client()