Tags: python, python-requests, zstandard

Decompressing a remote zstandard-compressed file on the fly


I would like to process many remote zst-compressed JSONL files that are available via HTTP/HTTPS. I do not have enough space to store the files (I'm running many in parallel and they're several gigabytes each), so I want to decompress them as they are being downloaded. I am testing the following code:

#!/usr/bin/env python3
import argparse
import json
import zstandard as zstd
import io
import requests
import time

def process_file(input_file: str) -> None:
    """
    Process a zstandard compressed JSONL file or a stream.

    This function takes a file path or a URL as input, decompresses it, and then
    processes each JSON object one by one.

    :param input_file: The input file path or URL to process.
    """

    # Create a decompression object
    # Increase the max_window_size if necessary
    # Be careful as this can potentially use a lot of memory
    # dctx = zstd.ZstdDecompressor(max_window_size=2**30)  # for example, allow up to 1 GB
    dctx = zstd.ZstdDecompressor()

    # Check if the input is a URL
    if input_file.startswith(('http://', 'https://')):
        # Stream the file from the URL
        response = requests.get(input_file, stream=True)
        reader = dctx.stream_reader(response.raw)
    else:
        # If the input is a file path
        f = open(input_file, 'rb')
        reader = dctx.stream_reader(f)

    text_stream = io.TextIOWrapper(reader, encoding='utf-8')

    line_counter = 0
    start_time = time.time()
    for line in text_stream:
        # Each line is a JSON string, so we parse it into a Python object
        data = json.loads(line)

        line_counter += 1
        if line_counter % 100_000 == 0:
            elapsed_time = time.time() - start_time
            print(f'Processed {line_counter:,d} lines in {elapsed_time:,.2f} seconds ({line_counter / elapsed_time:,.2f} lines/sec)')

    # Don't forget to close the file if it's not a URL
    if not input_file.startswith(('http://', 'https://')):
        f.close()

def main():
    """
    Main function to parse command line arguments and process the input file.
    """
    parser = argparse.ArgumentParser(description='Process a zstandard compressed JSONL file.')
    parser.add_argument('--input-file', '-i', type=str, required=True, help='The input file to process')

    args = parser.parse_args()

    process_file(args.input_file)

if __name__ == "__main__":
    main()

The local file on-the-fly decompression works, but the remote code raises a zstd.ZstdError: zstd decompress error: Frame requires too much memory for decoding exception; however, I am certain it is not a memory issue. I set max_window_size=2**30 (1 GB) and tested it on a 395 KB file (2.1 MB uncompressed) and still got the error. Passing response.content, response.raw, or response.raw.data to dctx.stream_reader() in that line gives the same error.
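
One way to see what a frame is actually asking for is to parse its header: python-zstandard exposes zstd.get_frame_parameters(), which reports the declared window size from the first few bytes. A minimal diagnostic sketch (the URL is a placeholder):

#!/usr/bin/env python3
# Diagnostic sketch: parse the zstd frame header of a remote file and print
# the window size the frame declares. The URL below is a placeholder.
import requests
import zstandard as zstd

url = 'https://example.com/path/to/file.jsonl.zst'  # placeholder

response = requests.get(url, stream=True)
header = response.raw.read(18)  # a zstd frame header is at most 18 bytes
params = zstd.get_frame_parameters(header)

print(f'content size:  {params.content_size}')
print(f'window size:   {params.window_size:,d} bytes')
print(f'dict id:       {params.dict_id}')
print(f'has checksum:  {params.has_checksum}')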

If I add:

        response = requests.get(input_file, stream=True)
        with open('test.zst', 'wb') as fh:
            for chunk in response.raw:
                print(".", end="")
                fh.write(chunk)
            print()
            exit()

It writes the file just fine, and I can decompress it with zstd on the command line. Adding read_across_frames=True to the stream_reader changes nothing:

reader = dctx.stream_reader(response.raw, read_across_frames=True)

And the documentation at https://python-zstandard.readthedocs.io/en/latest/decompressor.html is confusing at best, and seems to be wrong in places, since some of the example code does not work.
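
For what it's worth, the chunk-oriented pattern the documentation describes looks roughly like this (a sketch; fh stands in for any binary stream with a read() method):

import zstandard as zstd

def iter_decompressed_chunks(fh, chunk_size=16384):
    # Yield decompressed chunks from a readable binary stream.
    dctx = zstd.ZstdDecompressor()
    with dctx.stream_reader(fh) as reader:
        while True:
            chunk = reader.read(chunk_size)
            if not chunk:
                break
            yield chunk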

How do I decompress a remote zst file on the fly?

More info: Using brand-new code written from scratch to use SFTP instead of HTTP/HTTPS, I still get the same error.

#!/usr/bin/env python3

import argparse
import io
import bz2
import gzip
import paramiko
import zstandard as zstd
from urllib.parse import urlparse

class FileReader:
    def __init__(self, filename):
        """
        Initialize FileReader with a filename.

        :param filename: The name of the file to read.
        """
        self.filename = filename
        self.file = None
        self.sftp = None
        self.client = None

    def __enter__(self):
        """
        Open the file when entering the context.
        """
        self.open()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Close the file when exiting the context.
        """
        self.close()

    def __iter__(self):
        """
        Make the FileReader object iterable.
        """
        return self

    def __next__(self):
        """
        Provide the next line in the file.
        """
        line = self.file.readline()

        if not line:
            # End of file
            raise StopIteration
        else:
            return line

    def open(self):
        """
        Open the file for reading, decompressing it on the fly if necessary.
        """
        parsed = urlparse(self.filename)

        if parsed.scheme == 'sftp':
            # Handle SFTP URLs
            client = paramiko.SSHClient()
            client.load_system_host_keys()
            client.connect(parsed.hostname, username=parsed.username, password=parsed.password)
            sftp = client.open_sftp()
            f = sftp.file(parsed.path, 'rb')

            if self.filename.endswith('.zst'):
                dctx = zstd.ZstdDecompressor()
                self.file = io.TextIOWrapper(dctx.stream_reader(f), encoding='utf-8')
            elif self.filename.endswith('.bz2'):
                # Wrap in a TextIOWrapper so every scheme yields str lines
                self.file = io.TextIOWrapper(bz2.BZ2File(f), encoding='utf-8')
            elif self.filename.endswith('.gz'):
                self.file = io.TextIOWrapper(gzip.GzipFile(fileobj=f), encoding='utf-8')
            else:
                self.file = f

            self.sftp = sftp
            self.client = client
        else:
            if self.filename.endswith('.zst'):
                dctx = zstd.ZstdDecompressor()
                self.file = io.TextIOWrapper(dctx.stream_reader(open(self.filename, 'rb')), encoding='utf-8')
            elif self.filename.endswith('.bz2'):
                self.file = bz2.open(self.filename, 'rt')
            elif self.filename.endswith('.gz'):
                self.file = gzip.open(self.filename, 'rt')
            else:
                self.file = open(self.filename, 'r')

    def close(self):
        """
        Close the file.
        """
        if self.file is not None:
            self.file.close()
        if self.sftp is not None:
            self.sftp.close()
        if self.client is not None:
            self.client.close()


def main():
    """
    Main function, mostly for testing.
    """
    parser = argparse.ArgumentParser(description='Process a zstandard compressed JSONL file.')
    parser.add_argument('--input-file', '-i', type=str, required=True, help='The input file to process')

    args = parser.parse_args()

    with FileReader(args.input_file) as reader:
        for line in reader:
            print(line)
            exit()

if __name__ == "__main__":
    main()

Result:

> filereader.py -i sftp://sftp.example.com/path/to/file.zst
Traceback (most recent call last):
  File "filereader.py", line 114, in <module>
    main()
  File "filereader.py", line 109, in main
    for line in reader:
  File "filereader.py", line 45, in __next__
    line = self.file.readline()
           ^^^^^^^^^^^^^^^^^^^^
zstd.ZstdError: zstd decompress error: Frame requires too much memory for decoding

Works fine with .gz, .bz2, and uncompressed files over SFTP. It's just zstandard that's causing issues.


Solution

The code above works if you set

    dctx = zstd.ZstdDecompressor(max_window_size=2**31)

(note the 2**31 instead of 2**30) if you are going to decompress zstandard files that were compressed with --long=31. A minimal working sketch follows.
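
This is a sketch of the HTTP path only; the URL is a placeholder:

#!/usr/bin/env python3
# Minimal sketch: stream-decompress a remote .jsonl.zst over HTTP.
import io
import json
import requests
import zstandard as zstd

url = 'https://example.com/path/to/file.jsonl.zst'  # placeholder

# Frames produced with `zstd --long=31` declare a window of up to 2 GB,
# so the decompressor must allow at least that much.
dctx = zstd.ZstdDecompressor(max_window_size=2**31)

response = requests.get(url, stream=True)
reader = dctx.stream_reader(response.raw)
text_stream = io.TextIOWrapper(reader, encoding='utf-8')

for line in text_stream:
    data = json.loads(line)  # process each JSON object as it streams in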

For those who are interested: it appears that some (older?) versions of the zstd command-line tool, when given --long=31, set the long-distance-matching window to 2 GB regardless of the file size, while other (current?) versions cap the window at the file size (which makes sense). So when I tested with a small file that had been compressed by an older zstd, decompression failed, but when I later tried to recreate the problem, I could not, because I was using a current zstd. The sketch below reproduces the failure in pure Python.
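
This sketch should reproduce the failure locally with python-zstandard alone, assuming the frame header keeps the requested window_log when the content size is not known up front (which is why the compression below is streamed rather than one-shot):

#!/usr/bin/env python3
# Reproduce "Frame requires too much memory for decoding" in pure Python.
import zstandard as zstd

payload = b'x' * 1_000_000

# Ask for a 256 MB window (window_log=28), above the decompressor's default
# limit of 128 MB (2**27). Compress as a stream, without pledging a source
# size, so the window is not clamped down to the payload size.
params = zstd.ZstdCompressionParameters.from_level(3, window_log=28)
cctx = zstd.ZstdCompressor(compression_params=params)
cobj = cctx.compressobj()
compressed = cobj.compress(payload) + cobj.flush()

try:
    zstd.ZstdDecompressor().decompressobj().decompress(compressed)
except zstd.ZstdError as e:
    print(e)  # zstd decompress error: Frame requires too much memory for decoding

# With a raised limit it decompresses fine:
dobj = zstd.ZstdDecompressor(max_window_size=2**28).decompressobj()
assert dobj.decompress(compressed) == payload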

I may post the more complete code that handles files over SFTP/SSH compressed in zst, bz2, and gz formats.