Source code for dagster_ge.factory

import datetime
from typing import Any, Dict

import great_expectations as ge
from dagster import (
    ConfigurableResource,
    ExpectationResult,
    IAttachDifferentObjectToOpContext,
    In,
    MetadataValue,
    OpExecutionContext,
    Out,
    Output,
    _check as check,
    op,
    resource,
)
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster_pandas import DataFrame
from great_expectations.render.renderer import ValidationResultsPageRenderer
from great_expectations.render.view import DefaultMarkdownPageView
from pydantic import Field

try:
    # ge < v0.13.0
    from great_expectations.core import convert_to_json_serializable
except ImportError:
    # ge >= v0.13.0
    from great_expectations.core.util import convert_to_json_serializable


class GEContextResource(ConfigurableResource, IAttachDifferentObjectToOpContext):
    ge_root_dir: str = Field(
        default=None,
        description="The root directory for your Great Expectations project.",
    )

    def get_data_context(self):
        if self.ge_root_dir is None:
            return ge.data_context.DataContext()
        return ge.data_context.DataContext(context_root_dir=self.ge_root_dir)

    def get_object_to_set_on_execution_context(self):
        return self.get_data_context()


@dagster_maintained_resource
@resource(config_schema=GEContextResource.to_config_schema())
def ge_data_context(context):
    return GEContextResource.from_resource_context(context).get_data_context()
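
As an illustrative aside (not part of the module source): because GEContextResource implements IAttachDifferentObjectToOpContext, it can be supplied to a Definitions object under the "ge_data_context" key required by the factory-built ops below, and those ops will see the underlying great_expectations DataContext. The legacy ge_data_context resource above can equivalently be .configured() with the same ge_root_dir field, as shown in the job sketches further down. The "./great_expectations" path is a placeholder.

from dagster import Definitions

defs = Definitions(
    # Jobs or assets that use ops built by the factories below would be listed here.
    resources={
        "ge_data_context": GEContextResource(ge_root_dir="./great_expectations"),
    },
)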


def ge_validation_op_factory(
    name,
    datasource_name,
    suite_name,
    validation_operator_name=None,
    input_dagster_type=DataFrame,
    batch_kwargs=None,
):
    """Generates ops for interacting with GE.

    Args:
        name (str): the name of the op
        datasource_name (str): the name of your DataSource, see your great_expectations.yml
        suite_name (str): the name of your expectation suite, see your great_expectations.yml
        validation_operator_name (Optional[str]): what validation operator to run -- defaults to
            None, which generates an ephemeral validator. If you want to save data docs, use
            'action_list_operator'.
            See https://legacy.docs.greatexpectations.io/en/0.12.1/reference/core_concepts/validation_operators_and_actions.html#
        input_dagster_type (DagsterType): the Dagster type used to type check the input to the op.
            Defaults to `dagster_pandas.DataFrame`.
        batch_kwargs (Optional[dict]): overrides the `batch_kwargs` parameter when calling the
            `ge_data_context`'s `get_batch` method. Defaults to `{"dataset": dataset}`, where
            `dataset` is the input to the generated op.

    Returns:
        An op that takes in a set of data and yields both an expectation with relevant metadata
        and an output with all the metadata (for user processing)
    """
    check.str_param(datasource_name, "datasource_name")
    check.str_param(suite_name, "suite_name")
    check.opt_str_param(validation_operator_name, "validation_operator_name")
    batch_kwargs = check.opt_dict_param(batch_kwargs, "batch_kwargs")

    @op(
        name=name,
        ins={"dataset": In(input_dagster_type)},
        out=Out(
            dict,
            description="""
        This op yields an ExpectationResult with a structured dict of metadata from
        the GE suite, as well as the full result in case a user wants to process it differently.
        The structured dict contains both summary stats from the suite as well as expectation by
        expectation results/details.
        """,
        ),
        required_resource_keys={"ge_data_context"},
        tags={"kind": "ge"},
    )
    def _ge_validation_fn(context: OpExecutionContext, dataset):
        data_context = context.resources.ge_data_context

        # Fall back to an ephemeral validation operator when none is specified.
        if validation_operator_name is not None:
            validation_operator = validation_operator_name
        else:
            data_context.add_validation_operator(
                "ephemeral_validation",
                {"class_name": "ActionListValidationOperator", "action_list": []},
            )
            validation_operator = "ephemeral_validation"

        suite = data_context.get_expectation_suite(suite_name)
        final_batch_kwargs = batch_kwargs or {"dataset": dataset}

        # The datasource always comes from the factory argument, never from batch_kwargs.
        if "datasource" in final_batch_kwargs:
            context.log.warning(
                "`datasource` field of `batch_kwargs` will be ignored; use the `datasource_name` "
                "parameter of the op factory instead."
            )
        final_batch_kwargs["datasource"] = datasource_name

        batch = data_context.get_batch(final_batch_kwargs, suite)
        run_id = {
            "run_name": datasource_name + " run",
            "run_time": datetime.datetime.utcnow(),
        }
        results = data_context.run_validation_operator(
            validation_operator, assets_to_validate=[batch], run_id=run_id
        )
        res = convert_to_json_serializable(results.list_validation_results())[0]

        # Render the validation results as Markdown for the ExpectationResult metadata.
        validation_results_page_renderer = ValidationResultsPageRenderer(run_info_at_end=True)
        rendered_document_content_list = (
            validation_results_page_renderer.render_validation_operator_result(results)
        )
        md_str = " ".join(DefaultMarkdownPageView().render(rendered_document_content_list))

        yield ExpectationResult(
            success=res["success"],
            metadata={"Expectation Results": MetadataValue.md(md_str)},
        )
        yield Output(res)

    return _ge_validation_fn
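
For orientation, here is a minimal sketch (not part of this module) of wiring an op produced by ge_validation_op_factory into a job. It assumes a local Great Expectations project at ./great_expectations with a Pandas datasource named "getest" and an expectation suite named "basic.warning"; the datasource, suite, and CSV path are all hypothetical.

import pandas as pd

from dagster import job, op

payroll_expectations = ge_validation_op_factory(
    name="ge_validation_op", datasource_name="getest", suite_name="basic.warning"
)


@op
def load_payroll_df() -> pd.DataFrame:
    # Hypothetical input data for the validation op.
    return pd.read_csv("./data/payroll.csv")


@job(
    resource_defs={
        "ge_data_context": ge_data_context.configured(
            {"ge_root_dir": "./great_expectations"}
        )
    }
)
def payroll_validation_job():
    payroll_expectations(load_payroll_df())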


def ge_validation_op_factory_v3(
    name,
    datasource_name,
    data_connector_name,
    data_asset_name,
    suite_name,
    batch_identifiers: dict,
    input_dagster_type=DataFrame,
    runtime_method_type="batch_data",
    extra_kwargs=None,
):
    """Generates ops for interacting with GE (v3 API).

    Args:
        name (str): the name of the op
        datasource_name (str): the name of your DataSource, see your great_expectations.yml
        data_connector_name (str): the name of the data connector for this datasource. This should
            point to a RuntimeDataConnector. For information on how to set this up, see:
            https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/how_to_create_a_batch_of_data_from_an_in_memory_spark_or_pandas_dataframe
        data_asset_name (str): the name of the data asset that this op will be validating.
        suite_name (str): the name of your expectation suite, see your great_expectations.yml
        batch_identifiers (dict): a dictionary of batch identifiers to uniquely identify this
            batch of data. To learn more about batch identifiers, see:
            https://docs.greatexpectations.io/docs/reference/datasources#batches.
        input_dagster_type (DagsterType): the Dagster type used to type check the input to the op.
            Defaults to `dagster_pandas.DataFrame`.
        runtime_method_type (str): how GE should interpret the op input. One of ("batch_data",
            "path", "query"). Defaults to "batch_data", which will interpret the input as an
            in-memory object.
        extra_kwargs (Optional[dict]): adds extra kwargs to the invocation of `ge_data_context`'s
            `get_validator` method. If not set, input will be:

            {
                "datasource_name": datasource_name,
                "data_connector_name": data_connector_name,
                "data_asset_name": data_asset_name,
                "runtime_parameters": {
                    "<runtime_method_type>": <op input>
                },
                "batch_identifiers": batch_identifiers,
                "expectation_suite_name": suite_name,
            }

    Returns:
        An op that takes in a set of data and yields both an expectation with relevant metadata
        and an output with all the metadata (for user processing)
    """
    check.str_param(datasource_name, "datasource_name")
    check.str_param(data_connector_name, "data_connector_name")
    check.str_param(suite_name, "suite_name")
    _extra_kwargs: Dict[Any, Any] = check.opt_dict_param(extra_kwargs, "extra_kwargs")

    @op(
        name=name,
        ins={"dataset": In(input_dagster_type)},
        out=Out(
            dict,
            description="""
        This op yields an ExpectationResult with a structured dict of metadata from
        the GE suite, as well as the full result in case a user wants to process it differently.
        The structured dict contains both summary stats from the suite as well as expectation by
        expectation results/details.
        """,
        ),
        required_resource_keys={"ge_data_context"},
        tags={"kind": "ge"},
    )
    def _ge_validation_fn(context: OpExecutionContext, dataset):
        data_context = context.resources.ge_data_context

        # Build the kwargs for DataContext.get_validator; any entry can be overridden
        # by extra_kwargs.
        validator_kwargs = {
            "datasource_name": datasource_name,
            "data_connector_name": data_connector_name,
            "data_asset_name": data_asset_name,
            "runtime_parameters": {runtime_method_type: dataset},
            "batch_identifiers": batch_identifiers,
            "expectation_suite_name": suite_name,
            **_extra_kwargs,
        }
        validator = data_context.get_validator(**validator_kwargs)

        run_id = {
            "run_name": datasource_name + " run",
            "run_time": datetime.datetime.utcnow(),
        }
        results = validator.validate(run_id=run_id)

        # Render the validation results as Markdown for the ExpectationResult metadata.
        validation_results_page_renderer = ValidationResultsPageRenderer(run_info_at_end=True)
        rendered_document_content_list = validation_results_page_renderer.render(
            validation_results=results
        )
        md_str = "".join(DefaultMarkdownPageView().render(rendered_document_content_list))

        yield ExpectationResult(
            success=bool(results["success"]),
            metadata={"Expectation Results": MetadataValue.md(md_str)},
        )
        yield Output(results.to_json_dict())

    return _ge_validation_fn