python · azure-blob-storage · databricks · azure-data-lake-gen2

Unzip file in Azure Blob storage from Databricks


I am trying to unzip a file that sits in an Azure ADLS Gen2 container from Azure Databricks with PySpark. When I use zipfile.ZipFile, I get either a BadZipFile error or a FileNotFoundError.

I can read CSVs in the same folder, but not the zip files.

The zip file path is the same path that dbutils.fs.ls(blob_folder_url) returns.

(Screenshots in the post: the BadZipFile traceback, the FileNotFoundError traceback, and a successful CSV read.)

Code:

import zipfile, os, io, re

# Azure Blob Storage details
storage_account_name = "<>"
container_name = "<>"
folder_path = "<>"

blob_folder_url = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{folder_path}"
zip_file = blob_folder_url + 'batch1_weekly_catman_20241109.zip'

# List files in the specified blob folder
files = dbutils.fs.ls(blob_folder_url)

for file in files:
    # Check if the file is a ZIP file
    if file.name.endswith('.zip'):
        print(f"Processing ZIP file: {file.name}")

        # Read the ZIP file into memory
        zip_file_path = file.path
        zip_blob_data = dbutils.fs.head(zip_file_path)  # Read the ZIP file content

        # Unzip the file
        with zipfile.ZipFile(io.BytesIO(zip_blob_data.encode('utf-8')), 'r') as z:
            print('zipppppppper')
        # with zipfile.ZipFile(zip_file, 'r') as z:
        #     print('zipppppppper')  

Error Messages:

  1. BadZipFile: File is not a zip file
  2. FileNotFoundError: [Errno 2] No such file or directory

Solution

  • Here is the code I'm using to unzip, write the CSV, and archive the zipped file. For context: the original attempt raises BadZipFile because dbutils.fs.head returns the file contents decoded as a UTF-8 string (and only the first 64 KB by default), which corrupts the zip's binary data, and the commented-out variant raises FileNotFoundError because Python's zipfile cannot open an abfss:// URL directly. The code below avoids both problems by downloading the blob as raw bytes with the Azure SDK.

        %pip install azure-storage-blob

        dbutils.library.restartPython()

        from azure.storage.blob import BlobServiceClient
        from io import BytesIO
        import tempfile, os, zipfile, time

        def unzip_and_upload_to_blob(
            source_connection_string,
            source_container,
            source_zipped_files,
            dest_container,
            archive_folder_path
        ):
            for zipped_file in source_zipped_files:
                # Source blob client setup
                source_blob_service = BlobServiceClient.from_connection_string(source_connection_string)
                source_container_client = source_blob_service.get_container_client(source_container)
                source_blob_client = source_container_client.get_blob_client(zipped_file)

                # Destination blob client setup (using the same connection string)
                dest_container_client = source_blob_service.get_container_client(dest_container)

                # Archive blob client setup
                archive_file_path = archive_folder_path + get_filename(zipped_file)
                archive_blob_client = source_container_client.get_blob_client(archive_file_path)

                # Create the destination path with a .csv extension
                dest_path = os.path.splitext(zipped_file)[0] + '.csv'

                # Download and process the zip file in memory
                print(f"Downloading zip file from: {zipped_file}")
                blob_data = source_blob_client.download_blob()
                zip_bytes = blob_data.readall()
                zip_buffer = BytesIO(zip_bytes)

                # Create a temporary directory for the extracted files
                with tempfile.TemporaryDirectory() as temp_dir:
                    # Extract files into the temporary directory
                    print("Extracting zip file...")
                    with zipfile.ZipFile(zip_buffer) as zip_ref:
                        zip_ref.extractall(temp_dir)

                    # Collect the CSV files found in the temp directory
                    extracted_files = []
                    for root, dirs, files in os.walk(temp_dir):
                        for file in files:
                            if file.endswith('.csv'):  # Only process CSV files
                                local_file_path = os.path.join(root, file)
                                extracted_files.append(local_file_path)

                    if not extracted_files:
                        raise Exception("No CSV files found in the zip archive")

                    # Upload the CSV file to the destination
                    if len(extracted_files) == 1:
                        # If there's only one CSV file, upload it with the destination name
                        with open(extracted_files[0], 'rb') as data:
                            print(f"Uploading to: {dest_path}")
                            dest_container_client.upload_blob(
                                name=dest_path,
                                data=data,
                                overwrite=True
                            )
                            print(f"Successfully uploaded to: {dest_path}")

                        # Archive the zipped blob: copy it to the archive folder, then delete the original
                        try:
                            copy_result = archive_blob_client.start_copy_from_url(source_blob_client.url)
                            copy_status = copy_result['copy_status']

                            # Wait for the copy to complete
                            while copy_status == "pending":
                                time.sleep(1)  # avoid polling the service in a tight loop
                                copy_status = archive_blob_client.get_blob_properties().copy.status

                            if copy_status == "success":
                                # Delete the original blob
                                source_blob_client.delete_blob()
                                print(f"Blob '{zipped_file}' moved to '{archive_file_path}' successfully.")
                            else:
                                print(f"Failed to copy blob: {zipped_file}")
                        except Exception as e:
                            print(f"An error occurred while moving the blob: {e}")
                    else:
                        # If there are multiple CSV files, raise an exception
                        raise Exception(f"Found multiple CSV files in zip archive: {len(extracted_files)}. Expected only one.")
    

    Here is the code to run that function:

        # Unzip the files, upload the unzipped files to blob storage, then archive the zipped files
        unzip_and_upload_to_blob(
            source_connection_string=connection_string,
            source_container=source_container,
            source_zipped_files=zipped_files,
            dest_container=dest_container,
            archive_folder_path=archive_folder_path
        )
    

    Here is the code to get the list of zipped files:

        def list_zip_files(connection_string, source_container, source_blob_prefix):
            # Create the BlobServiceClient object
            blob_service_client = BlobServiceClient.from_connection_string(connection_string)

            # Get the container client
            container_client = blob_service_client.get_container_client(source_container)

            zipped_files = []
            # List blobs in the container that match the prefix
            print(f"Listing .zip blobs in container '{source_container}':")
            try:
                blob_list = container_client.list_blobs(name_starts_with=source_blob_prefix)
                for blob in blob_list:
                    if blob.name.endswith(".zip"):  # match the extension, not just any '.zip' substring
                        zipped_files.append(blob.name)
                        print(f"Blob name: {blob.name}, Blob size: {blob.size} bytes")
            except Exception as e:
                print(f"An error occurred: {e}")
            return zipped_files
    

    Here is the code to set up the configuration and run that function. The connection string is stored in an Azure Key Vault and retrieved through a Databricks secret scope.

        # Azure Blob Storage details
        connection_string = dbutils.secrets.get(scope='KEY_VAULT_NAME', key='KEY_NAME')
        storage_account_name = "STORAGE_ACCOUNT_NAME"
        source_container = "SOURCE_CONTAINER_NAME"
        source_blob_prefix = 'BLOB_PREFIX'
        dest_container = "DESTINATION_CONTAINER_NAME"
        folder_path = "SOURCE_FILE_PATH"
        archive_folder_path = "ARCHIVE_FILE_PATH"

        # Get a list of zipped files in the weekly_sales folder:
        zipped_files = list_zip_files(connection_string, source_container, source_blob_prefix)
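
    Once the CSVs are unzipped and uploaded, they can be read back with Spark as usual. A minimal sketch, assuming the cluster is already configured to access the storage account; the path below is hypothetical and reuses the placeholder variables above:

        # Read the unzipped CSVs from the destination container (hypothetical path)
        csv_path = f"abfss://{dest_container}@{storage_account_name}.dfs.core.windows.net/{folder_path}"
        df = spark.read.csv(csv_path, header=True, inferSchema=True)
        df.show(5)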