I'd like to be able to treat an Azure blob like an IO object using the Python SDK. As far as I can tell, this requires me to either:
a) use .readinto() to read the blob into an IO object (thus loading the whole undefined-size blob into memory inside a container with 256 MB of memory), or
b) manually call .read() with an offset and a limit (thus requiring me to know exactly how much I want to read at a given time).
I'm trying to read a gzipped mongodump BSON file using bson.decode_file_iter, so I don't know exactly how many bytes I want to read in a given stretch (at least, not without decoding the BSON myself a little), and obviously solution a) isn't great if I have particularly gigantic dump files (for example, I'm working with one that's half a gig uncompressed right now). To the extent of my knowledge, there's nothing in the Azure blob SDK that exposes this. In an ideal universe, I'd be able to stream the blob through gzip decompression and into that BSON decode function. Is there something I can use to achieve this without having to write a complete translation layer, or should I stick with my current approach of downloading the blob to a file, then using gzip.open to read the downloaded file and pipe it into bson.decode_file_iter?
Is there any way to read and process an azure blob as a stream using the python sdk without loading the whole blob into memory?
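For reference, the file-based fallback described in the question can be sketched as follows. The blob download is stubbed out with locally written gzipped bytes (so this runs without an Azure account), and in the real pipeline bson.decode_file_iter from pymongo would consume the file object where the read happens:

```python
import gzip
import os
import tempfile

# Stand-in for the downloaded blob: gzipped bytes written to a local file,
# roughly what download_blob().readinto(open(path, "wb")) would leave on disk.
payload = b"fake BSON bytes " * 100
with tempfile.NamedTemporaryFile(suffix=".gz", delete=False) as f:
    f.write(gzip.compress(payload))
    path = f.name

# The fallback approach: decompress the downloaded file and hand the stream
# to a consumer -- in the real code, bson.decode_file_iter(fh).
with gzip.open(path, "rb") as fh:
    data = fh.read()

os.remove(path)
assert data == payload
```

The drawback, as noted above, is the extra disk round-trip: the whole compressed blob must land on disk before decoding can start.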
You can read and process an Azure blob as a stream with the Python SDK, without loading the whole blob into memory, by wrapping the SDK's chunked download in a file-like object.
Code:
import io
import gzip
import bson
from azure.storage.blob import BlobServiceClient

class BlobStream(io.RawIOBase):
    """File-like wrapper around a blob's chunked download stream."""

    def __init__(self, blob_client):
        self.blob_client = blob_client
        self.stream = self.blob_client.download_blob()
        self.stream_iter = self.stream.chunks()
        self.buffer = b""

    def readinto(self, b):
        # Pull the next chunk into the buffer, if any chunks remain.
        try:
            chunk = next(self.stream_iter)
            self.buffer += chunk
        except StopIteration:
            pass
        # Copy as much buffered data as fits into the caller's buffer.
        size = len(self.buffer)
        length = min(len(b), size)
        b[:length] = self.buffer[:length]
        self.buffer = self.buffer[length:]
        return length  # 0 only once both the iterator and buffer are exhausted

    def readable(self):
        return True

blob_service_client = BlobServiceClient.from_connection_string("DefaultEndpointsProtocol=https;AccountName=venkat123;AccountKey=/Z3A9SDaP7a8DKxolqooke13Z4Uyxxxxx==;EndpointSuffix=core.windows.net")
blob_client = blob_service_client.get_blob_client(container="test", blob="large_sample.gz")
blob_stream = BlobStream(blob_client)

# Stream the blob through gzip decompression straight into the BSON decoder.
with gzip.GzipFile(fileobj=blob_stream) as gzipped_blob:
    for doc in bson.decode_file_iter(gzipped_blob):
        print(doc)
This code defines a custom BlobStream class that extends io.RawIOBase. The class uses the download_blob method of the BlobClient object to open the blob as a stream, and consumes that stream in chunks via the chunks method of the StorageStreamDownloader object. The readinto method of BlobStream copies data from the stream into the caller's buffer and returns the number of bytes read.
By processing the stream chunk by chunk and buffering only the leftover bytes, the code handles large blobs without ever loading the entire blob into memory at once.
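Two notes worth adding, shown here as a sketch rather than part of the answer's code. First, io.RawIOBase.read() is allowed to return fewer bytes than requested, so wrapping the raw stream in io.BufferedReader gives downstream consumers full-size reads. Second, the buffering pattern itself can be tested without an Azure account by feeding readinto from any in-memory bytes iterator in place of StorageStreamDownloader.chunks():

```python
import gzip
import io

class ChunkStream(io.RawIOBase):
    """Same buffering pattern as BlobStream, but fed by any bytes iterator."""

    def __init__(self, chunk_iter):
        self.chunk_iter = chunk_iter
        self.buffer = b""

    def readable(self):
        return True

    def readinto(self, b):
        # Pull the next chunk into the buffer, if any chunks remain.
        try:
            self.buffer += next(self.chunk_iter)
        except StopIteration:
            pass
        length = min(len(b), len(self.buffer))
        b[:length] = self.buffer[:length]
        self.buffer = self.buffer[length:]
        return length  # 0 only when the iterator and buffer are both empty

# Simulate StorageStreamDownloader.chunks() with 4 KiB slices of a gzip stream.
original = b"document " * 10_000
compressed = gzip.compress(original)
chunks = (compressed[i:i + 4096] for i in range(0, len(compressed), 4096))

# BufferedReader smooths out the short reads that RawIOBase may return.
stream = io.BufferedReader(ChunkStream(chunks))
with gzip.GzipFile(fileobj=stream) as gz:
    data = gz.read()

assert data == original
```

In the real pipeline, the generator of slices would simply be replaced by blob_client.download_blob().chunks(), and bson.decode_file_iter would read from the GzipFile object instead of gz.read().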
Output:
{'vGAxXP6r4r': '42tt1xFZ4fsHQizfxsEbhIepntrWBcd2KSU7P58RFp0OiQiMZ5kDKIdoKscVazIxXBjtOJAQNv3oLGCGrvLPgiOrDSbjGJ4wXH6h', 'USvIfwuHQ6': 'pOk9xAXWdKuPVcWImLYKRsmDq1fIHCgRfHmUnv5t5tTT3fg6RE0bKmCf6V8UQ9JBYdlxRxMAAxCsdfBeIdNO8PtQOswxewlqnFGZ', 'YNgIcaA3N9': '3A4GLIaLmpjSaRZGecZqUh76xjRGTnBQ4sXB3Bcl7CSh8ycToVWmX81mRlKvVqSnYnDy92fpv7kOD5hxdIG6JCWps5cLm99e3Gqr', 'DLsJ1QOXvJ': 'f6NK561gNY0ZYxJcKO2KoKxbyRjzIgiKG9AjNWes3ti3kSTyuwjKNuhb48MiHMTuGFblLz4ufFpAj76tD550ajTqBbyeODXkyZhc', 'xnip5jrUGV': 'K4OEwJVhkd1gzBwqeq5LDkNNZU2Z2MMr7FSy42SOSn2SBI8xlpvcrbqTZs5JZWBi9abP4ziaE3lqf6FBvyYJ2iQeiYpuWybPRnAE', 'KZbTjdxBcw': 'wYX2ioBqxTbCTzHqmJEOv4fNsDq8wFKAWD8x3I4mX7fmpyI0K9ZODUi0nn7020tgXD021TXuSlhjAyMLaJVy9uhmZ7cu3l4JmyEU', 'POZkbGX1Oo': 'Rstj0ldaOFLy4doyPMXbLR9MH05TRLW820yWWsHAlAzb3uporaLxyON290w6tPLRUiOVJqfr8l1q3HpSc48yxuheZ0cz3pgwRqQl', 'b6zzcCZqvT': 'fISfGw3ftsB6BkmJYosIgKeMsSTHYbWIDrYMfH14LA7hYg4oj852UQ2I76uHC93nltZLIawDKv5GFXZLlcYLQwPmDsuFGC22cpaN', 'fBb6tmprgQ': '8JmO0twpLmNtjNQn7Zn5NeByLzEpSldfPUtCWOQytfcj2aFizu7ma8bWGVRY5Xbg8v5eX8lWJC6k7ddOQRZNoAdSWiCOQBfD8m7J', 'YWd247222J': 'Y05nSM1lcnUrnsmEKndc6hoX6pItvoJFkkE5qYXuSCmgpLnwcfKgY8lUIgXLeG04BUIVqVytNGgpaNHKOyXb7hZcl7uJpgS2tmeG', 'UuA86rFKln': 'nNWuL6SRUoUrkoMCR8dKb5vzFxGi4JVwRbGZHcC6lrCUk5ZRZdkL0VoG1RScRCwE3l1KSWCUaKeWbGbvPpSwFIBJSPvvk1bJZUFy', 'sqH5XlZ6Dv': 'GHtgkrDtxJcQwRY5WSHo1oJwPpjhh8NCkh4E7Wb1qCXUE2SWdaZLpnPxFUx9mJMQgAsf3Rrs2djlvkTL0B3J9xudpCRhv8v9gdM4', 'TSy8HqfeuJ': '9UDVGZQLOhnd7Cgknstfzeth8SM1gGecYDfufj5MWRaWoVGetJ0eVt2unUEgjjVMBumUZgW2jPJFC1h4tPbYlHWs2kvBDRTsHJBZ', 'AeSdgHc3ML': 'ImJquzYHwfV394qlp09i2udfJ0o8NjsXZusyLALjYD3jKMrGvpRmLGpQJQlpKcd5vSXMOKXQFET665ix0wB0vARFodSgddRqDHmS', 'MyWBr0uG5W': 'lv00rWQ42e4yrWen8V9VL9KgQPYCC9EGh5kifN52rjzgdSO1iRtif7b32qZ5weSaQz6oHfHQflUhLPSMr6ZpwpI00kylzt6RK7WA', 'XLGO6tFBwV': 'aiWVzwNAQt4QoRfJBUWNPykxAPlWsY6vQFDg5UhafPybRTE970fmDKYnNRRuJpvz3xntNpargubjM3mYDqX6vBtRs37HeIWY5soX'}
Reference: azure.storage.blob.StorageStreamDownloader class | Microsoft Learn