I am trying to use Python to generate a compressed file. The text generation rule (pattern) is fairly naive, so the overhead of computing the text can be ignored.
I managed to put together code that generates it in a streaming fashion, i.e. gzip consumes the generated text on the fly, see below:
import gzip
from time import time

def gen_data( chunk_size = 1024 * 1024 ):
    text = "int {n} = {n}\n"
    total_size = 50 * ( 1024**3 )
    n = 0
    while n < total_size:
        chunk = []
        for _ in range( chunk_size // len( text ) ):
            chunk.append( text.format( n=n ) )
            n += len( text )
        yield "".join( chunk ).encode( 'utf-8' )

def stream_compress():
    start = time()
    with gzip.open( "compressed.gz", 'wb' ) as f:
        for chunk in gen_data():
            f.write( chunk )
    print( f"Done. Time elapsed is {time() - start:.2f} seconds" )
However, the performance is still below my expectation: it takes ~30 minutes to generate a ~50 GB text file.
Is there any way the performance can be improved further?
The most intuitive way to use multiple threads would be to split the output into several files, but that is not feasible in this case due to some API restrictions. Can multi-threading be applied here in some other way?
There are several reasons this uses a lot of CPU, and it cannot be fully avoided, because an artificial example like this spends most of its CPU time inside zlib doing the compression itself.
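To confirm where the time actually goes, you can profile the original stream_compress with cProfile; a minimal sketch, assuming stream_compress from the question is defined in the same script (you will probably want to shrink total_size before profiling):

import cProfile

# Most of the cumulative time should show up in the gzip/zlib write and
# compress calls rather than in the text generation itself.
cProfile.run("stream_compress()", sort="cumulative")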
So the first step to speed things up is to separate the function that generates the data from the function that writes it. I suggest using a multiprocessing queue for this. A code example with some comments:
import gzip
from time import time
from multiprocessing import Process, Queue

def gen_data(queue: Queue, chunk_size = 1024 * 1024):
    text = "int {n} = {n}\n"
    total_size = 50 * ( 1024**3 )
    n = 0
    while n < total_size:
        chunk = []
        for _ in range( chunk_size // len( text ) ):
            chunk.append( text.format( n=n ) )
            n += len( text )
        queue.put("".join(chunk).encode('utf-8'))
    # Sentinel value to tell the consumer that generation is finished
    queue.put(None)

def compress_data(queue: Queue, output_file="compressed.gz"):
    with gzip.open( output_file, 'wb' ) as f:
        while True:
            chunk = queue.get()
            # Go away if the generation completed
            if chunk is None:
                break
            f.write( chunk )

def main():
    start = time()
    # Create a queue, you can play with maxsize
    queue = Queue(maxsize=10)
    producer = Process(target=gen_data, args=(queue,))
    consumer = Process(target=compress_data, args=(queue,))
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    print(f"Done. Time elapsed is {time() - start:.2f} seconds")

if __name__ == "__main__":
    main()
This can increase speed a bit.
The next thing is to use a different compression level. gzip.open() has a compresslevel argument with values from 1 (the fastest) to 9 (the best compression, which is the default). You can definitely gain speed here with a lower compression level.
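For example, a minimal tweak to the compress_data function above, just passing compresslevel=1 to gzip.open; everything else stays the same:

import gzip
from multiprocessing import Queue

def compress_data(queue: Queue, output_file="compressed.gz"):
    # compresslevel=1 trades compression ratio for speed; the default is 9
    with gzip.open(output_file, 'wb', compresslevel=1) as f:
        while True:
            chunk = queue.get()
            if chunk is None:  # generation finished
                break
            f.write(chunk)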
Then some other small speedups:
- Use f-strings instead of str.format, i.e. chunk.append( f"int {n} = {n}\n" )
- Collect the chunk data in an io.BytesIO buffer instead of a list, i.e. you open a BytesIO object like a file and write the prepared chunks into it (see the combined sketch after this list):

with io.BytesIO() as bio:
    for _ in range( chunk_size // len( text ) ):
        chunk = f"int {n} = {n}\n"
        n += len(chunk)
        bio.write(chunk.encode("utf-8"))
    queue.put(bio.getvalue())
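Putting those two tweaks together, a minimal sketch of the producer (same queue setup and sentinel as above; as in the original, n advances by the length of each generated line):

import io
from multiprocessing import Queue

def gen_data(queue: Queue, chunk_size = 1024 * 1024):
    total_size = 50 * ( 1024**3 )
    n = 0
    while n < total_size:
        with io.BytesIO() as bio:
            # Accumulate roughly chunk_size bytes before handing them to the queue
            while bio.tell() < chunk_size:
                line = f"int {n} = {n}\n"
                n += len(line)
                bio.write(line.encode("utf-8"))
            queue.put(bio.getvalue())
    # Sentinel value to tell the consumer that generation is finished
    queue.put(None)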