Tags: python, python-2.7, csv, bz2

How to read lines from arbitrary BZ2 streams for CSV?


The bz2 module provides a standard open() method from which one can call readline(). However, in my situation I have a stream (pointing to a large amount of data) from which I want to decompress lines on the fly. My current implementation is as follows, but I know there must be a more succinct way to do this.

import bz2
import csv

# Module-level state shared by bz2_csv_rows() and bz2_line_reader().
BZ2_READ_SIZE = 100 * 1024  # number of bytes requested from the stream per read()
BZ2_FILE = None             # stream currently being read (set by bz2_csv_rows)
BZ2_DECOMPRESSOR = None     # bz2.BZ2Decompressor for the current stream
BZ2_BUFFER = ''             # decompressed data not yet handed out as lines


def bz2_csv_rows(fp):
    """Yield CSV rows parsed from the bz2-compressed stream ``fp``.

    The module-level reader state (file, decompressor, buffer) is reset
    when iteration starts, so only one generator returned by this function
    can be consumed at a time.

    Args:
        fp: Object whose ``read(size)`` method returns compressed data.

    Yields:
        One list of fields per CSV row, as produced by ``csv.reader``.
    """
    global BZ2_BUFFER, BZ2_DECOMPRESSOR, BZ2_FILE

    # Start from a clean slate for this stream.
    BZ2_FILE = fp
    BZ2_DECOMPRESSOR = bz2.BZ2Decompressor()
    BZ2_BUFFER = ''

    # bz2_line_reader() is polled until it returns the empty sentinel.
    line_source = iter(bz2_line_reader, b'')
    for record in csv.reader(line_source):
        yield record


def bz2_line_reader():
    """Return the next line of decompressed data, ``readline()``-style.

    Chunks of ``BZ2_READ_SIZE`` bytes are read from ``BZ2_FILE``, fed
    through ``BZ2_DECOMPRESSOR`` and accumulated in ``BZ2_BUFFER`` until a
    complete line is available or the input is exhausted.

    Returns:
        The next line *including* its trailing newline (the very last line
        may lack one), or an empty string once no data is left.  Keeping
        the newline means a blank line in the data is never mistaken for
        the empty end-of-input sentinel used with ``iter(..., b'')``.
    """
    global BZ2_BUFFER, BZ2_DECOMPRESSOR, BZ2_FILE, BZ2_READ_SIZE

    if BZ2_BUFFER is None:
        # State left behind by an exhausted reader: report end of input.
        return b''

    # BZ2_FILE is set to None once the input is exhausted, so later calls
    # drain the buffer without touching the stream again.
    while b'\n' not in BZ2_BUFFER and BZ2_FILE is not None:
        bindata = BZ2_FILE.read(BZ2_READ_SIZE)
        if not bindata:
            # Only an empty read means end of input.  The decompressor may
            # legitimately return little or nothing for a full-sized chunk,
            # so its output length says nothing about remaining input.
            BZ2_FILE = None
            break

        try:
            BZ2_BUFFER += BZ2_DECOMPRESSOR.decompress(bindata)
        except EOFError:
            # The compressed stream already ended; ignore trailing bytes.
            BZ2_FILE = None
        except IOError:
            # NOTE(review): best-effort behaviour kept from the original --
            # a chunk that fails to decompress is skipped, not fatal.
            pass

    i = BZ2_BUFFER.find(b'\n')
    if i < 0:
        # No newline left: hand out whatever remains (possibly nothing).
        line, BZ2_BUFFER = BZ2_BUFFER, b''
    else:
        line, BZ2_BUFFER = BZ2_BUFFER[:i + 1], BZ2_BUFFER[i + 1:]
    return line

Thoughts?


Solution

  • Here's something that's a little more succinct, and (in my opinion) it's more readable and gets rid of all those nasty global variables your code uses:

    import bz2
    import csv
    from functools import partial
    
    class BZ2_CSV_LineReader(object):
        """Stream CSV rows out of a bz2-compressed, UTF-8 encoded file.

        The file is read and decompressed in chunks of ``buffer_size``
        bytes, so it never has to be held in memory as a whole.
        """

        def __init__(self, filename, buffer_size=4*1024):
            """Remember which file to read and how many bytes to read at once."""
            self.filename = filename
            self.buffer_size = buffer_size

        def readlines(self):
            """Yield each CSV row of the file as a list of fields."""
            with open(self.filename, 'rb') as compressed:
                for row in csv.reader(self._line_reader(compressed)):
                    yield row

        def _line_reader(self, file):
            """Yield decoded text lines (line endings kept) from ``file``.

            ``file`` must be opened in binary mode and contain bz2 data.
            """
            # Local import so this class does not depend on edits elsewhere.
            import codecs

            decompressor = bz2.BZ2Decompressor()
            # Incremental decoding: a multi-byte character may be split
            # across two decompressed blocks, which a plain per-block
            # .decode('utf-8') would reject.
            decoder = codecs.getincrementaldecoder('utf-8')()
            reader = partial(file.read, self.buffer_size)
            pending = ''

            for bindata in iter(reader, b''):
                pending += decoder.decode(decompressor.decompress(bindata))
                if '\n' in pending:
                    lines = pending.splitlines(True)
                    # Hold back a trailing piece that is not newline-terminated
                    # yet; the rest of it may arrive with the next chunk.
                    pending = '' if lines[-1].endswith('\n') else lines.pop()
                    for line in lines:
                        yield line

            # Flush what is left: the final line of a file without a trailing
            # newline would otherwise be dropped.  final=True makes a truncated
            # multi-byte sequence raise instead of vanishing.
            pending += decoder.decode(b'', final=True)
            for line in pending.splitlines(True):
                yield line
    
    if __name__ == '__main__':
        # Demo: print every row of a sample compressed CSV file.
        bz2_csv_filename = 'test_csv.bz2'
        csv_source = BZ2_CSV_LineReader(bz2_csv_filename)
        for record in csv_source.readlines():
            print(record)