This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.
from typing import AbstractSet, Iterable, NamedTuple, Optional, Sequence
from typing_extensions import Self
import dagster._check as check
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.repository_definition import SINGLETON_REPOSITORY_NAME
from dagster._serdes import create_snapshot_id, whitelist_for_serdes
class JobSubsetSelector(
NamedTuple(
"_JobSubsetSelector",
[
("location_name", str),
("repository_name", str),
("job_name", str),
("op_selection", Optional[Sequence[str]]),
("asset_selection", Optional[AbstractSet[AssetKey]]),
],
)
):
"""The information needed to resolve a job within a host process."""
def __new__(
cls,
location_name: str,
repository_name: str,
job_name: str,
op_selection: Optional[Sequence[str]],
asset_selection: Optional[Iterable[AssetKey]] = None,
):
asset_selection = set(asset_selection) if asset_selection else None
return super(JobSubsetSelector, cls).__new__(
cls,
location_name=check.str_param(location_name, "location_name"),
repository_name=check.str_param(repository_name, "repository_name"),
job_name=check.str_param(job_name, "job_name"),
op_selection=check.opt_nullable_sequence_param(op_selection, "op_selection", str),
asset_selection=check.opt_nullable_set_param(
asset_selection, "asset_selection", AssetKey
),
)
def to_graphql_input(self):
return {
"repositoryLocationName": self.location_name,
"repositoryName": self.repository_name,
"pipelineName": self.job_name,
"solidSelection": self.op_selection,
}
def with_op_selection(self, op_selection: Optional[Sequence[str]]) -> Self:
check.invariant(
self.op_selection is None,
f"Can not invoke with_op_selection when op_selection={self.op_selection} is"
" already set",
)
return JobSubsetSelector(
self.location_name, self.repository_name, self.job_name, op_selection
)
[docs]@whitelist_for_serdes
class JobSelector(
NamedTuple(
"_JobSelector", [("location_name", str), ("repository_name", str), ("job_name", str)]
)
):
def __new__(
cls,
location_name: str,
repository_name: Optional[str] = None,
job_name: Optional[str] = None,
):
return super(JobSelector, cls).__new__(
cls,
location_name=check.str_param(location_name, "location_name"),
repository_name=check.opt_str_param(
repository_name,
"repository_name",
default=SINGLETON_REPOSITORY_NAME,
),
job_name=check.str_param(
job_name,
"job_name",
"Must provide job_name argument even though it is marked as optional in the "
"function signature. repository_name, a truly optional parameter, is before "
"that argument and actually optional. Use of keyword arguments is "
"recommended to avoid confusion.",
),
)
def to_graphql_input(self):
return {
"repositoryLocationName": self.location_name,
"repositoryName": self.repository_name,
"jobName": self.job_name,
}
@property
def selector_id(self):
return create_snapshot_id(self)
@staticmethod
def from_graphql_input(graphql_data):
return JobSelector(
location_name=graphql_data["repositoryLocationName"],
repository_name=graphql_data["repositoryName"],
job_name=graphql_data["jobName"],
)
[docs]@whitelist_for_serdes
class RepositorySelector(
NamedTuple("_RepositorySelector", [("location_name", str), ("repository_name", str)])
):
def __new__(cls, location_name: str, repository_name: str):
return super(RepositorySelector, cls).__new__(
cls,
location_name=check.str_param(location_name, "location_name"),
repository_name=check.str_param(repository_name, "repository_name"),
)
def to_graphql_input(self):
return {
"repositoryLocationName": self.location_name,
"repositoryName": self.repository_name,
}
@property
def selector_id(self):
return create_snapshot_id(self)
@staticmethod
def from_graphql_input(graphql_data):
return RepositorySelector(
location_name=graphql_data["repositoryLocationName"],
repository_name=graphql_data["repositoryName"],
)
class CodeLocationSelector(NamedTuple("_CodeLocationSelector", [("location_name", str)])):
def __new__(cls, location_name: str):
return super(CodeLocationSelector, cls).__new__(
cls,
location_name=check.str_param(location_name, "location_name"),
)
def to_repository_selector(self) -> RepositorySelector:
return RepositorySelector(
location_name=self.location_name, repository_name=SINGLETON_REPOSITORY_NAME
)
class ScheduleSelector(
NamedTuple(
"_ScheduleSelector",
[("location_name", str), ("repository_name", str), ("schedule_name", str)],
)
):
def __new__(cls, location_name: str, repository_name: str, schedule_name: str):
return super(ScheduleSelector, cls).__new__(
cls,
location_name=check.str_param(location_name, "location_name"),
repository_name=check.str_param(repository_name, "repository_name"),
schedule_name=check.str_param(schedule_name, "schedule_name"),
)
def to_graphql_input(self):
return {
"repositoryLocationName": self.location_name,
"repositoryName": self.repository_name,
"scheduleName": self.schedule_name,
}
@staticmethod
def from_graphql_input(graphql_data):
return ScheduleSelector(
location_name=graphql_data["repositoryLocationName"],
repository_name=graphql_data["repositoryName"],
schedule_name=graphql_data["scheduleName"],
)
class ResourceSelector(NamedTuple):
location_name: str
repository_name: str
resource_name: str
def to_graphql_input(self):
return {
"repositoryLocationName": self.location_name,
"repositoryName": self.repository_name,
"resourceName": self.resource_name,
}
@staticmethod
def from_graphql_input(graphql_data):
return ResourceSelector(
location_name=graphql_data["repositoryLocationName"],
repository_name=graphql_data["repositoryName"],
resource_name=graphql_data["resourceName"],
)
class SensorSelector(
NamedTuple(
"_SensorSelector", [("location_name", str), ("repository_name", str), ("sensor_name", str)]
)
):
def __new__(cls, location_name: str, repository_name: str, sensor_name: str):
return super(SensorSelector, cls).__new__(
cls,
location_name=check.str_param(location_name, "location_name"),
repository_name=check.str_param(repository_name, "repository_name"),
sensor_name=check.str_param(sensor_name, "sensor_name"),
)
def to_graphql_input(self):
return {
"repositoryLocationName": self.location_name,
"repositoryName": self.repository_name,
"sensorName": self.sensor_name,
}
@staticmethod
def from_graphql_input(graphql_data):
return SensorSelector(
location_name=graphql_data["repositoryLocationName"],
repository_name=graphql_data["repositoryName"],
sensor_name=graphql_data["sensorName"],
)
@whitelist_for_serdes
class InstigatorSelector(
NamedTuple(
"_InstigatorSelector", [("location_name", str), ("repository_name", str), ("name", str)]
)
):
def __new__(cls, location_name: str, repository_name: str, name: str):
return super(InstigatorSelector, cls).__new__(
cls,
location_name=check.str_param(location_name, "location_name"),
repository_name=check.str_param(repository_name, "repository_name"),
name=check.str_param(name, "name"),
)
def to_graphql_input(self):
return {
"repositoryLocationName": self.location_name,
"repositoryName": self.repository_name,
"name": self.name,
}
@staticmethod
def from_graphql_input(graphql_data):
return InstigatorSelector(
location_name=graphql_data["repositoryLocationName"],
repository_name=graphql_data["repositoryName"],
name=graphql_data["name"],
)
class GraphSelector(
NamedTuple(
"_GraphSelector", [("location_name", str), ("repository_name", str), ("graph_name", str)]
)
):
"""The information needed to resolve a graph within a host process."""
def __new__(cls, location_name: str, repository_name: str, graph_name: str):
return super(GraphSelector, cls).__new__(
cls,
location_name=check.str_param(location_name, "location_name"),
repository_name=check.str_param(repository_name, "repository_name"),
graph_name=check.str_param(graph_name, "graph_name"),
)
def to_graphql_input(self):
return {
"repositoryLocationName": self.location_name,
"repositoryName": self.repository_name,
"graphName": self.graph_name,
}
@whitelist_for_serdes
class PartitionSetSelector(
NamedTuple(
"_PartitionSetSelector",
[("location_name", str), ("repository_name", str), ("partition_set_name", str)],
)
):
"""The information needed to resolve a partition set within a host process."""
def __new__(cls, location_name: str, repository_name: str, partition_set_name: str):
return super(PartitionSetSelector, cls).__new__(
cls,
location_name=check.str_param(location_name, "location_name"),
repository_name=check.str_param(repository_name, "repository_name"),
partition_set_name=check.str_param(partition_set_name, "partition_set_name"),
)
def to_graphql_input(self):
return {
"repositoryLocationName": self.location_name,
"repositoryName": self.repository_name,
"partitionSetName": self.partition_set_name,
}
class PartitionRangeSelector(
NamedTuple(
"_PartitionRangeSelector",
[("start", str), ("end", str)],
)
):
"""The information needed to resolve a partition range."""
def __new__(cls, start: str, end: str):
return super(PartitionRangeSelector, cls).__new__(
cls,
start=check.inst_param(start, "start", str),
end=check.inst_param(end, "end", str),
)
def to_graphql_input(self):
return {
"start": self.start,
"end": self.end,
}
@staticmethod
def from_graphql_input(graphql_data):
return PartitionRangeSelector(
start=graphql_data["start"],
end=graphql_data["end"],
)
class PartitionsSelector(
NamedTuple(
"_PartitionsSelector",
[("partition_range", PartitionRangeSelector)],
)
):
"""The information needed to define selection partitions.
Using partition_range as property name to avoid shadowing Python 'range' builtin .
"""
def __new__(cls, partition_range: PartitionRangeSelector):
return super(PartitionsSelector, cls).__new__(
cls,
partition_range=check.inst_param(partition_range, "range", PartitionRangeSelector),
)
def to_graphql_input(self):
return {
"range": self.partition_range.to_graphql_input(),
}
@staticmethod
def from_graphql_input(graphql_data):
return PartitionsSelector(
partition_range=PartitionRangeSelector.from_graphql_input(graphql_data["range"])
)
class PartitionsByAssetSelector(
NamedTuple(
"PartitionsByAssetSelector",
[
("asset_key", AssetKey),
("partitions", Optional[PartitionsSelector]),
],
)
):
"""The information needed to define partitions selection for a given asset key."""
def __new__(cls, asset_key: AssetKey, partitions: Optional[PartitionsSelector] = None):
return super(PartitionsByAssetSelector, cls).__new__(
cls,
asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
partitions=check.opt_inst_param(partitions, "partitions", PartitionsSelector),
)
def to_graphql_input(self):
return {
"assetKey": self.asset_key.to_graphql_input(),
"partitions": self.partitions.to_graphql_input() if self.partitions else None,
}
@staticmethod
def from_graphql_input(graphql_data):
asset_key = graphql_data["assetKey"]
partitions = graphql_data.get("partitions")
return PartitionsByAssetSelector(
asset_key=AssetKey.from_graphql_input(asset_key),
partitions=PartitionsSelector.from_graphql_input(partitions) if partitions else None,
)