pyspark, great-expectations

Programmatic configuration for Great Expectations


I'm looking into integrating a validation framework into an existing PySpark project. The official documentation has plenty of examples of how to configure Great Expectations using JSON/YAML files. In my case, however, the table schemas are defined as Python classes, and I'm aiming to keep the validation definitions in those classes.
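
For context, the shape I'm after is roughly this (the class and attribute names below are purely illustrative, not an existing API):

from great_expectations.core import ExpectationConfiguration

# Hypothetical schema class: column definitions and their expectations live
# together as plain Python, with no JSON/YAML configuration files.
class EventsTable:
    columns = {"x": "int", "y": "string"}
    expectations = [
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_not_be_null",
            kwargs={"column": "y"},
        ),
    ]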

When playing around, I noticed this kind of pattern can be used to validate single expectations without any config files:

from pyspark.sql import Row, SparkSession

from great_expectations.core.expectation_validation_result import ExpectationValidationResult
from great_expectations.dataset import SparkDFDataset

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.createDataFrame([
    Row(x=1, y="foo"),
    Row(x=2, y=None),
])
ds = SparkDFDataset(df)

# Run a single expectation directly against the wrapped DataFrame
expectation: ExpectationValidationResult = ds.expect_column_values_to_not_be_null("y")
print(expectation.success)

where expectation.success is either True or False. However, I'm aiming to build expectation suites and generate reports using programmatic configuration, but I can't find any references on how to do that. This is what I tried to hack together, but it leads to a runtime exception:

from great_expectations.core import ExpectationConfiguration
from great_expectations.execution_engine import SparkDFExecutionEngine
from great_expectations.validator.validator import Validator

ds.append_expectation(ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={'column': 'y', 'result_format': 'BASIC'},
))
engine = SparkDFExecutionEngine(
    force_reuse_spark_context=True,
)
validator = Validator(
    execution_engine=engine,
    expectation_suite=ds.get_expectation_suite(),
)
res = validator.validate()  # raises a runtime exception

Any pointers on how to configure Great Expectations without config files (or with minimal config files) are highly appreciated!


Solution

  • The Validator needs at least one Batch to validate an expectation suite against. Here is a working example:

    import uuid

    from pyspark.sql import Row, SparkSession

    from great_expectations.core import ExpectationConfiguration, ExpectationSuite
    from great_expectations.core.batch import Batch, BatchDefinition
    from great_expectations.core.id_dict import IDDict
    from great_expectations.execution_engine import SparkDFExecutionEngine
    from great_expectations.validator.validator import Validator

    spark = SparkSession.builder.master("local[*]").getOrCreate()

    df = spark.createDataFrame([
        Row(x=1, y="foo"),
        Row(x=2, y=None),
    ])

    engine = SparkDFExecutionEngine(
        force_reuse_spark_context=True,
    )
    validator = Validator(
        execution_engine=engine,
        # The suite is built entirely in code; no JSON/YAML files involved
        expectation_suite=ExpectationSuite(
            expectation_suite_name="my_suite",
            expectations=[
                ExpectationConfiguration(
                    expectation_type="expect_column_values_to_not_be_null",
                    kwargs={"column": "y", "result_format": "BASIC"},
                ),
                ExpectationConfiguration(
                    expectation_type="expect_column_values_to_not_be_null",
                    kwargs={"column": "x", "result_format": "BASIC"},
                ),
            ],
        ),
        batches=[
            Batch(
                data=df,
                # Without a Data Context, the batch metadata is arbitrary;
                # it only needs to identify this batch uniquely
                batch_definition=BatchDefinition(
                    datasource_name="foo",
                    data_connector_name="foo",
                    data_asset_name="foo",
                    batch_identifiers=IDDict(ge_batch_id=str(uuid.uuid1())),
                ),
            ),
        ],
    )
    res = validator.validate()
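
  • Since the question also mentions generating reports: the validation result returned by validator.validate() can be rendered to standalone HTML without a Data Context. This is a minimal sketch, assuming a Great Expectations version from the same era as the code above, where ValidationResultsPageRenderer and DefaultJinjaPageView are importable from these paths:

    from great_expectations.render.renderer import ValidationResultsPageRenderer
    from great_expectations.render.view import DefaultJinjaPageView

    # Turn the validation result into a document model, then into HTML
    document_model = ValidationResultsPageRenderer().render(res)
    html = DefaultJinjaPageView().render(document_model)

    with open("validation_report.html", "w") as f:
        f.write(html)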