I am trying to lazily load items from a compressed file hosted on Zenodo. My goal is to yield the items one at a time without storing the file on my computer. The problem is that an EOFError is raised right after the first non-empty line is read. How can I overcome this issue?
Here is my code:
import requests as req
import json
from bz2 import BZ2Decompressor

def lazy_load(file_url):
    dec = BZ2Decompressor()
    with req.get(file_url, stream=True) as res:
        for chunk in res.iter_content(chunk_size=1024):
            data = dec.decompress(chunk).decode('utf-8')
            # do something with 'data'

if __name__ == "__main__":
    creds = json.load(open('credentials.json'))
    url = 'https://zenodo.org/api/records/'
    id = '4617285'
    filename = '10.Papers.nt.bz2'
    res = req.get(f'{url}{id}', params={'access_token': creds['zenodo_token']})
    for file in res.json()['files']:
        if file['key'] == filename:
            for item in lazy_load(file['links']['self']):
                # do something with 'item'
The error I get is the following:
Traceback (most recent call last):
  File ".\mag_loader.py", line 51, in <module>
    for item in lazy_load(file['links']['self']):
  File ".\mag_loader.py", line 18, in lazy_load
    data = dec.decompress(chunk)
EOFError: End of stream already reached
To run the code you need a Zenodo access token, for which you need an account. Once you have logged in, you can create the token here: https://zenodo.org/account/settings/applications/tokens/new/
I had a similar problem. It turns out that .bz2 files are often "multi-stream", meaning that a single ".bz2 file" is a concatenation of consecutive, independently compressed bz2 streams.
However, the documentation for Python's bz2.BZ2Decompressor says:
Note: This class does not transparently handle inputs containing multiple compressed streams, unlike decompress() and BZ2File. If you need to decompress a multi-stream input with BZ2Decompressor, you must use a new decompressor for each stream.
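To make the note concrete, here is a small offline sketch of the behaviour. The payload is made up for illustration (two tiny streams concatenated by hand), so it only assumes that the Zenodo file is built the same way:

```python
import bz2

# Two independently compressed streams, concatenated as in a multi-stream .bz2 file
# (illustrative payload, not the real Zenodo data)
payload = bz2.compress(b"first\n") + bz2.compress(b"second\n")

# The one-shot function handles both streams transparently
all_data = bz2.decompress(payload)

# A single BZ2Decompressor stops at the end of the first stream
dec = bz2.BZ2Decompressor()
first = dec.decompress(payload)
reached_eof = dec.eof
leftover = dec.unused_data  # raw, still-compressed bytes of the second stream

# Feeding more data to the finished decompressor raises EOFError,
# which is the error shown in the question
try:
    dec.decompress(b"more")
    raised = False
except EOFError:
    raised = True
```

After the first stream ends, dec.eof is True and everything that followed it is kept, still compressed, in dec.unused_data.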
So I had to modify the code as follows:
def lazy_load(file_url):
    dec = BZ2Decompressor()
    with req.get(file_url, stream=True) as res:
        for chunk in res.iter_content(chunk_size=1024):
            data = dec.decompress(chunk).decode('utf-8')
            # do something with 'data'
            # ===== new code here =====
            if dec.eof:
                leftover = dec.unused_data
                # you should see that 'leftover' is the start of a new stream
                # beginning with "BZh9..."
                print(f"EOF! {leftover=}")
                # we have to start a new decompressor
                dec = BZ2Decompressor()
                data = dec.decompress(leftover).decode('utf-8')
                # do something with 'data' here too
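
If you want to yield whole lines, a slightly more defensive variant may help. The sketch below is not the code I ran against Zenodo: the name iter_bz2_lines is made up, and it assumes the input is an iterable of byte chunks and that items are newline-separated text. It loops over the leftover bytes in case one chunk spans several streams, and it uses an incremental decoder so that a multi-byte UTF-8 character split across two chunks does not raise UnicodeDecodeError.

```python
import bz2
import codecs


def iter_bz2_lines(chunks, encoding="utf-8"):
    """Yield decoded lines from an iterable of raw (possibly multi-stream) bz2 chunks."""
    dec = bz2.BZ2Decompressor()
    text_dec = codecs.getincrementaldecoder(encoding)()
    pending = ""  # text after the last newline seen so far
    for chunk in chunks:
        while chunk:
            text = text_dec.decode(dec.decompress(chunk))
            if dec.eof:
                # End of one stream: the rest of the chunk belongs to the next one
                chunk = dec.unused_data
                dec = bz2.BZ2Decompressor()
            else:
                chunk = b""
            pending += text
            # Everything before the last newline is complete; keep the tail
            *lines, pending = pending.split("\n")
            yield from lines
    # Flush the decoder and emit a final line that has no trailing newline
    pending += text_dec.decode(b"", final=True)
    if pending:
        yield pending
```

With requests, this could be called as iter_bz2_lines(res.iter_content(chunk_size=1024)) inside the with block. Trailing bytes that are not a valid bz2 stream are not handled here and would raise an error.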