pyspark, aws-glue, great-expectations

Great Expectations v3 API in AWS Glue 3.0


I'm trying to run a validation in the pipeline using Great Expectations on AWS Glue 3.0.

Here's my initial attempt to create the data context at runtime, based on their docs:

from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    DatasourceConfig,
)


def create_context():
    logger.info("Create DataContext Config.")
    data_context_config = DataContextConfig(
        config_version=2,
        plugins_directory=None,
        config_variables_file_path=None,
        # concurrency={"enabled": "true"},
        datasources={
            "my_spark_datasource": DatasourceConfig(
                class_name="Datasource",
                execution_engine={
                    "class_name": "SparkDFExecutionEngine",
                    "module_name": "great_expectations.execution_engine",
                },
                data_connectors={
                    "my_spark_dataconnector": {
                        "module_name": "great_expectations.datasource.data_connector",
                        "class_name": "RuntimeDataConnector",
                        "batch_identifiers": [""],
                    }
                },
            )
        },
        stores={
            "expectations_S3_store": {
                "class_name": "ExpectationsStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "expectations/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
            "validations_S3_store": {
                "class_name": "ValidationsStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "validations/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
            "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
            "checkpoint_S3_store": {
                "class_name": "CheckpointStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "suppress_store_backend_id": "true",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "checkpoints/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
        },
        expectations_store_name="expectations_S3_store",
        validations_store_name="validations_S3_store",
        evaluation_parameter_store_name="evaluation_parameter_store",
        checkpoint_store_name="checkpoint_S3_store",
        data_docs_sites={
            "s3_site": {
                "class_name": "SiteBuilder",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "data_docs/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
                "site_index_builder": {
                    "class_name": "DefaultSiteIndexBuilder",
                    "show_cta_footer": True,
                },
            }
        },
        anonymous_usage_statistics={"enabled": True},
    )

    # Pass the DataContextConfig as a project_config to BaseDataContext
    context = BaseDataContext(project_config=data_context_config)

    logger.info("Create Checkpoint Config.")
    checkpoint_config = {
        "name": "my_checkpoint",
        "config_version": 1,
        "class_name": "Checkpoint",
        "run_name_template": "ingest_date=%YYYY-%MM-%DD",
        "expectation_suite_name": data_profile_expectation_suite_name,
        "runtime_configuration": {
            "result_format": {
                "result_format": "COMPLETE",
                "include_unexpected_rows": True,
            }
        },
        "evaluation_parameters": {},
    }
    context.add_checkpoint(**checkpoint_config)
    # logger.info(f'GE Data Context Config: "{data_context_config}"')

    return context


Using this, I get an error saying "attempting to run operations on stopped spark context".

Is there a better way to use the Spark datasource in Glue 3.0? I want to stay on Glue 3.0 as much as possible, to avoid having to maintain two versions of Glue jobs.


Solution

  • You can fix this by setting force_reuse_spark_context to true. Here is a quick example (YML):

    config_version: 3.0
    datasources:
      my_spark_datasource:
        class_name: Datasource
        module_name: great_expectations.datasource
        data_connectors:
          my_spark_dataconnector:
            class_name: RuntimeDataConnector
            module_name: great_expectations.datasource.data_connector
            batch_identifiers: {}
        execution_engine:
          class_name: SparkDFExecutionEngine
          force_reuse_spark_context: true
    
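    If you would rather keep building the context in code, as in the question, the same flag can be set directly in the execution_engine block of the DatasourceConfig. Here is a minimal sketch of just the datasource portion; the stores, data docs and checkpoint from the question stay unchanged, and the "run_id" batch identifier is only an illustrative name:

    from great_expectations.data_context.types.base import DatasourceConfig

    # Same datasource as in the question, but the execution engine is told to
    # reuse the SparkContext that Glue already started instead of creating
    # (and later stopping) its own.
    my_spark_datasource = DatasourceConfig(
        class_name="Datasource",
        execution_engine={
            "class_name": "SparkDFExecutionEngine",
            "module_name": "great_expectations.execution_engine",
            "force_reuse_spark_context": True,
        },
        data_connectors={
            "my_spark_dataconnector": {
                "module_name": "great_expectations.datasource.data_connector",
                "class_name": "RuntimeDataConnector",
                # Declare whichever identifiers your runtime batch requests will pass.
                "batch_identifiers": ["run_id"],
            }
        },
    )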

    Another thing I would like to add is that you can define the context in a YML file and upload it to S3. Then, you can parse this file in the Glue job with the function below:

    import os

    import boto3
    import yaml

    from great_expectations.data_context import BaseDataContext
    from great_expectations.data_context.types.base import DataContextConfig


    def parse_data_context_from_S3(bucket: str, prefix: str = ""):
        object_key = os.path.join(prefix, "great_expectations.yml")
    
        print(f"Parsing s3://{bucket}/{object_key}")
        s3 = boto3.session.Session().client("s3")
        s3_object = s3.get_object(Bucket=bucket, Key=object_key)["Body"]
    
        datacontext_config = yaml.safe_load(s3_object.read())
    
        project_config = DataContextConfig(**datacontext_config)
    
        context = BaseDataContext(project_config=project_config)
        return context
    

    Your CI/CD pipeline can easily replace the store backends in the YML file while deploying it to your environments (dev, hom, prod).
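    As an illustration of that substitution (the placeholder name is made up), the bucket for each store backend can be left as a token in the YML that the pipeline replaces before uploading the file to S3:

    stores:
      expectations_S3_store:
        class_name: ExpectationsStore
        store_backend:
          class_name: TupleS3StoreBackend
          # __DATA_PROFILE_BUCKET__ is swapped for the dev/hom/prod bucket
          # by the CI/CD pipeline at deploy time.
          bucket: __DATA_PROFILE_BUCKET__
          prefix: expectations/
          s3_put_options:
            ACL: bucket-owner-full-control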

    If you are using the RuntimeDataConnector, you should have no problem using Glue 3.0. The same does not apply if you are using the InferredAssetS3DataConnector and your datasets are encrypted using KMS. In this case, I was only able to use Glue 2.0.
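
    For completeness, here is roughly how the Glue job can then validate an in-memory Spark DataFrame through the RuntimeDataConnector and the "my_checkpoint" checkpoint from the question. The bucket name, data_asset_name and run_id are illustrative, and the sketch assumes the checkpoint already exists in the context (added in code as in the question, or persisted to the checkpoint store) and that the connector declares a matching batch identifier:

    import datetime

    from great_expectations.core.batch import RuntimeBatchRequest

    context = parse_data_context_from_S3(bucket="my-ge-config-bucket")
    run_id = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%S")

    # df is the Spark DataFrame produced earlier in the Glue job.
    batch_request = RuntimeBatchRequest(
        datasource_name="my_spark_datasource",
        data_connector_name="my_spark_dataconnector",
        data_asset_name="my_dataset",
        runtime_parameters={"batch_data": df},
        batch_identifiers={"run_id": run_id},
    )

    results = context.run_checkpoint(
        checkpoint_name="my_checkpoint",
        validations=[{"batch_request": batch_request}],
    )
    if not results["success"]:
        raise RuntimeError("Great Expectations validation failed.")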