python, python-3.x, file-handling, seek

Processing large files in chunks: inconsistent seek with readline


I am trying to read and process a large file in chunks with Python. I am following this blog, which proposes a very fast way of reading and processing large chunks of data spread over multiple processes. I have only slightly updated the existing code, i.e. by using stat(fin).st_size instead of os.path.getsize. In the example I also haven't implemented multiprocessing, as the issue manifests itself in a single process as well, which makes it easier to debug.

The issue that I am having with this code is that it returns broken sentences. This makes sense: the pointers do not take line endings into account and just return some given byte size. In practice, one would assume that you could solve this by leaving out the last item in the fetched batch of lines, as that would most probably be the broken line. Unfortunately, that does not work reliably either.

from os import stat


def chunkify(pfin, buf_size=1024):
    file_end = stat(pfin).st_size
    with open(pfin, 'rb') as f:
        chunk_end = f.tell()

        while True:
            chunk_start = chunk_end
            f.seek(buf_size, 1)
            f.readline()
            chunk_end = f.tell()
            yield chunk_start, chunk_end - chunk_start

            if chunk_end > file_end:
                break


def process_batch(pfin, chunk_start, chunk_size):
    with open(pfin, 'r', encoding='utf-8') as f:
        f.seek(chunk_start)
        batch = f.read(chunk_size).splitlines()

    # changing this to batch[:-1] will result in 26 lines total
    return batch


if __name__ == '__main__':
    fin = r'data/tiny.txt'
    lines_n = 0
    for start, size in chunkify(fin):
        lines = process_batch(fin, start, size)
        # Uncomment to see broken lines
        # for line in lines:
        #    print(line)
        # print('\n')
        lines_n += len(lines)

    print(lines_n)
    # 29

The code above prints 29 as the total number of processed lines. When you do not return the last item of each batch, naively assuming that it is a broken line anyway, you get 26. The actual number of lines is 27. The testing data can be found below.

She returned bearing mixed lessons from a society where the tools of democracy still worked.
If you think you can sense a "but" approaching, you are right.
Elsewhere, Germany take on Brazil and Argentina face Spain, possibly without Lionel Messi.
What sort of things do YOU remember best?'
Less than three weeks after taking over from Lotz at Wolfsburg.
The buildings include the Dr. John Micallef Memorial Library.
For women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for breast cancer.
In one interview he claimed it was from the name of the Cornish language ("Kernewek").
8 Goldschmidt was out of office between 16 and 19 July 1970.
Last year a new law allowed police to shut any bar based on security concerns.
But, Frum explains: "Glenn Beck takes it into his head that this guy is bad news."
Carrying on the Romantic tradition of landscape painting.
This area has miles of undeveloped beach adjacent to the headlands.
The EAC was created in 2002 to help avoid a repeat of the disputed 2000 presidential election.
In May 1945, remnants of the German Army continue fight on in the Harz mountains, nicknamed "The Void" by American troops.
Dietler also said Abu El Haj was being opposed because she is of Palestinian descent.
The auction highlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disorder.
GAAP operating profit was $13.2 million and $7.1 million in the second quarter of 2008 and 2007, respectively.
Doc, Ira, and Rene are sent home as part of the seventh bond tour.
only I am sick of always hearing him called the Just.
Also there is Meghna River in the west of Brahmanbaria.
The explosives were the equivalent of more than three kilograms of dynamite - equal to 30 grenades," explained security advisor Markiyan Lubkivsky to reporters gathered for a news conference in Kyiv.
Her mother first took her daughter swimming at the age of three to help her with her cerebal palsy.
A U.S. aircraft carrier, the USS "Ticonderoga", was also stationed nearby.
Louis shocked fans when he unexpectedly confirmed he was expecting a child in summer 2015.
99, pp.
Sep 19: Eibar (h) WON 6-1

If you print out the created lines, you'll see that broken sentences do indeed occur. I find this odd: shouldn't f.readline() ensure that the file is read up to the next line ending? In the output below, the empty line separates two batches. That means that you cannot check a line against the next line in a batch and remove it if it is a substring - the broken sentence belongs to a different batch than the full sentence.

...
This area has miles of undeveloped beach adjacent to the headlands.
The EAC was created in 2002 to help avoid a repeat of the disputed 2000 presidential election.
In May 1945, r


In May 1945, remnants of the German Army continue fight on in the Harz mountains, nicknamed "The Void" by American troops.
...

Is there a way to get rid of these broken sentences, without removing too much?

You can download a larger test file (100,000 lines) here.


After a lot of digging, it seems that some inaccessible buffer is actually responsible for the inconsistent behaviour of seek, as discussed here and here. I tried out the proposed solution of using iter(f.readline, '') with seek, but that still gives me inconsistent results. I have updated my code to return the file pointer after each batch of 1500 lines, but in reality the returned batches overlap.

from os import stat
from functools import partial


def chunkify(pfin, max_lines=1500):
    file_end = stat(pfin).st_size
    with open(pfin, 'r', encoding='utf-8') as f:
        chunk_end = f.tell()

        for idx, l in enumerate(iter(f.readline, '')):
            if idx % max_lines == 0:
                chunk_start = chunk_end
                chunk_end = f.tell()
                # yield start position and size
                yield chunk_start, chunk_end - chunk_start

    chunk_start = chunk_end
    yield chunk_start, file_end - chunk_start


def process_batch(pfin, chunk_start, chunk_size):
    with open(pfin, 'r', encoding='utf-8') as f:
        f.seek(chunk_start)
        chunk = f.read(chunk_size).splitlines()

    batch = list(filter(None, chunk))

    return batch


if __name__ == '__main__':
    fin = r'data/100000-ep+gutenberg+news+wiki.txt'

    process_func = partial(process_batch, fin)
    lines_n = 0

    prev_last = ''
    for start, size in chunkify(fin):
        lines = process_func(start, size)

        if not lines:
            continue

        # print first and last ten sentences of batch
        for line in lines[:10]:
            print(line)
        print('...')
        for line in lines[-10:]:
            print(line)
        print('\n')

        lines_n += len(lines)

    print(lines_n)

An example of overlapping batches is below. The first two and a half sentences of the last batch are duplicates of the last sentences of the preceding batch. I don't know how to explain or solve this.

...
The EC ordered the SFA to conduct probes by June 30 and to have them confirmed by a certifying authority or it would deduct a part of the funding or the entire sum from upcoming EU subsidy payments.
Dinner for two, with wine, 250 lari.
It lies a few kilometres north of the slightly higher Weissmies and also close to the slightly lower Fletschhorn on the north.
For the rest we reached agreement and it was never by chance.
Chicago Blackhawks defeat Columbus Blue Jackets for 50th win
The only drawback in a personality that large is that no one els


For the rest we reached agreement and it was never by chance.
Chicago Blackhawks defeat Columbus Blue Jackets for 50th win
The only drawback in a personality that large is that no one else, whatever their insights or artistic pedigree, is quite as interesting.
Sajid Nadiadwala's reboot version of his cult classic "Judwaa", once again directed by David Dhawan titled "Judwaa 2" broke the dry spell running at the box office in 2017.
They warned that there will be a breaking point, although it is not clear what that would be.
...

In addition to this, I have also tried removing the readline from the original code and keeping track of a remaining, incomplete chunk. The incomplete chunk is then passed to the next chunk and added to its front. The issue that I am running into now is that, because the text is read in byte chunks, a chunk can end without completely finishing a character's bytes. This will lead to decoding errors.
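A minimal illustration (a toy string of mine, not from the test data): 'é' encodes to the two bytes \xc3\xa9 in UTF-8, so a byte-oriented chunk boundary can fall between them, and the next chunk then starts with a lone continuation byte:

data = 'café\n'.encode('utf-8')       # b'caf\xc3\xa9\n'
first, second = data[:4], data[4:]    # the boundary splits 'é' in half
second.decode('utf-8')                # UnicodeDecodeError: invalid start byte 0xa9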

from os import stat


def chunkify(pfin, buf_size=1024):
    file_end = stat(pfin).st_size
    with open(pfin, 'rb') as f:
        chunk_end = f.tell()

        while True:
            chunk_start = chunk_end
            f.seek(buf_size, 1)
            chunk_end = f.tell()
            is_last = chunk_end >= file_end
            # yield start position, size, and is_last
            yield chunk_start, chunk_end - chunk_start, is_last

            if is_last:
                break


def process_batch(pfin, chunk_start, chunk_size, is_last, leftover):
    with open(pfin, 'r', encoding='utf-8') as f:
        f.seek(chunk_start)
        chunk = f.read(chunk_size)

    # Add previous leftover to current chunk
    chunk = leftover + chunk
    batch = chunk.splitlines()
    batch = list(filter(None, batch))

    # If this chunk is not the last one,
    # pop the last item as that will be an incomplete sentence
    # We return this leftover to use in the next chunk
    if not is_last:
        leftover = batch.pop(-1)

    return batch, leftover


if __name__ == '__main__':
    fin = r'ep+gutenberg+news+wiki.txt'

    lines_n = 0
    left = ''
    for start, size, last in chunkify(fin):
        lines, left = process_batch(fin, start, size, last, left)

        if not lines:
            continue

        for line in lines:
            print(line)
        print('\n')

        numberlines = len(lines)
        lines_n += numberlines

    print(lines_n)

Running the code above will inevitably result in a UnicodeDecodeError.

Traceback (most recent call last):
  File "chunk_tester.py", line 46, in <module>
    lines, left = process_batch(fin, start, size, last, left)
  File "chunk_tester.py", line 24, in process_batch
    chunk = f.read(chunk_size)
  File "lib\codecs.py", line 322, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa9 in position 0: invalid start byte

Solution

  • You were so close! A relatively simple change to your final code (reading in the data as bytes and not str) makes it all (almost) work.

    The main issue was that reading from a binary file counts bytes, while reading from a text file counts characters. You did your first counting in bytes and your second in characters, so your assumptions about what data had already been read were wrong. It has nothing to do with an internal, hidden buffer.
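    As a quick illustration (my own, not part of the original answer): a multi-byte character is one character to a text-mode read but several bytes on disk, so the two counts drift apart as soon as the input is not pure ASCII.

    s = 'Curaçao'
    print(len(s))                  # 7 - read(7) in text mode consumes this many characters
    print(len(s.encode('utf-8')))  # 8 - but the line occupies this many bytes on disk

    Relatedly, f.tell() on a text-mode file returns an opaque cookie that is only meaningful when passed back to f.seek() on the same file; doing arithmetic on it, as the second chunkify attempt does, has no defined meaning. Combined with read(chunk_size) counting characters rather than bytes, that is what makes those batches overlap.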

    Other changes:

      • chunkify no longer needs to open the file at all: the chunk boundaries are computed arithmetically from the file size alone, instead of by seeking and calling tell().
      • buf_size defaults to 1024**2 rather than 1024, so far fewer chunks (and far fewer re-opens of the file) are needed.
      • the chunk is split on b'\n' and each complete line is decoded on its own, so a multi-byte character that straddles a chunk boundary is reassembled via leftover before any decoding happens.

    This gives the final code:

    from os import stat
    
    def chunkify(pfin, buf_size=1024**2):
        file_end = stat(pfin).st_size
    
        i = -buf_size  # so that i + buf_size == 0 below if the file is smaller than one chunk
        for i in range(0, file_end - buf_size, buf_size):
            yield i, buf_size, False
    
        leftover = file_end % buf_size
        if leftover == 0:  # if the last section is buf_size in size
            leftover = buf_size
        yield i + buf_size, leftover, True
    
    def process_batch(pfin, chunk_start, chunk_size, is_last, leftover):
        with open(pfin, 'rb') as f:
            f.seek(chunk_start)
            chunk = f.read(chunk_size)
    
        # Add previous leftover to current chunk
        chunk = leftover + chunk
        batch = chunk.split(b'\n')
    
        # If this chunk is not the last one,
        # pop the last item as that will be an incomplete sentence
        # We return this leftover to use in the next chunk
        if not is_last:
            leftover = batch.pop(-1)
    
        return [s.decode('utf-8') for s in filter(None, batch)], leftover
    
    
    if __name__ == '__main__':
        fin = r'ep+gutenberg+news+wiki.txt'
    
        lines_n = 0
        left = b''
        for start, size, last in chunkify(fin):
            lines, left = process_batch(fin, start, size, last, left)
    
            if not lines:
                continue
    
            for line in lines:
                print(line)
            print('\n')
    
            numberlines = len(lines)
            lines_n += numberlines
    
        print(lines_n)
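    For completeness, a sketch of my own (the helper names chunkify_on_newlines and count_lines are made up, and it assumes the same test file): once you are counting in bytes, you can also snap every chunk boundary to the next newline up front. Each chunk then contains only whole lines and whole UTF-8 characters, there is no leftover to thread from batch to batch, and the chunks become independent, so they can be farmed out to a multiprocessing.Pool as the original blog intended.

    from os import stat
    from multiprocessing import Pool


    def chunkify_on_newlines(pfin, buf_size=1024**2):
        # Yield (start, size) pairs whose boundaries always fall right
        # after a b'\n', so every chunk holds complete lines only.
        file_end = stat(pfin).st_size
        with open(pfin, 'rb') as f:  # binary mode: tell() is a real byte offset
            chunk_start = 0
            while chunk_start < file_end:
                f.seek(chunk_start + buf_size)
                f.readline()  # skip ahead to the end of the current (possibly cut) line
                chunk_end = min(f.tell(), file_end)
                yield chunk_start, chunk_end - chunk_start
                chunk_start = chunk_end


    def count_lines(args):
        # Workers re-open the file themselves, since file handles don't pickle.
        pfin, start, size = args
        with open(pfin, 'rb') as f:
            f.seek(start)
            chunk = f.read(size)
        # Decoding is safe here: the chunk starts and ends on line boundaries.
        return len(chunk.decode('utf-8').splitlines())


    if __name__ == '__main__':
        fin = r'ep+gutenberg+news+wiki.txt'
        jobs = [(fin, start, size) for start, size in chunkify_on_newlines(fin)]
        with Pool() as pool:
            print(sum(pool.map(count_lines, jobs)))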