google-bigquery, airflow, google-cloud-storage, airflow-api

What is the structure of the executable transformation script for transform_script of GCSFileTransformOperator?


I'm currently working on a task in Airflow that requires pre-processing a large CSV file using GCSFileTransformOperator. I've been reading the documentation on the class and its implementation, but I don't quite understand how the executable transformation script for transform_script should be structured.

For example, is the following script structure correct? If so, does that mean that with GCSFileTransformOperator, Airflow calls the executable transformation script and passes it arguments on the command line?

# Import the required modules
import sys
# (plus whatever preprocessing modules you need)

# Define the function that takes the source_file and destination_file params
def preprocess_file(source_file, destination_file):
    # (1) code that processes the source_file
    # (2) code that then writes the result to destination_file
    ...

# Extract source_file and destination_file from the list of command-line arguments
source_file = sys.argv[1]
destination_file = sys.argv[2]

preprocess_file(source_file, destination_file)


Solution

  • GCSFileTransformOperator passes the script to subprocess.Popen, so your script will work, but you will need to add a shebang line such as #!/usr/bin/python (or wherever Python is on your path in your Airflow environment).

    Your arguments are correct, and otherwise the format of your script can be anything you want. Airflow passes in the path of the downloaded source file and the path of a temporary new file for the output:

    cmd = (
        [self.transform_script]
        if isinstance(self.transform_script, str)
        else self.transform_script
    )
    cmd += [source_file.name, destination_file.name]
    with subprocess.Popen(
        args=cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True
    ) as process:
        # ...
        process.wait()
    

    (you can see the source in the GCSFileTransformOperator implementation in the Google provider package)
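
    Putting that together, a minimal complete transform script might look like this. The uppercase step is just a placeholder for real preprocessing logic, and the function name is arbitrary — the only contract is reading sys.argv[1] and writing to sys.argv[2]:

    ```python
    #!/usr/bin/python
    import sys

    def preprocess_file(source_file, destination_file):
        # Placeholder transform: uppercase each line of the source file
        # and write the result to the destination file.
        with open(source_file) as src, open(destination_file, "w") as dst:
            for line in src:
                dst.write(line.upper())

    if __name__ == "__main__":
        # Airflow appends the two file paths as positional arguments.
        preprocess_file(sys.argv[1], sys.argv[2])
    ```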
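
    On the DAG side, you then point the operator at your script. A sketch of the wiring, assuming the Google provider package is installed — the bucket names, object paths, and script path below are placeholders:

    ```python
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.operators.gcs import GCSFileTransformOperator

    with DAG(
        dag_id="preprocess_csv_example",  # placeholder DAG id
        start_date=datetime(2023, 1, 1),
        schedule=None,
    ) as dag:
        transform = GCSFileTransformOperator(
            task_id="preprocess_csv",
            source_bucket="my-source-bucket",       # placeholder
            source_object="raw/data.csv",           # placeholder
            destination_bucket="my-dest-bucket",    # placeholder
            destination_object="clean/data.csv",    # placeholder
            transform_script=["python", "/path/to/preprocess.py"],  # placeholder path
        )
    ```

    Note that transform_script may be a list (as the isinstance check in the source shows), so passing ["python", "/path/to/preprocess.py"] lets you skip the shebang and choose the interpreter explicitly.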