So I've got some classes that allow me to upload file-like or IO objects to Azure Blob Storage.
My problem here is that I want to pass a callback during the export of those objects, and this callback needs to do multiple things (hence call a higher method).
Here is the code:
from azure.storage.blob import BlobServiceClient
class UploadStatus:
def __init__(self, uploaded=0, total=None):
self.uploaded = uploaded
self.total = total
def update(self, uploaded, total):
if total is not None:
self.total = self._normalize(total)
if uploaded is not None:
self.uploaded = self._normalize(uploaded)
def progress(self):
"""Calculate the progress made for the upload incorring."""
return 100.0 * self.uploaded / self.total if self.total > 0 else 0
def _normalize(self, integer):
return integer if integer else 0
class Resource:
"""A wrapper encapsulating an io-like object"""
def __init__(self, ...):
...
class AzureBlobProxy:
def __init__(self, client):
self.client = client
@classmethod
def build(cls, params: dict):
client = BlobServiceClient(**params)
return cls(client)
def blob_export(self, container: str, name: str, io, block=None) -> dict:
"""Export file-like object to a Storage.
Args:
container (string): Container to use
name (string): Name to use
io (io.IOBase): File-like io object base on IOBase.
"""
blob_client = self.client.get_blob_client(
container=container,
blob=name
)
return blob_client.upload_blob(
data=io,
progress_hook=block
)
class AzureProvider:
def upload(self, resource, status, params={}, block=None):
"""See BaseProvider.upload() description."""
proxy = AzureBlobProxy.build(params)
container, name = self._extract_container_and_name_from_params(params)
def progress_callback(sent, total):
print(block) # doesn't display the block function
status.update(uploaded=sent, total=total)
if block and callable(block):
block.__call__(status)
with resource.with_io() as io:
status.update(uploaded=0, total=sys.getsizeof(io))
proxy.blob_export(container, name, io, progress_callback)
return self._upload_strategy()
def _extract_container_and_name_from_params(self, params):
"""Return container and name for blob"""
...
def __upload_strategy(self):
return 'azure-blob'
def print_progress(status):
print('PROGRESS {}%'.format(int(status.progress())))
parameters = { ... }
resource = Resource(...)
upload_status = UploadStatus()
strategy = provider.upload(resource, upload_status, parameters, print_progress)
Have a look at my inner function progress_callback
that is the progress_hook passed to this final method https://learn.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobclient?view=azure-python#azure-storage-blob-blobclient-upload-blob
It should call the function print_progress
, but that's not the case, I don't see any printing "PROGRESS %" appearing.
I can confirm, though, that a file-like object is exported to Azure Blob Storage.
Do you have any idea?
Inner function as callback not called
You can use the below python code includes additional print statements to help debug the issue with the block
function not being called.
Also, I made some modified changes to upload bytes to blob storage with classes.
Here is the detailed explanation of how to upload a file-like object
to Azure Blob Storage using the Azure Blob Storage client library
for Python,
Code:
from azure.storage.blob import BlobServiceClient
from azure.identity import DefaultAzureCredential
import io
import sys
class UploadStatus:
def __init__(self, uploaded=0, total=None):
self.uploaded = uploaded
self.total = total
def update(self, uploaded, total):
if total is not None:
self.total = self._normalize(total)
if uploaded is not None:
self.uploaded = self._normalize(uploaded)
def progress(self):
"""Calculate the progress made for the upload incorring."""
return 100.0 * self.uploaded / self.total if self.total > 0 else 0
def _normalize(self, integer):
return integer if integer else 0
class Resource:
"""A wrapper encapsulating an io-like object"""
def __init__(self, data: bytes):
self.data = data
def with_io(self):
return io.BytesIO(self.data)
class AzureBlobProxy:
def __init__(self, client):
self.client = client
@classmethod
def build(cls, params: dict):
client = BlobServiceClient(**params)
return cls(client)
def blob_export(self, container: str, name: str, io, block=None) -> dict:
"""Export file-like object to a Storage.
Args:
container (string): Container to use
name (string): Name to use
io (io.IOBase): File-like io object base on IOBase.
"""
blob_client = self.client.get_blob_client(
container=container,
blob=name
)
print(f"Starting upload to container: {container}, blob: {name}")
return blob_client.upload_blob(
data=io,
progress_hook=block
)
class AzureProvider:
def upload(self, resource, status, params={}, block=None):
"""See BaseProvider.upload() description."""
proxy = AzureBlobProxy.build(params)
container, name = self._extract_container_and_name_from_params(params)
def progress_callback(sent, total):
print(f"Progress callback triggered. Sent: {sent}, Total: {total}")
status.update(uploaded=sent, total=total)
if block and callable(block):
block.__call__(status)
with resource.with_io() as io:
io_size = sys.getsizeof(io.getvalue())
print(f"IO size: {io_size}")
status.update(uploaded=0, total=io_size)
proxy.blob_export(container, name, io, progress_callback)
return self._upload_strategy()
def _extract_container_and_name_from_params(self, params):
"""Return container and name for blob"""
return params.get('container'), params.get('name')
def _upload_strategy(self):
return 'azure-blob'
def print_progress(status):
print('PROGRESS {}%'.format(int(status.progress())))
# Example usage
parameters = {
'account_url': 'https://<storage account name >.blob.core.windows.net/', # Replace with your account URL
'credential': DefaultAzureCredential(), # Replace with your credential
'container': 'test', # Replace with your container name
'name': 'sample.txt' # Replace with your blob name
}
# Create a sample file-like object
sample_data = b"Sample data for Azure Blob Storage upload. " * 1024 # 43 KB of sample data
resource = Resource(sample_data)
upload_status = UploadStatus()
provider = AzureProvider()
strategy = provider.upload(resource, upload_status, parameters, print_progress)
The above code track upload progress and manage file-like objects. AzureProvider
performs the upload using AzureBlobProxy
, which updates the UploadStatus
and provides real-time progress via a callback. The example explains how to upload sample data and receive progress feedback.
Output:
IO size: 44065
Starting upload to container: test, blob: sample.txt
Progress callback triggered. Sent: 44032, Total: 44032
PROGRESS 100%