[python] [multithreading] [file-generation] [compressed-files]

Can I use multi-threading to accelerate file generation in Python? What else can I do to improve the performance?


I am trying to use Python to generate a compressed file. The text generation rule, i.e., the pattern, is fairly simple, so the overhead of computing the text can be ignored.

I managed to put together code that generates the file in a streaming fashion, i.e., gzip consumes the generated text on the fly; see below:

import gzip
from time import time

def gen_data( chunk_size = 1024 * 1024 ):
    text = "int {n} = {n}\n"
    total_size = 50 * ( 1024**3 )
    n = 0
    while n < total_size:
        chunk = []
        for _ in range( chunk_size // len( text ) ):
            chunk.append( text.format( n=n ) )
            n += len( text )
        yield "".join( chunk ).encode( 'utf-8' )

def stream_compress():
    start = time()
    with gzip.open( "compressed.gz", 'wb' ) as f:
        for chunk in gen_data():
            f.write( chunk )
    print( f"Done. Time elapsed is {time() - start:.2f} seconds" );
    f.close()

However, the performance is still below my expectations: it takes ~30 minutes to generate a ~50 GB text file.

I wonder whether there is any way the performance can be improved further?

To use multi-threading, the most intuitive way is surely to split the output into several files, but that is not feasible in this case due to some API restrictions. Can multi-threading be applied here in some other way?


Solution

  • There are several possible reasons for heavy CPU usage, and not all of them can be easily solved; in this artificial example, most of the CPU time appears to be spent in zlib compression.

    So the first step to speed things up is to separate the function that generates the data from the function that writes (and compresses) it. I suggest using a multiprocessing queue for this. A code example with some comments:

    import gzip
    from time import time
    from multiprocessing import Process, Queue
    
    def gen_data(queue: Queue, chunk_size = 1024 * 1024):
        text = "int {n} = {n}\n"
        total_size = 50 * ( 1024**3 )
        n = 0
    
        while n < total_size:
            chunk = []
            for _ in range( chunk_size // len( text ) ):
                chunk.append( text.format( n=n ) )
                n += len( text )
            queue.put("".join(chunk).encode('utf-8'))
    
        queue.put(None)
    
    def compress_data(queue: Queue, output_file="compressed.gz"):
        with gzip.open( "compressed.gz", 'wb' ) as f:
            while True:
                chunk = queue.get()
    
            # Stop once generation is complete (None is the sentinel)
                if chunk is None:
                    break
    
                f.write( chunk )
    
    def main():
        start = time()
        # Bounded queue: maxsize limits how far the producer can run ahead of the consumer
        queue = Queue(maxsize=10)
    
        producer = Process(target=gen_data, args=(queue,))
        consumer = Process(target=compress_data, args=(queue,))
    
        producer.start()
        consumer.start()
    
        producer.join()
        consumer.join()
    
        print(f"Done. Time elapsed is {time() - start:.2f} seconds")
    
    if __name__ == "__main__":
        main()
    

    This can increase speed a bit.

    The next thing is to use a different compression level. gzip.open() has a compresslevel argument with values from 1 (the fastest) to 9 (the best compression, which is the default). You can definitely gain speed here with a lower compression level.
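
    For example, a minimal sketch of the consumer above with a lower compression level (reusing the imports from the queue example; compresslevel=1 is only an illustration, pick the value whose output size is still acceptable):

    def compress_data(queue: Queue, output_file="compressed.gz"):
        # compresslevel=1 favours speed; 9 (the default) favours compression ratio
        with gzip.open(output_file, 'wb', compresslevel=1) as f:
            while True:
                chunk = queue.get()
                if chunk is None:   # producer finished
                    break
                f.write(chunk)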

    Then some other small speedups:

    chunk.append( f"int {n} = {n}\n" )
    
    with io.BytesIO() as bio:
        for _ in range( chunk_size // len( text ) ):
            chunk = f"int {n} = {n}\n"
            n += len(chunk)
            bio.write(chunk.encode("utf-8")
        queue.put(bio.getvalue())
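
    Putting those small tweaks into the producer from the queue example above, a rough sketch could look like this (a sketch only; the helper variable names are illustrative, and the chunking logic is otherwise the same as before):

    import io
    from multiprocessing import Queue

    def gen_data(queue: Queue, chunk_size=1024 * 1024):
        total_size = 50 * (1024**3)
        line_template_len = len("int {n} = {n}\n")   # loop count estimate, as in the original
        n = 0

        while n < total_size:
            # Encode directly into a bytes buffer instead of joining strings
            with io.BytesIO() as bio:
                for _ in range(chunk_size // line_template_len):
                    line = f"int {n} = {n}\n"         # f-string instead of str.format()
                    n += len(line)                    # count the bytes actually produced
                    bio.write(line.encode("utf-8"))
                queue.put(bio.getvalue())

        queue.put(None)   # sentinel: tell the consumer to stop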