I'm trying to add validation to a pipeline using Great Expectations on AWS Glue 3.0.
Here's my initial attempt to create the data context at runtime, based on their docs:
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig, DatasourceConfig

def create_context():
    logger.info("Create DataContext Config.")
    data_context_config = DataContextConfig(
        config_version=2,
        plugins_directory=None,
        config_variables_file_path=None,
        # concurrency={"enabled": "true"},
        datasources={
            "my_spark_datasource": DatasourceConfig(
                class_name="Datasource",
                execution_engine={
                    "class_name": "SparkDFExecutionEngine",
                    "module_name": "great_expectations.execution_engine",
                },
                data_connectors={
                    "my_spark_dataconnector": {
                        "module_name": "great_expectations.datasource.data_connector",
                        "class_name": "RuntimeDataConnector",
                        "batch_identifiers": [""],
                    }
                },
            )
        },
        stores={
            "expectations_S3_store": {
                "class_name": "ExpectationsStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "expectations/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
            "validations_S3_store": {
                "class_name": "ValidationsStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "validations/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
            "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
            "checkpoint_S3_store": {
                "class_name": "CheckpointStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "suppress_store_backend_id": "true",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "checkpoints/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
        },
        expectations_store_name="expectations_S3_store",
        validations_store_name="validations_S3_store",
        evaluation_parameter_store_name="evaluation_parameter_store",
        checkpoint_store_name="checkpoint_S3_store",
        data_docs_sites={
            "s3_site": {
                "class_name": "SiteBuilder",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "data_docs/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
                "site_index_builder": {
                    "class_name": "DefaultSiteIndexBuilder",
                    "show_cta_footer": True,
                },
            }
        },
        anonymous_usage_statistics={"enabled": True},
    )
    # Pass the DataContextConfig as a project_config to BaseDataContext
    context = BaseDataContext(project_config=data_context_config)
    logger.info("Create Checkpoint Config.")
    checkpoint_config = {
        "name": "my_checkpoint",
        "config_version": 1,
        "class_name": "Checkpoint",
        "run_name_template": "ingest_date=%YYYY-%MM-%DD",
        "expectation_suite_name": data_profile_expectation_suite_name,
        "runtime_configuration": {
            "result_format": {
                "result_format": "COMPLETE",
                "include_unexpected_rows": True,
            }
        },
        "evaluation_parameters": {},
    }
    context.add_checkpoint(**checkpoint_config)
    # logger.info(f'GE Data Context Config: "{data_context_config}"')
    return context
Using this, I get an error saying I'm attempting to run operations on a stopped Spark context.
Is there a better way to use the Spark datasource in Glue 3.0? I want to stay on Glue 3.0 as much as possible to avoid having to maintain two versions of Glue jobs.
You can fix this by setting force_reuse_spark_context to True; here is a quick example (YAML):
config_version: 3.0
datasources:
  my_spark_datasource:
    class_name: Datasource
    module_name: great_expectations.datasource
    data_connectors:
      my_spark_dataconnector:
        class_name: RuntimeDataConnector
        module_name: great_expectations.datasource.data_connector
        batch_identifiers: {}
    execution_engine:
      class_name: SparkDFExecutionEngine
      force_reuse_spark_context: true
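
If you prefer to keep building the context in code, the same flag goes on the execution_engine block of the DatasourceConfig. Here is a minimal sketch (using in-memory stores instead of the S3 stores from the question, just to keep it short); the function name, datasource names, and the run_id batch identifier are only placeholders:

from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    DatasourceConfig,
    InMemoryStoreBackendDefaults,
)

def create_context_reusing_glue_spark():
    # The important part is force_reuse_spark_context, which tells the
    # SparkDFExecutionEngine to attach to the SparkContext Glue already
    # created instead of starting (and later stopping) its own.
    data_context_config = DataContextConfig(
        store_backend_defaults=InMemoryStoreBackendDefaults(),
        datasources={
            "my_spark_datasource": DatasourceConfig(
                class_name="Datasource",
                execution_engine={
                    "class_name": "SparkDFExecutionEngine",
                    "module_name": "great_expectations.execution_engine",
                    "force_reuse_spark_context": True,
                },
                data_connectors={
                    "my_spark_dataconnector": {
                        "module_name": "great_expectations.datasource.data_connector",
                        "class_name": "RuntimeDataConnector",
                        "batch_identifiers": ["run_id"],
                    }
                },
            )
        },
    )
    return BaseDataContext(project_config=data_context_config)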
Another thing I would like to add is that you can define the context in a YAML file and upload it to S3. Then, you can parse this file in the Glue job with the function below:
import os

import boto3
import yaml
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig

def parse_data_context_from_S3(bucket: str, prefix: str = ""):
    # Download the deployed great_expectations.yml from S3 and build a context from it.
    object_key = os.path.join(prefix, "great_expectations.yml")
    print(f"Parsing s3://{bucket}/{object_key}")
    s3 = boto3.session.Session().client("s3")
    s3_object = s3.get_object(Bucket=bucket, Key=object_key)["Body"]
    datacontext_config = yaml.safe_load(s3_object.read())
    project_config = DataContextConfig(**datacontext_config)
    context = BaseDataContext(project_config=project_config)
    return context
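
For completeness, here is a hypothetical usage sketch inside the Glue job: the bucket, prefix, data asset name, run_id identifier, and checkpoint name are placeholders and must match whatever your YAML and checkpoint store actually define.

from great_expectations.core.batch import RuntimeBatchRequest

# df is the Spark DataFrame produced earlier in the Glue job.
context = parse_data_context_from_S3(bucket="my-ge-config-bucket", prefix="great_expectations/")

batch_request = RuntimeBatchRequest(
    datasource_name="my_spark_datasource",
    data_connector_name="my_spark_dataconnector",
    data_asset_name="my_dataset",               # any label for the data being validated
    runtime_parameters={"batch_data": df},      # pass the in-memory Spark DataFrame
    batch_identifiers={"run_id": "2022-01-01"}, # must match the connector's batch_identifiers
)

# Assumes a checkpoint named "my_checkpoint" exists in the configured checkpoint store
# (or was added beforehand with context.add_checkpoint, as in the question).
results = context.run_checkpoint(
    checkpoint_name="my_checkpoint",
    validations=[{"batch_request": batch_request}],
)
print(results.success)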
Your CI/CD pipeline can easily replace the store backends in the YAML file while deploying it to your environments (dev, hom, prod).
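
For example, a small deploy-time script (the approach below is just one way to do it, not something Great Expectations requires) can point every TupleS3StoreBackend at the environment's bucket before uploading the file:

import yaml

def render_ge_config(template_path: str, bucket: str, out_path: str) -> None:
    # Rewrite the bucket of every S3-backed store and data docs site.
    with open(template_path) as f:
        config = yaml.safe_load(f)
    sections = list(config.get("stores", {}).values()) + list(
        config.get("data_docs_sites", {}).values()
    )
    for section in sections:
        backend = section.get("store_backend", {})
        if backend.get("class_name") == "TupleS3StoreBackend":
            backend["bucket"] = bucket
    with open(out_path, "w") as f:
        yaml.safe_dump(config, f)

# e.g. render_ge_config("great_expectations.yml", "my-project-dev-ge", "out/great_expectations.yml")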
If you are using the RuntimeDataConnector, you should have no problem using Glue 3.0. The same does not apply if you are using the InferredAssetS3DataConnector and your datasets are encrypted using KMS; in that case, I was only able to use Glue 2.0.