I am trying to unzip a file stored in an Azure ADLS Gen2 container from Azure Databricks (PySpark). When I use `ZipFile`, I get a `BadZipFile` error or a `FileNotFoundError`. I can read CSVs in the same folder, but not the zip files. The zip file path is the same path returned by `dbutils.fs.ls(blob_folder_url)`.
Code:
import zipfile, os, io, re

# Azure Blob Storage details
storage_account_name = "<>"
container_name = "<>"
folder_path = "<>"

blob_folder_url = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{folder_path}"
zip_file = blob_folder_url + 'batch1_weekly_catman_20241109.zip'

# List files in the specified blob folder
files = dbutils.fs.ls(blob_folder_url)

for file in files:
    # Check if the file is a ZIP file
    if file.name.endswith('.zip'):
        print(f"Processing ZIP file: {file.name}")

        # Read the ZIP file into memory
        zip_file_path = file.path
        zip_blob_data = dbutils.fs.head(zip_file_path)  # Read the ZIP file content

        # Unzip the file
        with zipfile.ZipFile(io.BytesIO(zip_blob_data.encode('utf-8')), 'r') as z:
            print('zipppppppper')

        # with zipfile.ZipFile(zip_file, 'r') as z:
        #     print('zipppppppper')
Error messages: the `BadZipFile` / `FileNotFoundError` tracebacks described above (full output omitted).
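It turned out the `BadZipFile` comes from `dbutils.fs.head`: it is a text-preview utility that returns the file's contents as a decoded string, and only up to 65,536 bytes by default, so `zip_blob_data.encode('utf-8')` cannot reproduce the original binary zip data. The `FileNotFoundError` from the commented-out variant is because Python's `zipfile` only opens local paths and does not understand `abfss://` URLs. Before the full solution below, a quick sanity check that stays inside Databricks utilities, assuming the cluster already has access to the storage account (it does, since `dbutils.fs.ls` works) and that `zip_file` contains a `/` between the folder and the file name:

    # Copy the zip from ADLS to the driver's local disk, then open it locally.
    local_path = '/tmp/batch1_weekly_catman_20241109.zip'
    dbutils.fs.cp(zip_file, f'file:{local_path}')  # abfss:// -> local driver disk

    with zipfile.ZipFile(local_path, 'r') as z:
        print(z.namelist())  # list the archive contents to confirm it opens cleanly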
Here is the code I'm now using (via the azure-storage-blob SDK, which downloads the blob's raw bytes) to unzip, write the CSV, and archive the zipped file.
<!-- language:python-->
%pip install azure-storage-blob
dbutils.library.restartPython()
from azure.storage.blob import BlobServiceClient
from io import BytesIO
import tempfile, os, zipfile, re, time  # time is used to pause between copy-status polls
def unzip_and_upload_to_blob(
    source_connection_string,
    source_container,
    source_zipped_files,
    dest_container,
    archive_folder_path
):
    for zipped_file in source_zipped_files:
        # Source blob client setup
        source_blob_service = BlobServiceClient.from_connection_string(source_connection_string)
        source_container_client = source_blob_service.get_container_client(source_container)
        source_blob_client = source_container_client.get_blob_client(zipped_file)

        # Destination blob client setup (using the same connection string)
        dest_container_client = source_blob_service.get_container_client(dest_container)

        # Archive blob client setup; os.path.basename keeps just the file name from the blob path
        archive_file_path = archive_folder_path + os.path.basename(zipped_file)
        archive_blob_client = source_container_client.get_blob_client(archive_file_path)

        # Create the destination path with a .csv extension
        dest_path = os.path.splitext(zipped_file)[0] + '.csv'

        # Download and process the zip file in memory
        print(f"Downloading zip file from: {zipped_file}")
        blob_data = source_blob_client.download_blob()
        zip_bytes = blob_data.readall()
        zip_buffer = BytesIO(zip_bytes)

        # Create a temporary directory for the extracted files
        with tempfile.TemporaryDirectory() as temp_dir:
            # Extract files to the temporary directory
            print("Extracting zip file...")
            with zipfile.ZipFile(zip_buffer) as zip_ref:
                zip_ref.extractall(temp_dir)

            # Get the list of CSV files in the temp directory
            extracted_files = []
            for root, dirs, files in os.walk(temp_dir):
                for file in files:
                    if file.endswith('.csv'):  # Only process CSV files
                        local_file_path = os.path.join(root, file)
                        extracted_files.append(local_file_path)

            if not extracted_files:
                raise Exception("No CSV files found in the zip archive")

            # Upload the CSV file to the destination
            if len(extracted_files) == 1:
                # If there's only one CSV file, upload it with the destination name
                with open(extracted_files[0], 'rb') as data:
                    print(f"Uploading to: {dest_path}")
                    dest_container_client.upload_blob(
                        name=dest_path,
                        data=data,
                        overwrite=True
                    )
                print(f"Successfully uploaded to: {dest_path}")

                # Archive the zipped blob: copy it to the archive folder, then delete the original
                try:
                    archive_blob_client.start_copy_from_url(source_blob_client.url)

                    # Poll the copy status until it is no longer pending
                    copy_props = archive_blob_client.get_blob_properties().copy
                    while copy_props.status == "pending":
                        time.sleep(1)
                        copy_props = archive_blob_client.get_blob_properties().copy

                    if copy_props.status == "success":
                        # Delete the original blob
                        source_blob_client.delete_blob()
                        print(f"Blob '{zipped_file}' moved to '{archive_file_path}' successfully.")
                    else:
                        print(f"Failed to copy blob: {zipped_file}")
                except Exception as e:
                    print(f"An error occurred while moving the blob: {e}")
            else:
                # If there are multiple CSV files, raise an exception
                raise Exception(f"Found multiple CSV files in zip archive: {len(extracted_files)}. Expected only one.")
Here is the code to run that function:
# Unzip the files, upload the unzipped CSVs to blob storage, then move the zipped files to the archive folder
unzip_and_upload_to_blob(
    source_connection_string=connection_string,
    source_container=source_container,
    source_zipped_files=zipped_files,
    dest_container=dest_container,
    archive_folder_path=archive_folder_path
)
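(This cell assumes `connection_string`, `source_container`, `zipped_files`, `dest_container`, and `archive_folder_path` are already defined; the cells below show how they are set.)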
Here is the code to get the list of zipped files:
def list_zip_files(connection_string, source_container, source_blob_prefix):
    # Create the BlobServiceClient object
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)

    # Get the container client
    container_client = blob_service_client.get_container_client(source_container)

    zipped_files = []

    # List blobs in the container
    print(f"Listing .zip blobs in container '{source_container}':")
    try:
        blob_list = container_client.list_blobs(name_starts_with=source_blob_prefix)
        for blob in blob_list:
            if blob.name.endswith(".zip"):  # endswith avoids matching names that merely contain ".zip"
                zipped_files.append(blob.name)
                print(f"Blob name: {blob.name}, Blob size: {blob.size} bytes")
    except Exception as e:
        print(f"An error occurred: {e}")

    return zipped_files
Here is the code to run that function. The connection string is stored in an Azure Key Vault (accessed through a Databricks secret scope).
# Azure Blob Storage details
connection_string = dbutils.secrets.get(scope='KEY_VAULT_NAME', key='KEY_NAME')
storage_account_name = "STORAGE_ACCOUNT_NAME"
source_container = "SOURCE_CONTAINER_NAME"
source_blob_prefix = 'BLOB_PREFIX'
dest_container = "DESTINATION_CONTAINER_NAME"
folder_path = "SOURCE_FILE_PATH"
archive_folder_path = "ARCHIVE_FILE_PATH"
# get a list of zipped files in the weekly_sales folder:
zipped_files = list_zip_files(connection_string, source_container, source_blob_prefix)
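Finally, to verify the end-to-end flow, the unzipped CSV can be read back with Spark the same way the other CSVs in the folder are read. A minimal sketch, assuming the cluster has access to the destination container and that the CSV landed under the same folder path (the exact path depends on the placeholder values above):

    # Read one of the unzipped CSVs back to confirm the upload succeeded.
    dest_url = f"abfss://{dest_container}@{storage_account_name}.dfs.core.windows.net/{folder_path}"
    df = spark.read.option("header", "true").csv(f"{dest_url}/batch1_weekly_catman_20241109.csv")
    df.show(5)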