Tags: pyspark, zip, exe, azure-synapse, azure-data-lake-gen2

Not able to access zip/exe files from ADLS Gen2 in Synapse


I have a zip file containing images uploaded to my storage account (ADLS Gen2):

storage account: samplesa
container: samplecontainersa
data1: /folder1/sample1.exe
data2: /folder1/sample2.zip

I now need to read this zip and extract all the sample images into a PySpark DataFrame in my Synapse environment. Here is my code:

import zipfile

zip_path = "abfss://samplecontainersa@samplesa.dfs.core.windows.net/folder1/sample2.zip"
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    file_list = zip_ref.namelist()
    image_files = [f for f in file_list if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
    image_data = [(f, zip_ref.read(f)) for f in image_files]
df = spark.createDataFrame(image_data, ["filename", "image_bytes"])
df.show()

However, I am getting the following error:

No such file or directory: 'abfss://samplecontainersa@samplesa.dfs.core.windows.net/folder1/sample2.zip'

I can read other CSV/TXT files in the same directory; I'm only having issues accessing the exe and zip files. Any thoughts? Thanks!


Solution

  • You can follow the approach below to extract all the sample images from a .zip file into a PySpark DataFrame. The root cause of your error is that Python's zipfile module only opens paths on the local file system, so it cannot resolve an abfss:// URI; your CSV/TXT reads succeed because Spark's own readers go through the Hadoop ABFS driver.
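
    As a shortcut, if the zip fits comfortably in driver memory you can skip the staging below entirely: Spark's binaryFile source understands abfss:// paths, so you can pull the raw zip bytes through it and unzip in memory. A minimal sketch of that idea (assuming a Spark 3 runtime, which Synapse pools provide):

    import zipfile
    from io import BytesIO

    zip_path = "abfss://samplecontainersa@samplesa.dfs.core.windows.net/folder1/sample2.zip"
    # Read the raw zip bytes through Spark's ABFS driver instead of
    # zipfile's local-filesystem open()
    raw = spark.read.format("binaryFile").load(zip_path).select("content").head()[0]

    with zipfile.ZipFile(BytesIO(raw)) as z:
        image_files = [f for f in z.namelist() if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
        # bytearray maps to Spark's BinaryType
        image_data = [(f, bytearray(z.read(f))) for f in image_files]

    df = spark.createDataFrame(image_data, ["filename", "image_bytes"])
    df.show()

    Otherwise, the staged approach below also handles archives too large to hold on the driver: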

    Download the zip file, extract the images to a staging path in the same container, and verify that they were uploaded, using the code below (the imports and client setup are shown in the complete code at the end):

    # Download the zip file
    blob_client = blob_service_client.get_blob_client(container=container_name, blob="folder1/sample2.zip")
    downloaded_blob = blob_client.download_blob().readall()
    
    # Extract the zip file and upload images to ADLS
    with zipfile.ZipFile(BytesIO(downloaded_blob)) as z:
        for file_info in z.infolist():
            if file_info.filename.endswith(('.jpg', '.jpeg', '.png')):
                print(f"Extracting and uploading {file_info.filename}")
                with z.open(file_info.filename) as file:
                    # Define the blob client for each file
                    file_name = os.path.basename(file_info.filename)
                    blob_client = blob_service_client.get_blob_client(container=container_name, blob=f"folder1/images/{file_name}")
                    blob_client.upload_blob(file, overwrite=True)
    
    # Verify if images are uploaded
    blobs_list = blob_service_client.get_container_client(container_name).list_blobs(name_starts_with="folder1/images/")
    for blob in blobs_list:
        print(f"Uploaded file: {blob.name}")
    

    Read the extracted images into a DataFrame using the code below:

    df_images = spark \
        .read \
        .format("binaryFile") \
        .option("pathGlobFilter", "*.{jpg,png}") \
        .load("abfss://samplecontainersa@badls.dfs.core.windows.net/folder1/images/")
    display(df_images)
    

    You will get the images back as a DataFrame; the binaryFile source produces path, modificationTime, length, and content columns.

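    If you prefer the (filename, image_bytes) layout from the original attempt, you can reshape that output with built-in column functions; a small sketch:

    from pyspark.sql.functions import col, element_at, split

    # Derive the bare file name from the full abfss:// path and keep the
    # binary content under the original column name
    df = (df_images
        .withColumn("filename", element_at(split("path", "/"), -1))
        .select("filename", col("content").alias("image_bytes")))
    df.select("filename").show(truncate=False)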

    Here is the complete code for your reference:

    import os
    import zipfile
    from io import BytesIO
    from azure.storage.blob import BlobServiceClient
    
    # Initialize connection to your ADLSv2
    account_name = "<accountName>"
    account_key = "<accountKey>"
    container_name = "<containerName>"
    blob_service_client = BlobServiceClient(account_url=f"https://{account_name}.blob.core.windows.net", credential=account_key)
    
    # Download the zip file
    blob_client = blob_service_client.get_blob_client(container=container_name, blob="folder1/sample2.zip")
    downloaded_blob = blob_client.download_blob().readall()
    
    # Extract the zip file and upload images to ADLS
    with zipfile.ZipFile(BytesIO(downloaded_blob)) as z:
        for file_info in z.infolist():
            if file_info.filename.endswith(('.jpg', '.jpeg', '.png')):
                print(f"Extracting and uploading {file_info.filename}")
                with z.open(file_info.filename) as file:
                    # Define the blob client for each file
                    file_name = os.path.basename(file_info.filename)
                    blob_client = blob_service_client.get_blob_client(container=container_name, blob=f"folder1/images/{file_name}")
                    blob_client.upload_blob(file, overwrite=True)
    
    # Verify if images are uploaded
    blobs_list = blob_service_client.get_container_client(container_name).list_blobs(name_starts_with="folder1/images/")
    for blob in blobs_list:
        print(f"Uploaded file: {blob.name}")
    
    # Read the extracted images
    df_images = spark \
        .read \
        .format("binaryFile") \
        .option("pathGlobFilter", "*.{jpg,png}") \
        .load("abfss://samplecontainersa@badls.dfs.core.windows.net/folder1/images/")
    
    display(df_images)
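
    Once the DataFrame is materialized (for example, written out elsewhere), you may want to delete the staged copies. A minimal cleanup sketch, reusing the blob_service_client and container_name from above:

    # Remove the staged images under folder1/images/
    container_client = blob_service_client.get_container_client(container_name)
    for blob in container_client.list_blobs(name_starts_with="folder1/images/"):
        container_client.delete_blob(blob.name)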