I'm using AWS Glue Studio's visual ETL editor to process CSV files stored in an S3 bucket. The files are registered in the Glue Data Catalog and read as a single DynamicFrame during the ETL job. I want to:
- Retrieve the file names for the source files being processed.
- Perform data validation (e.g., schema checks or custom rules) on the data.
- Localize errors to the specific file that caused the validation failure.
- Move any file with errors to a failed folder in S3 for further inspection, while continuing to process the valid files.
However, since the input data is merged into a single DynamicFrame, I cannot directly associate validation errors with specific files. Questions:
- How can I extract the file names for each row or batch of data in the DynamicFrame?
- Is there a way to run validation on a per-file basis in Glue Visual ETL?
- How can I isolate a file with errors and move it to a failed folder programmatically in Glue?
Any guidance or examples on how to achieve this would be greatly appreciated! Here is the custom transform I'm currently using to tag each row with its source file name:
from awsglue.dynamicframe import DynamicFrame, DynamicFrameCollection
from pyspark.sql.functions import input_file_name

def add_file_name(glueContext, dfc) -> DynamicFrameCollection:
    transformed_frames = {}
    # Iterate over the key-value pairs in the DynamicFrameCollection
    for key in dfc.keys():
        dynamic_frame = dfc.select(key)
        spark_df = dynamic_frame.toDF()
        # Tag each row with the full S3 path of the file it was read from
        spark_df = spark_df.withColumn("file_name", input_file_name())
        transformed_dynamic_frame = DynamicFrame.fromDF(spark_df, glueContext, f"{key}_transformed")
        transformed_frames[key] = transformed_dynamic_frame
    return DynamicFrameCollection(transformed_frames, glueContext)
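
For reference, here is a minimal sketch of how the transform can be driven from a script rather than the visual editor; the database and table names are placeholders. One caveat I've come across: input_file_name() can apparently return an empty string when Glue groups small files together (the groupFiles option), so it's worth checking that the column is actually populated for your source.

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrameCollection
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())

# Placeholder catalog source; substitute the real database and table names
source_dyf = glueContext.create_dynamic_frame.from_catalog(
    database="my_database", table_name="my_csv_table"
)

# Glue Studio's Custom Transform node passes a DynamicFrameCollection,
# so the frame is wrapped the same way here before calling the transform
dfc = DynamicFrameCollection({"source": source_dyf}, glueContext)
tagged_dyf = add_file_name(glueContext, dfc).select("source")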
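
For the per-file validation question: since every row now carries its source path, the rules can be expressed as a filter on the tagged DataFrame, and the distinct file_name values of the failing rows identify the bad files. A minimal sketch, assuming two hypothetical rules (id must be non-null, amount must be non-negative):

from pyspark.sql import functions as F

df = tagged_dyf.toDF()

# Hypothetical validation rules; replace with real schema or business checks
error_condition = F.col("id").isNull() | (F.col("amount") < 0)

invalid_df = df.filter(error_condition)

# Distinct S3 paths of the files that produced at least one invalid row
failed_files = [row["file_name"]
                for row in invalid_df.select("file_name").distinct().collect()]

# Keep only rows from files that validated cleanly end to end
if failed_files:
    clean_df = df.filter(~F.col("file_name").isin(failed_files))
else:
    clean_df = df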
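
Finally, for moving the bad files: Glue has no built-in "move file" step, so the usual route is boto3 on the driver, copying each failing object under a failed/ prefix and then deleting the original. A minimal sketch (the failed/ prefix is an assumption; adjust it to the actual bucket layout):

import boto3
from urllib.parse import urlparse

s3 = boto3.client("s3")

def move_to_failed(s3_uri: str, failed_prefix: str = "failed/") -> None:
    # e.g. s3://my-bucket/input/data1.csv -> s3://my-bucket/failed/data1.csv
    parsed = urlparse(s3_uri)
    bucket = parsed.netloc
    key = parsed.path.lstrip("/")
    dest_key = failed_prefix + key.rsplit("/", 1)[-1]
    s3.copy_object(Bucket=bucket, Key=dest_key,
                   CopySource={"Bucket": bucket, "Key": key})
    s3.delete_object(Bucket=bucket, Key=key)

for file_uri in failed_files:
    move_to_failed(file_uri)

After the move, clean_df can be converted back with DynamicFrame.fromDF and written to the target as usual.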