You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster_airbyte.ops

from typing import Any, Iterable, List, Optional

from dagster import Config, In, Nothing, Out, Output, op
from pydantic import Field

from dagster_airbyte.types import AirbyteOutput
from dagster_airbyte.utils import _get_attempt, generate_materializations

from .resources import DEFAULT_POLL_INTERVAL_SECONDS, BaseAirbyteResource


class AirbyteSyncConfig(Config):
    # Run configuration consumed by ``airbyte_sync_op``: which connection to
    # sync, how the sync is polled, and whether/how asset materializations are
    # emitted alongside the op output.

    # Required (no default): passed as ``connection_id`` to the resource's
    # ``sync_and_poll`` call.
    connection_id: str = Field(
        ...,
        description="The ID of the Airbyte connection that this op will sync.",
    )
    # Passed as ``poll_interval`` to ``sync_and_poll``; the default comes from
    # the resources module.
    poll_interval: float = Field(
        DEFAULT_POLL_INTERVAL_SECONDS,
        description="The time (in seconds) that will be waited between successive polls.",
    )
    # ``None`` means no timeout is requested.
    poll_timeout: Optional[float] = Field(
        None,
        description=(
            "The maximum time that will be waited before this operation is timed out. By "
            "default, this will never time out."
        ),
    )
    yield_materializations: bool = Field(
        True,
        description=(
            "If True, materializations corresponding to the results of the Airbyte sync will "
            "be yielded when the op executes."
        ),
    )
    # Only consulted when ``yield_materializations`` is True.
    asset_key_prefix: List[str] = Field(
        ["airbyte"],
        description=(
            "If provided and yield_materializations is True, these components will be used to "
            "prefix the generated asset keys."
        ),
    )


@op(
    ins={"start_after": In(Nothing)},
    out=Out(
        AirbyteOutput,
        description=(
            "Parsed json dictionary representing the details of the Airbyte connector after the"
            " sync successfully completes. See the [Airbyte API"
            " Docs](https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#overview)"
            " to see detailed information on this response."
        ),
    ),
    tags={"kind": "airbyte"},
)
def airbyte_sync_op(
    context, config: AirbyteSyncConfig, airbyte: BaseAirbyteResource
) -> Iterable[Any]:
    """Executes an Airbyte job sync for a given ``connection_id``, and polls until that sync
    completes, raising an error if it is unsuccessful. It outputs an AirbyteOutput which contains
    the job details for a given ``connection_id``.

    It requires the use of the :py:class:`~dagster_airbyte.airbyte_resource`, which allows it to
    communicate with the Airbyte API.

    Examples:
        .. code-block:: python

            from dagster import job
            from dagster_airbyte import airbyte_resource, airbyte_sync_op

            my_airbyte_resource = airbyte_resource.configured(
                {
                    "host": {"env": "AIRBYTE_HOST"},
                    "port": {"env": "AIRBYTE_PORT"},
                }
            )

            sync_foobar = airbyte_sync_op.configured({"connection_id": "foobar"}, name="sync_foobar")

            @job(resource_defs={"airbyte": my_airbyte_resource})
            def my_simple_airbyte_job():
                sync_foobar()

            @job(resource_defs={"airbyte": my_airbyte_resource})
            def my_composed_airbyte_job():
                final_foobar_state = sync_foobar(start_after=some_op())
                other_op(final_foobar_state)
    """
    airbyte_output = airbyte.sync_and_poll(
        connection_id=config.connection_id,
        poll_interval=config.poll_interval,
        poll_timeout=config.poll_timeout,
    )

    # Materializations are optional and, when enabled, are emitted before the op output.
    if config.yield_materializations:
        yield from generate_materializations(
            airbyte_output, asset_key_prefix=config.asset_key_prefix
        )

    # Fall back to a single empty attempt when "attempts" is missing, None, or an
    # empty list, so indexing the last attempt cannot raise and the output is still
    # yielded (with empty metadata).
    attempts = airbyte_output.job_details.get("attempts") or [{}]
    last_attempt = _get_attempt(attempts[-1])

    # The last attempt's "totalStats" entries (if any) become the output metadata.
    yield Output(
        airbyte_output,
        metadata={**last_attempt.get("totalStats", {})},
    )