I'm looking into integrating a validation framework into an existing PySpark project. The official documentation has plenty of examples of how to configure Great Expectations using JSON/YAML files. However, in my case the table schemas are defined as Python classes, and I'm aiming to keep the validation definitions in those classes.
When playing around, I noticed this kind of pattern can be used to validate single expectations without any config files:
from pyspark.sql import Row, SparkSession

from great_expectations.core import ExpectationValidationResult
from great_expectations.dataset import SparkDFDataset

spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame([
    Row(x=1, y="foo"),
    Row(x=2, y=None),
])
ds = SparkDFDataset(df)
expectation: ExpectationValidationResult = ds.expect_column_values_to_not_be_null("y")
print(expectation.success)
where expectation.success is either True or False
. However, I'm aiming to build expectation suites and generate reports using programmatic configuration, but I can't find any references on how to do that. This is what I tried to hack together, but it leads to a runtime exception:
ds.append_expectation(ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={'column': 'y', 'result_format': 'BASIC'},
))
engine = SparkDFExecutionEngine(
    force_reuse_spark_context=True,
)
validator = Validator(
    execution_engine=engine,
    expectation_suite=ds.get_expectation_suite(),
)
res = validator.validate()
Any pointers on how to configure Great Expectations without config files (or with minimal config files) would be highly appreciated!
Batches are required for validating expectation suites. Here is a working example:
import uuid

from pyspark.sql import Row, SparkSession

from great_expectations.core import ExpectationConfiguration, ExpectationSuite
from great_expectations.core.batch import Batch, BatchDefinition
from great_expectations.core.id_dict import IDDict
from great_expectations.execution_engine import SparkDFExecutionEngine
from great_expectations.validator.validator import Validator

spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame([
    Row(x=1, y="foo"),
    Row(x=2, y=None),
])
# Reuse the Spark session created above instead of spawning a new context.
engine = SparkDFExecutionEngine(
    force_reuse_spark_context=True,
)
validator = Validator(
    execution_engine=engine,
    expectation_suite=ExpectationSuite(
        expectation_suite_name="my_suite",
        expectations=[
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_not_be_null",
                kwargs={"column": "y", "result_format": "BASIC"},
            ),
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_not_be_null",
                kwargs={"column": "x", "result_format": "BASIC"},
            ),
        ],
    ),
    batches=[
        # Wrap the DataFrame in a Batch; the names in the BatchDefinition are
        # arbitrary labels here since no datasource or data connector is configured.
        Batch(
            data=df,
            batch_definition=BatchDefinition(
                datasource_name="foo",
                data_connector_name="foo",
                data_asset_name="foo",
                batch_identifiers=IDDict(ge_batch_id=str(uuid.uuid1())),
            ),
        ),
    ],
)
res = validator.validate()
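Here res is an ExpectationSuiteValidationResult, so you can check the overall outcome and the per-expectation results directly. If you also want to generate an HTML report without setting up a Data Context, the built-in renderers can be used on the result object; below is a minimal sketch (the render module paths may differ slightly between Great Expectations versions):
# Inspect the validation outcome programmatically.
print(res.success)  # overall suite result
for result in res.results:
    print(result.expectation_config.expectation_type, result.success)

# Optionally render a standalone HTML report from the validation result.
from great_expectations.render.renderer import ValidationResultsPageRenderer
from great_expectations.render.view import DefaultJinjaPageView

document_model = ValidationResultsPageRenderer().render(res)
with open("validation_report.html", "w") as f:
    f.write(DefaultJinjaPageView().render(document_model))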