python, python-requests, stream, lazy-loading, bz2

Python bz2 returns EOFError before the whole file has been read


I am trying to lazily load items from a compressed file that resides on Zenodo. My goal is to iteratively yield the items without storing the file on my computer. My problem is that an EOFError occurs right after the first non-empty line is read. How can I overcome this issue?

Here is my code:

import requests as req
import json
from bz2 import BZ2Decompressor


def lazy_load(file_url):
    dec = BZ2Decompressor()
    with req.get(file_url, stream=True) as res:
        for chunk in res.iter_content(chunk_size=1024):
            data = dec.decompress(chunk).decode('utf-8')
            yield data  # or otherwise do something with 'data'


if __name__ == "__main__":
    creds = json.load(open('credentials.json'))
    url = 'https://zenodo.org/api/records/'
    id = '4617285'
    filename = '10.Papers.nt.bz2'
    res = req.get(f'{url}{id}', params={'access_token': creds['zenodo_token']})
    for file in res.json()['files']:
        if file['key'] == filename:
            for item in lazy_load(file['links']['self']):
                ...  # do something with 'item'

The error I get is the following:

Traceback (most recent call last):
  File ".\mag_loader.py", line 51, in <module>
    for item in lazy_load(file['links']['self']):
  File ".\mag_loader.py", line 18, in lazy_load
    data = dec.decompress(chunk)
EOFError: End of stream already reached

To run the code you need a Zenodo access token, for which you need an account. Once you have logged in, you can create the token here: https://zenodo.org/account/settings/applications/tokens/new/


Solution

  • I had a similar problem; it turns out this happens because .bz2 files are often "multi-stream", meaning a ".bz2 file" is just a sequence of consecutive, independently compressed bz2 streams (there is a small demonstration of this at the end of this answer).

    However, the documentation for Python's bz2.BZ2Decompressor says:

    Note: This class does not transparently handle inputs containing multiple compressed streams, unlike decompress() and BZ2File. If you need to decompress a multi-stream input with BZ2Decompressor, you must use a new decompressor for each stream.

    so I had to modify the code like so:

    def lazy_load(file_url):
        dec = BZ2Decompressor()
        with req.get(file_url, stream=True) as res:
            for chunk in res.iter_content(chunk_size=1024):
                data = dec.decompress(chunk).decode('utf-8')
                yield data  # or otherwise do something with 'data'

                # ===== new code here =====
                # use 'while', not 'if': one chunk can span several streams
                while dec.eof:
                    leftover = dec.unused_data
                    # 'leftover' is the start of the next stream,
                    # beginning with the magic bytes "BZh9..."
                    print(f"EOF! {leftover=}")
                    # a new decompressor is needed for each stream
                    dec = BZ2Decompressor()
                    data = dec.decompress(leftover).decode('utf-8')
                    yield data  # and handle 'data' here too
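
    You can verify the multi-stream behaviour locally with a minimal sketch: bz2.compress emits one complete stream, so concatenating two of its outputs produces a multi-stream blob like the ones written by pbzip2 and similar parallel compressors.

    import bz2
    from bz2 import BZ2Decompressor

    blob = bz2.compress(b'first stream\n') + bz2.compress(b'second stream\n')

    # the module-level helper handles multiple streams transparently
    print(bz2.decompress(blob))    # b'first stream\nsecond stream\n'

    # a single BZ2Decompressor stops at the first end-of-stream marker
    dec = BZ2Decompressor()
    print(dec.decompress(blob))    # b'first stream\n'
    print(dec.eof)                 # True
    print(dec.unused_data[:4])     # b'BZh9' -- start of the next stream
    try:
        dec.decompress(b'more')
    except EOFError as exc:
        print(exc)                 # End of stream already reached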
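
    One caveat that is separate from the EOFError: calling .decode('utf-8') on each decompressed chunk can raise UnicodeDecodeError whenever a multi-byte character happens to straddle a chunk or stream boundary. Below is a sketch of a more defensive variant, assuming the items are newline-separated (as the .nt extension suggests); the incremental decoder buffers partial characters between chunks instead of failing on them.

    import codecs
    import requests as req
    from bz2 import BZ2Decompressor

    def lazy_load(file_url):
        dec = BZ2Decompressor()
        utf8 = codecs.getincrementaldecoder('utf-8')()
        buf = ''  # text carried over until a full line accumulates
        with req.get(file_url, stream=True) as res:
            for chunk in res.iter_content(chunk_size=1024):
                while chunk:
                    raw = dec.decompress(chunk)
                    # on a stream boundary, continue with the leftover bytes
                    chunk = dec.unused_data if dec.eof else b''
                    if dec.eof:
                        dec = BZ2Decompressor()
                    # partial multi-byte characters are buffered between calls
                    buf += utf8.decode(raw)
                *lines, buf = buf.split('\n')
                for line in lines:
                    yield line
            if buf:  # trailing item without a final newline
                yield buf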