Tags: python, file-io, compression, python-multiprocessing, lz4

Reading and writing multiple compressed frames from a single file


I have extremely large byte data stored in .txt files on a hard drive (approx. 1 TB each; 4 depth cameras running 24/7 simultaneously, writing at a rate of approx. 4 GB per minute).

Each file contains a large number of image frames encoded as bytes. Each frame was compressed using lz4 compression and appended to the file for the corresponding camera. There are only 4 files for now, though that number will increase for longer recordings.

Each frame is of unknown size when all you have is the raw data.

Each frame is separated by an uncompressed bytearray pattern that was concatenated onto the end of the already compressed frame, like so:

import lz4.frame

map_to_sort.sort()
frame_ender_bytes = bytearray("<FRAME_END>", 'utf-8')
for k, frame_compressed in map_to_sort.mapper.items():
    # frame_compressed[0] is the camera file path, frame_compressed[1] the compressed frame bytes.
    with lz4.frame.open(frame_compressed[0], 'ab') as fp:
        fp.write(frame_compressed[1] + frame_ender_bytes)

I am attempting to read each frame, decompress it, and write the uncompressed frame to another 12 TB hard drive in 18 hours or preferably less (i.e. less time than it takes to write and compress the data to the file in the first place). I am unfortunately unable to use high-capacity SSDs for this project, so I have to somehow manage the bottlenecks of reading and writing compressed data to hard disks.

I have come up with several different ideas on how to solve this, however none of them seem to work that well. There are of course the naive approaches I have tried, such as reading the file byte by byte (which takes too long to even read). I have also tried:

Reading the files in chunks that are approximately the size of each frame and splitting on the delimiter, like so:

import lz4.frame as frm
import numpy as np

def chunky(stream):
    buffer = bytearray()
    while True:
        chunk = stream.read(40096)
        if chunk:
            buffer += chunk
        else:
            break
        # Yield every complete frame currently sitting in the buffer;
        # anything after the last <FRAME_END> stays buffered for the next read.
        while True:
            combine = buffer.split(b'<FRAME_END>', 1)
            if len(combine) < 2:
                break
            else:
                buffer = combine[1]
                part = combine[0]
                yield part

#................................................
# Elsewhere in the code.
# dec_path is a string holding the decompressed file path that corresponds to camera_file_path.
with frm.open(camera_file_path, 'rb') as fp:
    for ch in chunky(fp):
        decompressed_frame = frm.decompress(ch)
        with frm.open(dec_path, 'ab') as fj:
            fj.write(decompressed_frame)

There are of course a multitude of edge cases with this approach, e.g. what if <FRAME_END> is split across two chunk reads? Then the split won't find the delimiter and it shows up as an unrecognized frame. Even so, this doesn't happen that often, and the decompression and write times are still greater than 18 hours.

Creating a JSON/txt file that contains a map-like structure, where each key maps to the starting and ending byte position of each frame, then storing the data as a dictionary and using it to read each frame with the appropriate number of bytes. Writing the compressed file:

# In this example there are no separators between frames, since I am keeping track of positions.
# map_to_sort.mapper is a dictionary organized like so:
# int k : ["E:\\Frame_Data\\cam_compressed_0.txt", <compressed frame bytes>, 56000, 10000,
#          "E:\\Frame_Data\\cam_compressed_0_mapper_data.txt"]
map_to_sort.sort()
for k, frame_compressed in map_to_sort.mapper.items():
    with frm.open(frame_compressed[0], 'ab') as fp:
        fp.write(frame_compressed[1])
    with open(frame_compressed[4], 'a') as fil:  # append so the mapper file keeps one line per frame
        str_to_write = str(k) + "," + str(frame_compressed[2]) + "," + str(frame_compressed[3]) + "," + frame_compressed[0] + "\n"
        fil.write(str_to_write)
map_to_sort.clear()

Reading the mapper file and writing the decompressed file:

with open(TEXT_COMPRESSED_MAPPER_PATH, 'r') as fr:
    for line in fr:
        str_arr = line.strip().split(",")
        file_comp_read_from = str_arr[3]
        start_byte_pos = int(str_arr[1])
        end_byte_pos = int(str_arr[2])
        with frm.open(file_comp_read_from, 'rb') as fp:
            fp.seek(start_byte_pos, 0)
            compressed_frame = fp.read(end_byte_pos - start_byte_pos)
        # Same decompression code as before

However, the downsides to this were the space taken up by the text or JSON file (not insignificant) and the RAM it required (I have RAM limitations), as well as the fact that the frame reads were inconsistent and many frames just weren't decompressed properly. I am also not certain that I am recording the true exact location in the file where each frame begins and ends. If anyone has any pointers on that, it would also help.
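For concreteness, here is roughly what I mean by tracking exact positions. This is a simplified sketch, not my actual recording code, and it assumes the compressed frames are written through a plain open() handle (not lz4.frame.open), so that tell() reports real byte offsets into the compressed file:

# Hypothetical sketch: ask the raw file handle where it is before and after
# each write, and record those positions in the mapper file.
import lz4.frame as frm

def write_frames_with_index(frames, data_path, index_path):
    # frames is an iterable of (frame_id, compressed_bytes) pairs.
    with open(data_path, 'ab') as data_file, open(index_path, 'a') as index_file:
        for frame_id, compressed in frames:
            start = data_file.tell()
            data_file.write(compressed)
            end = data_file.tell()
            index_file.write(f"{frame_id},{start},{end},{data_path}\n")

def read_frames_with_index(index_path):
    with open(index_path, 'r') as index_file:
        for line in index_file:
            frame_id, start, end, data_path = line.strip().split(",")
            with open(data_path, 'rb') as data_file:
                data_file.seek(int(start))
                compressed = data_file.read(int(end) - int(start))
            yield frame_id, frm.decompress(compressed)

Since the offsets come from tell() on the raw handle, the seek/read on the way back returns exactly one compressed frame, which can then be handed to frm.decompress.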

The final thing that would most likely help is increasing the lz4 compression level from 7 to something higher. Smaller files would undoubtedly cut down on the amount of data to read and write. (Decompression is not that intensive on my CPU.)

At compression level 7 I am already using all 8 cores at about 70%, and at level 8 I start losing frames because it takes too long to decompress, with CPU usage at around 98%.
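In case it helps, python-lz4 exposes the level through the compression_level argument of lz4.frame.compress (and lz4.frame.open), so a quick throwaway benchmark along these lines is one way to see what a level change costs. The payload below is synthetic, so real depth frames will compress and time differently:

# Quick, synthetic comparison of lz4 compression levels.
# The payload is a repeating pattern, so the ratios are only indicative.
import time
import lz4.frame as frm

sample_frame = bytes(range(256)) * 8192     # ~2 MB stand-in for one frame

for level in (0, 3, 7, 9, 12):
    start = time.perf_counter()
    compressed = frm.compress(sample_frame, compression_level=level)
    elapsed_ms = (time.perf_counter() - start) * 1000
    ratio = len(sample_frame) / len(compressed)
    print(f"level {level:>2}: {elapsed_ms:6.1f} ms, ratio {ratio:.1f}:1")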

A lot of the compression code uses a combination of multithreading and multiprocessing. The code is too long to post here, nor am I necessarily allowed to post a link to it (it is not open source as of yet; I can maybe post parts of it if you wish).

The basic idea is to have one process handle recording and place the frames into a shared queue, while a second process runs simultaneously, checks whether the queue is empty, and then processes it (pops off frames), utilizing threads to compress multiple frames at once. The unsorted frames are then sorted and written to the corresponding files, in order. If anyone has any pointers on how to significantly increase the compression ratio without too much strain on my CPU, that would be nice.
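For what it's worth, a stripped-down sketch of that layout looks roughly like the following. The frame source, batch size, compression level, and output file name are placeholders, not my actual recording code:

# Stripped-down sketch of the layout described above: one process pushes frames
# into a shared queue, a second process drains the queue and compresses batches
# of frames on a thread pool. Names and sizes are placeholders.
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor
import lz4.frame as frm

SENTINEL = None
FRAME_ENDER = b"<FRAME_END>"

def recorder(frame_queue):
    # Placeholder for the camera loop: push (frame_id, raw_bytes) tuples.
    for frame_id in range(100):
        raw_frame = bytes(64 * 1024)              # stand-in for one depth frame
        frame_queue.put((frame_id, raw_frame))
    frame_queue.put(SENTINEL)

def compress_one(item):
    frame_id, raw_frame = item
    # The lz4 bindings drop the GIL while compressing, so threads can overlap.
    return frame_id, frm.compress(raw_frame, compression_level=7)

def compressor(frame_queue, out_path, batch_size=8):
    with ThreadPoolExecutor(max_workers=4) as pool, open(out_path, 'ab') as out:
        done = False
        while not done:
            batch = []
            while len(batch) < batch_size:
                item = frame_queue.get()
                if item is SENTINEL:
                    done = True
                    break
                batch.append(item)
            # Compress the batch in parallel, then write in frame order.
            for frame_id, blob in sorted(pool.map(compress_one, batch)):
                out.write(blob + FRAME_ENDER)

if __name__ == "__main__":
    q = mp.Queue()
    rec = mp.Process(target=recorder, args=(q,))
    comp = mp.Process(target=compressor, args=(q, "cam_compressed_0.txt"))
    rec.start(); comp.start()
    rec.join(); comp.join()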

I am not too happy with this design, as sharing a queue between the two processes is not great practice. However, I saw little choice: the Intel depth cameras I am using don't particularly play nice with the multiprocessing module, so I couldn't easily give each camera its own process. Nor does the Python module for the Intel depth cameras lend itself to recording and streaming simultaneously in a simple way.

If you wish to see more code on the way I am handling the file compression and I/O, please let me know.

Feel free to mention lossy compression or other faster approaches.

Sorry for the large amount of information.

TL;DR: I am having trouble decompressing a large number of files in a small amount of time using lz4 compression in Python, and then writing the decompressed files to another hard drive, utilizing I/O. Some code and exact info are above. The goal is to be able to decompress all the files and write them to another hard drive in 18 hours or less.

EDIT

I was able to get the compression levels I was looking for by utilizing the method suggested in the solution (much faster read and write times). However, it worked best using gzip at compression level 6, not lz4. I am seeing around a 6:1 compression ratio for the majority of my files. The more cores and the higher quality your CPU, the better results you will get. I have 4 simultaneous depth streams on an AMD Ryzen 7 5800X processor; at any higher compression level I started to drop frames. Theoretically, with more cores you might get better results, though I have not tested this. The major bottleneck you will run into is the speed of your hard disks. If you have the budget, I would highly suggest SSDs.
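For illustration, combining the solution's fixed-length header with gzip at level 6 looks roughly like the sketch below (a simplified example, not my actual pipeline):

# Simplified sketch: gzip level 6 plus a fixed-length size header per frame.
# Not the actual recording/decompression pipeline.
import gzip

INT_BYTE_SIZE = 8

def write_frame(fp, raw_frame: bytes) -> None:
    blob = gzip.compress(raw_frame, compresslevel=6)
    fp.write(len(blob).to_bytes(INT_BYTE_SIZE, 'big'))
    fp.write(blob)

def read_frames(fp):
    while True:
        header = fp.read(INT_BYTE_SIZE)
        if len(header) < INT_BYTE_SIZE:
            break
        blob = fp.read(int.from_bytes(header, 'big'))
        yield gzip.decompress(blob)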


Solution

  • Here's a proof of concept as far as using a fixed-length header to split up blobs in your file. Not sure if this solves all your problems, but maybe part of them.

    import random
    import string
    import tempfile
    import os
    
    def bytes_generator():
        out = []
        for i in range(random.randint(10, 20)):
            out.append(random.choice(string.ascii_lowercase))
        return ''.join(out).encode('utf-8')
    
    INT_BYTE_SIZE = 8
    def int_to_bytes(x: int) -> bytes:
        return x.to_bytes(INT_BYTE_SIZE, 'big')
    def int_from_bytes(xbytes: bytes) -> int:
        return int.from_bytes(xbytes, 'big')
    
    def run():
        filehandle = tempfile.TemporaryFile()
        blobs = []
        for i in range(10):
            data = bytes_generator()
            blobs.append(data)
            int_bytes = int_to_bytes(len(data))
            filehandle.write(int_bytes)
            filehandle.write(data)
    
        # Reset as if just opened
        filehandle.seek(0, os.SEEK_SET)
    
        # Get file length
        filehandle.seek(0, os.SEEK_END)
        file_length = filehandle.tell()
    
        # Reset for reading
        filehandle.seek(0, os.SEEK_SET)
    
        i = 0
        while filehandle.tell() < file_length:
            data_length = int_from_bytes(filehandle.read(INT_BYTE_SIZE))
            read_data = filehandle.read(data_length)
            assert read_data == blobs[i]
            i += 1
    
    if __name__ == "__main__":
        run()
    

    int/byte conversion stolen from "Converting int to bytes in Python 3", more or less