This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.
import json
from abc import ABC
from enum import Enum
from typing import Any, Dict, List, Mapping, Optional, Union
import dagster._check as check
from dagster._annotations import public
[docs]class AirbyteSyncMode(ABC):
"""Represents the sync mode for a given Airbyte stream, which governs how Airbyte reads
from a source and writes to a destination.
For more information, see https://docs.airbyte.com/understanding-airbyte/connections/.
"""
def __eq__(self, other: Any) -> bool:
return isinstance(other, AirbyteSyncMode) and self.to_json() == other.to_json()
def __init__(self, json_repr: Dict[str, Any]):
self.json_repr = json_repr
def to_json(self) -> Dict[str, Any]:
return self.json_repr
@classmethod
def from_json(cls, json_repr: Dict[str, Any]) -> "AirbyteSyncMode":
return cls(
{
k: v
for k, v in json_repr.items()
if k in ("syncMode", "destinationSyncMode", "cursorField", "primaryKey")
}
)
[docs] @public
@classmethod
def full_refresh_append(cls) -> "AirbyteSyncMode":
"""Syncs the entire data stream from the source, appending rows to the destination.
https://docs.airbyte.com/understanding-airbyte/connections/full-refresh-append/
"""
return cls({"syncMode": "full_refresh", "destinationSyncMode": "append"})
[docs] @public
@classmethod
def full_refresh_overwrite(cls) -> "AirbyteSyncMode":
"""Syncs the entire data stream from the source, replaces data in the destination by
overwriting it.
https://docs.airbyte.com/understanding-airbyte/connections/full-refresh-overwrite
"""
return cls({"syncMode": "full_refresh", "destinationSyncMode": "overwrite"})
[docs] @public
@classmethod
def incremental_append(
cls,
cursor_field: Optional[str] = None,
) -> "AirbyteSyncMode":
"""Syncs only new records from the source, appending rows to the destination.
May optionally specify the cursor field used to determine which records
are new.
https://docs.airbyte.com/understanding-airbyte/connections/incremental-append/
"""
cursor_field = check.opt_str_param(cursor_field, "cursor_field")
return cls(
{
"syncMode": "incremental",
"destinationSyncMode": "append",
**({"cursorField": [cursor_field]} if cursor_field else {}),
}
)
[docs] @public
@classmethod
def incremental_append_dedup(
cls,
cursor_field: Optional[str] = None,
primary_key: Optional[Union[str, List[str]]] = None,
) -> "AirbyteSyncMode":
"""Syncs new records from the source, appending to an append-only history
table in the destination. Also generates a deduplicated view mirroring the
source table. May optionally specify the cursor field used to determine
which records are new, and the primary key used to determine which records
are duplicates.
https://docs.airbyte.com/understanding-airbyte/connections/incremental-append-dedup/
"""
cursor_field = check.opt_str_param(cursor_field, "cursor_field")
if isinstance(primary_key, str):
primary_key = [primary_key]
primary_key = check.opt_list_param(primary_key, "primary_key", of_type=str)
return cls(
{
"syncMode": "incremental",
"destinationSyncMode": "append_dedup",
**({"cursorField": [cursor_field]} if cursor_field else {}),
**({"primaryKey": [[x] for x in primary_key]} if primary_key else {}),
}
)
[docs]class AirbyteSource:
"""Represents a user-defined Airbyte source.
Args:
name (str): The display name of the source.
source_type (str): The type of the source, from Airbyte's list
of sources https://airbytehq.github.io/category/sources/.
source_configuration (Mapping[str, Any]): The configuration for the
source, as defined by Airbyte's API.
"""
[docs] @public
def __init__(self, name: str, source_type: str, source_configuration: Mapping[str, Any]):
self.name = check.str_param(name, "name")
self.source_type = check.str_param(source_type, "source_type")
self.source_configuration = check.mapping_param(
source_configuration, "source_configuration", key_type=str
)
def must_be_recreated(self, other: "AirbyteSource") -> bool:
return self.name != other.name or self.source_type != other.source_type
class InitializedAirbyteSource:
"""User-defined Airbyte source bound to actual created Airbyte source."""
def __init__(self, source: AirbyteSource, source_id: str, source_definition_id: Optional[str]):
self.source = source
self.source_id = source_id
self.source_definition_id = source_definition_id
@classmethod
def from_api_json(cls, api_json: Mapping[str, Any]):
return cls(
source=AirbyteSource(
name=api_json["name"],
source_type=api_json["sourceName"],
source_configuration=api_json["connectionConfiguration"],
),
source_id=api_json["sourceId"],
source_definition_id=None,
)
[docs]class AirbyteDestination:
"""Represents a user-defined Airbyte destination.
Args:
name (str): The display name of the destination.
destination_type (str): The type of the destination, from Airbyte's list
of destinations https://airbytehq.github.io/category/destinations/.
destination_configuration (Mapping[str, Any]): The configuration for the
destination, as defined by Airbyte's API.
"""
[docs] @public
def __init__(
self, name: str, destination_type: str, destination_configuration: Mapping[str, Any]
):
self.name = check.str_param(name, "name")
self.destination_type = check.str_param(destination_type, "destination_type")
self.destination_configuration = check.mapping_param(
destination_configuration, "destination_configuration", key_type=str
)
def must_be_recreated(self, other: "AirbyteDestination") -> bool:
return self.name != other.name or self.destination_type != other.destination_type
class InitializedAirbyteDestination:
"""User-defined Airbyte destination bound to actual created Airbyte destination."""
def __init__(
self,
destination: AirbyteDestination,
destination_id: str,
destination_definition_id: Optional[str],
):
self.destination = destination
self.destination_id = destination_id
self.destination_definition_id = destination_definition_id
@classmethod
def from_api_json(cls, api_json: Mapping[str, Any]):
return cls(
destination=AirbyteDestination(
name=api_json["name"],
destination_type=api_json["destinationName"],
destination_configuration=api_json["connectionConfiguration"],
),
destination_id=api_json["destinationId"],
destination_definition_id=None,
)
class AirbyteDestinationNamespace(Enum):
"""Represents the sync mode for a given Airbyte stream."""
SAME_AS_SOURCE = "source"
DESTINATION_DEFAULT = "destination"
[docs]class AirbyteConnection:
"""A user-defined Airbyte connection, pairing an Airbyte source and destination and configuring
which streams to sync.
Args:
name (str): The display name of the connection.
source (AirbyteSource): The source to sync from.
destination (AirbyteDestination): The destination to sync to.
stream_config (Mapping[str, AirbyteSyncMode]): A mapping from stream name to
the sync mode for that stream, including any additional configuration
of primary key or cursor field.
normalize_data (Optional[bool]): Whether to normalize the data in the
destination.
destination_namespace (Optional[Union[AirbyteDestinationNamespace, str]]):
The namespace to sync to in the destination. If set to
AirbyteDestinationNamespace.SAME_AS_SOURCE, the namespace will be the
same as the source namespace. If set to
AirbyteDestinationNamespace.DESTINATION_DEFAULT, the namespace will be
the default namespace for the destination. If set to a string, the
namespace will be that string.
prefix (Optional[str]): A prefix to add to the table names in the destination.
Example:
.. code-block:: python
from dagster_airbyte.managed.generated.sources import FileSource
from dagster_airbyte.managed.generated.destinations import LocalJsonDestination
from dagster_airbyte import AirbyteConnection, AirbyteSyncMode
cereals_csv_source = FileSource(...)
local_json_destination = LocalJsonDestination(...)
cereals_connection = AirbyteConnection(
name="download-cereals",
source=cereals_csv_source,
destination=local_json_destination,
stream_config={"cereals": AirbyteSyncMode.full_refresh_overwrite()},
)
"""
[docs] @public
def __init__(
self,
name: str,
source: AirbyteSource,
destination: AirbyteDestination,
stream_config: Mapping[str, AirbyteSyncMode],
normalize_data: Optional[bool] = None,
destination_namespace: Optional[
Union[AirbyteDestinationNamespace, str]
] = AirbyteDestinationNamespace.SAME_AS_SOURCE,
prefix: Optional[str] = None,
):
self.name = check.str_param(name, "name")
self.source = check.inst_param(source, "source", AirbyteSource)
self.destination = check.inst_param(destination, "destination", AirbyteDestination)
self.stream_config = check.mapping_param(
stream_config, "stream_config", key_type=str, value_type=AirbyteSyncMode
)
self.normalize_data = check.opt_bool_param(normalize_data, "normalize_data")
self.destination_namespace = check.opt_inst_param(
destination_namespace, "destination_namespace", (str, AirbyteDestinationNamespace)
)
self.prefix = check.opt_str_param(prefix, "prefix")
def must_be_recreated(self, other: Optional["AirbyteConnection"]) -> bool:
return (
not other
or self.source.must_be_recreated(other.source)
or self.destination.must_be_recreated(other.destination)
)
class InitializedAirbyteConnection:
"""User-defined Airbyte connection bound to actual created Airbyte connection."""
def __init__(
self,
connection: AirbyteConnection,
connection_id: str,
):
self.connection = connection
self.connection_id = connection_id
@classmethod
def from_api_json(
cls,
api_dict: Mapping[str, Any],
init_sources: Mapping[str, InitializedAirbyteSource],
init_dests: Mapping[str, InitializedAirbyteDestination],
):
source = next(
(
source.source
for source in init_sources.values()
if source.source_id == api_dict["sourceId"]
),
None,
)
dest = next(
(
dest.destination
for dest in init_dests.values()
if dest.destination_id == api_dict["destinationId"]
),
None,
)
source = check.not_none(source, f"Could not find source with id {api_dict['sourceId']}")
dest = check.not_none(
dest, f"Could not find destination with id {api_dict['destinationId']}"
)
streams = {
stream["stream"]["name"]: AirbyteSyncMode.from_json(stream["config"])
for stream in api_dict["syncCatalog"]["streams"]
}
return cls(
AirbyteConnection(
name=api_dict["name"],
source=source,
destination=dest,
stream_config=streams,
normalize_data=len(api_dict["operationIds"]) > 0,
destination_namespace=(
api_dict["namespaceFormat"]
if api_dict["namespaceDefinition"] == "customformat"
else AirbyteDestinationNamespace(api_dict["namespaceDefinition"])
),
prefix=api_dict["prefix"] if api_dict.get("prefix") else None,
),
api_dict["connectionId"],
)
def _remove_none_values(obj: Dict[str, Any]) -> Dict[str, Any]:
return {k: v for k, v in obj.items() if v is not None}
def _dump_class(obj: Any) -> Dict[str, Any]:
return json.loads(json.dumps(obj, default=lambda o: _remove_none_values(o.__dict__)))
class GeneratedAirbyteSource(AirbyteSource):
"""Base class used by the codegen Airbyte sources. This class is not intended to be used directly.
Converts all of its attributes into a source configuration dict which is passed down to the base
AirbyteSource class.
"""
def __init__(self, source_type: str, name: str):
source_configuration = _dump_class(self)
super().__init__(
name=name, source_type=source_type, source_configuration=source_configuration
)
class GeneratedAirbyteDestination(AirbyteDestination):
"""Base class used by the codegen Airbyte destinations. This class is not intended to be used directly.
Converts all of its attributes into a destination configuration dict which is passed down to the
base AirbyteDestination class.
"""
def __init__(self, source_type: str, name: str):
destination_configuration = _dump_class(self)
super().__init__(
name=name,
destination_type=source_type,
destination_configuration=destination_configuration,
)