This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.
from typing import Any, Iterable, List, Optional
from dagster import Config, In, Nothing, Out, Output, op
from pydantic import Field
from dagster_airbyte.types import AirbyteOutput
from dagster_airbyte.utils import _get_attempt, generate_materializations
from .resources import DEFAULT_POLL_INTERVAL_SECONDS, BaseAirbyteResource
class AirbyteSyncConfig(Config):
connection_id: str = Field(
...,
description=(
"Parsed json dictionary representing the details of the Airbyte connector after the"
" sync successfully completes. See the [Airbyte API"
" Docs](https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#overview)"
" to see detailed information on this response."
),
)
poll_interval: float = Field(
DEFAULT_POLL_INTERVAL_SECONDS,
description=(
"The maximum time that will waited before this operation is timed out. By "
"default, this will never time out."
),
)
poll_timeout: Optional[float] = Field(
None,
description=(
"The maximum time that will waited before this operation is timed out. By "
"default, this will never time out."
),
)
yield_materializations: bool = Field(
True,
description=(
"If True, materializations corresponding to the results of the Airbyte sync will "
"be yielded when the op executes."
),
)
asset_key_prefix: List[str] = Field(
["airbyte"],
description=(
"If provided and yield_materializations is True, these components will be used to "
"prefix the generated asset keys."
),
)
[docs]@op(
ins={"start_after": In(Nothing)},
out=Out(
AirbyteOutput,
description=(
"Parsed json dictionary representing the details of the Airbyte connector after the"
" sync successfully completes. See the [Airbyte API"
" Docs](https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#overview)"
" to see detailed information on this response."
),
),
tags={"kind": "airbyte"},
)
def airbyte_sync_op(
context, config: AirbyteSyncConfig, airbyte: BaseAirbyteResource
) -> Iterable[Any]:
"""Executes a Airbyte job sync for a given ``connection_id``, and polls until that sync
completes, raising an error if it is unsuccessful. It outputs a AirbyteOutput which contains
the job details for a given ``connection_id``.
It requires the use of the :py:class:`~dagster_airbyte.airbyte_resource`, which allows it to
communicate with the Airbyte API.
Examples:
.. code-block:: python
from dagster import job
from dagster_airbyte import airbyte_resource, airbyte_sync_op
my_airbyte_resource = airbyte_resource.configured(
{
"host": {"env": "AIRBYTE_HOST"},
"port": {"env": "AIRBYTE_PORT"},
}
)
sync_foobar = airbyte_sync_op.configured({"connection_id": "foobar"}, name="sync_foobar")
@job(resource_defs={"airbyte": my_airbyte_resource})
def my_simple_airbyte_job():
sync_foobar()
@job(resource_defs={"airbyte": my_airbyte_resource})
def my_composed_airbyte_job():
final_foobar_state = sync_foobar(start_after=some_op())
other_op(final_foobar_state)
"""
airbyte_output = airbyte.sync_and_poll(
connection_id=config.connection_id,
poll_interval=config.poll_interval,
poll_timeout=config.poll_timeout,
)
if config.yield_materializations:
yield from generate_materializations(
airbyte_output, asset_key_prefix=config.asset_key_prefix
)
yield Output(
airbyte_output,
metadata={
**_get_attempt(airbyte_output.job_details.get("attempts", [{}])[-1]).get(
"totalStats", {}
)
},
)