This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.
import datetime
from typing import Any, Dict
import great_expectations as ge
from dagster import (
ConfigurableResource,
ExpectationResult,
IAttachDifferentObjectToOpContext,
In,
MetadataValue,
OpExecutionContext,
Out,
Output,
_check as check,
op,
resource,
)
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster_pandas import DataFrame
from great_expectations.render.renderer import ValidationResultsPageRenderer
from great_expectations.render.view import DefaultMarkdownPageView
from pydantic import Field
try:
# ge < v0.13.0
from great_expectations.core import convert_to_json_serializable
except ImportError:
# ge >= v0.13.0
from great_expectations.core.util import convert_to_json_serializable
class GEContextResource(ConfigurableResource, IAttachDifferentObjectToOpContext):
ge_root_dir: str = Field(
default=None,
description="The root directory for your Great Expectations project.",
)
def get_data_context(self):
if self.ge_root_dir is None:
return ge.data_context.DataContext()
return ge.data_context.DataContext(context_root_dir=self.ge_root_dir)
def get_object_to_set_on_execution_context(self):
return self.get_data_context()
@dagster_maintained_resource
@resource(config_schema=GEContextResource.to_config_schema())
def ge_data_context(context):
return GEContextResource.from_resource_context(context).get_data_context()
[docs]def ge_validation_op_factory(
name,
datasource_name,
suite_name,
validation_operator_name=None,
input_dagster_type=DataFrame,
batch_kwargs=None,
):
"""Generates ops for interacting with GE.
Args:
name (str): the name of the op
datasource_name (str): the name of your DataSource, see your great_expectations.yml
suite_name (str): the name of your expectation suite, see your great_expectations.yml
validation_operator_name (Optional[str]): what validation operator to run -- defaults to
None, which generates an ephemeral validator. If you want to save data docs, use
'action_list_operator'.
See https://legacy.docs.greatexpectations.io/en/0.12.1/reference/core_concepts/validation_operators_and_actions.html#
input_dagster_type (DagsterType): the Dagster type used to type check the input to the op.
Defaults to `dagster_pandas.DataFrame`.
batch_kwargs (Optional[dict]): overrides the `batch_kwargs` parameter when calling the
`ge_data_context`'s `get_batch` method. Defaults to `{"dataset": dataset}`, where
`dataset` is the input to the generated op.
Returns:
An op that takes in a set of data and yields both an expectation with relevant metadata
and an output with all the metadata (for user processing)
"""
check.str_param(datasource_name, "datasource_name")
check.str_param(suite_name, "suite_name")
check.opt_str_param(validation_operator_name, "validation_operator_name")
batch_kwargs = check.opt_dict_param(batch_kwargs, "batch_kwargs")
@op(
name=name,
ins={"dataset": In(input_dagster_type)},
out=Out(
dict,
description="""
This op yields an expectationResult with a structured dict of metadata from
the GE suite, as well as the full result in case a user wants to process it differently.
The structured dict contains both summary stats from the suite as well as expectation by
expectation results/details.
""",
),
required_resource_keys={"ge_data_context"},
tags={"kind": "ge"},
)
def _ge_validation_fn(context: OpExecutionContext, dataset):
data_context = context.resources.ge_data_context
if validation_operator_name is not None:
validation_operator = validation_operator_name
else:
data_context.add_validation_operator(
"ephemeral_validation",
{"class_name": "ActionListValidationOperator", "action_list": []},
)
validation_operator = "ephemeral_validation"
suite = data_context.get_expectation_suite(suite_name)
final_batch_kwargs = batch_kwargs or {"dataset": dataset}
if "datasource" in final_batch_kwargs:
context.log.warning(
"`datasource` field of `batch_kwargs` will be ignored; use the `datasource_name` "
"parameter of the op factory instead."
)
final_batch_kwargs["datasource"] = datasource_name
batch = data_context.get_batch(final_batch_kwargs, suite)
run_id = {
"run_name": datasource_name + " run",
"run_time": datetime.datetime.utcnow(),
}
results = data_context.run_validation_operator(
validation_operator, assets_to_validate=[batch], run_id=run_id
)
res = convert_to_json_serializable(results.list_validation_results())[0]
validation_results_page_renderer = ValidationResultsPageRenderer(run_info_at_end=True)
rendered_document_content_list = (
validation_results_page_renderer.render_validation_operator_result(results)
)
md_str = " ".join(DefaultMarkdownPageView().render(rendered_document_content_list))
yield ExpectationResult(
success=res["success"],
metadata={"Expectation Results": MetadataValue.md(md_str)},
)
yield Output(res)
return _ge_validation_fn
def ge_validation_op_factory_v3(
name,
datasource_name,
data_connector_name,
data_asset_name,
suite_name,
batch_identifiers: dict,
input_dagster_type=DataFrame,
runtime_method_type="batch_data",
extra_kwargs=None,
):
"""Generates ops for interacting with GE (v3 API).
Args:
name (str): the name of the op
datasource_name (str): the name of your DataSource, see your great_expectations.yml
data_connector_name (str): the name of the data connector for this datasource. This should
point to a RuntimeDataConnector. For information on how to set this up, see:
https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/how_to_create_a_batch_of_data_from_an_in_memory_spark_or_pandas_dataframe
data_asset_name (str): the name of the data asset that this op will be validating.
suite_name (str): the name of your expectation suite, see your great_expectations.yml
batch_identifier_fn (dict): A dicitonary of batch identifiers to uniquely identify this
batch of data. To learn more about batch identifiers, see:
https://docs.greatexpectations.io/docs/reference/datasources#batches.
input_dagster_type (DagsterType): the Dagster type used to type check the input to the op.
Defaults to `dagster_pandas.DataFrame`.
runtime_method_type (str): how GE should interperet the op input. One of ("batch_data",
"path", "query"). Defaults to "batch_data", which will interperet the input as an
in-memory object.
extra_kwargs (Optional[dict]): adds extra kwargs to the invocation of `ge_data_context`'s
`get_validator` method. If not set, input will be:
{
"datasource_name": datasource_name,
"data_connector_name": data_connector_name,
"data_asset_name": data_asset_name,
"runtime_parameters": {
"<runtime_method_type>": <op input>
},
"batch_identifiers": batch_identifiers,
"expectation_suite_name": suite_name,
}
Returns:
An op that takes in a set of data and yields both an expectation with relevant metadata and
an output with all the metadata (for user processing)
"""
check.str_param(datasource_name, "datasource_name")
check.str_param(data_connector_name, "data_connector_name")
check.str_param(suite_name, "suite_name")
_extra_kwargs: Dict[Any, Any] = check.opt_dict_param(extra_kwargs, "extra_kwargs")
@op(
name=name,
ins={"dataset": In(input_dagster_type)},
out=Out(
dict,
description="""
This op yields an ExpectationResult with a structured dict of metadata from
the GE suite, as well as the full result in case a user wants to process it differently.
The structured dict contains both summary stats from the suite as well as expectation by
expectation results/details.
""",
),
required_resource_keys={"ge_data_context"},
tags={"kind": "ge"},
)
def _ge_validation_fn(context: OpExecutionContext, dataset):
data_context = context.resources.ge_data_context
validator_kwargs = {
"datasource_name": datasource_name,
"data_connector_name": data_connector_name,
"data_asset_name": datasource_name or data_asset_name,
"runtime_parameters": {runtime_method_type: dataset},
"batch_identifiers": batch_identifiers,
"expectation_suite_name": suite_name,
**_extra_kwargs,
}
validator = data_context.get_validator(**validator_kwargs)
run_id = {
"run_name": datasource_name + " run",
"run_time": datetime.datetime.utcnow(),
}
results = validator.validate(run_id=run_id)
validation_results_page_renderer = ValidationResultsPageRenderer(run_info_at_end=True)
rendered_document_content_list = validation_results_page_renderer.render(
validation_results=results
)
md_str = "".join(DefaultMarkdownPageView().render(rendered_document_content_list))
yield ExpectationResult(
success=bool(results["success"]),
metadata={"Expectation Results": MetadataValue.md(md_str)},
)
yield Output(results.to_json_dict())
return _ge_validation_fn