pythonpython-3.xcompressiondata-processingbz2

How to split big 30GB bz2 file into multiple small bz2 files and add a header to each


I have large number of bz2 formatted files (30GB each) without any header. I can split them easily in 500M in each size with the following pileline.

bzcat logging.abc_gps.bz2 | pv | split -b 500M -d -a 4 --filter='bzip > $FILE.csv.bz2' - splitted_file-

But I cannot add the header ['a' 'b' 'c' 'd' 'e' 'f' 'timestamp'] that I want to include each of the splitted bz2 file.

More importantly I want to split the file not based on the 500M, rather I want to split the bz2 file per day (example: splitted_file_2021-01-01.csv.bz2 and splitted_file_2021-01-02.csv.bz2)based on the content of timestamp in the data.

Data is tab-delimited text, like below (no header, need to add them):

19252547212 1   3041    2   1   74.18   1.8504  2021-05-01 00:00:00
19252547213 1   5055    2   1   0       0       2021-05-01 00:00:00
19252547214 1   5073    1   1   53.81   0.1836  2021-05-01 00:00:00

Solution

  • You can use the bz2 package to open BZ2 encoded files and treat them as regular file objects. There is a minor performance advantage to read / write in binary. Assuming your data is either ASCII or UTF-8 and no tab characters need to be escaped in the data, you can just read the file line by line, opening and writing the outputs as new timestamps appear.

    import bz2
    import os
    
    outfile = None
    date = b""
    
    with bz2.open("file") as fileobj:
        for line in filobj:
            # get date from, ex. "2021-05-01 00:00:00", timestamp
            new_date = line.split(b"\t")[7].split(b" ")[0]
            # roll to new file as needed, appending, so existing data not overwritten
            if new_date != date:
                date = new_date
                new_file = f"splitted_file_{new_date}.csv.bz2"
                exists = os.path.exists(new_file)
                outfile = bz2.open(new_file, "ab")
                if not exists:
                    outfile.write(b"\t".join([b'a', b'b', b'c', b'd', b'e', b'f', b'timestamp']) + b"\n")
            # write the row
            outfile.writeline(line)
    if outfile:
        outfile.close()
    

    You may be able to speed this up with a pipeline. Give both the decryption and encryption to separate bzip2 processes that will run in parallel on different cores. Instead of a shell pipeline, you can create pipes and files to do it in the script itself. Assuming bzip2 exists on your system you could do the following. I added the tqdm module to print progress along the way.

    #!/usr/bin/env python3
    
    import subprocess as subp
    from pathlib import Path
    import sys
    import tqdm
    
    # TODO: Better command line
    try:
        in_file_name = Path(sys.argv[1])
    except IndexError:
        print("usage: unbzcsv.py filename")
        exit(1)
    
    # build the format string used for generating output file names
    out_file_name_fmt = "{}-{{}}.{}".format(*in_file_name.name.split(".", maxsplit=1))
    out_file = None
    date = b""
    bzwriter = None
    bzfile = None
    
    # run bzip2 to decompress to stdout
    bzreader = subp.Popen(["bzip2", "--decompress", "--stdout", in_file_name], 
            stdin=subp.DEVNULL, stdout=subp.PIPE)
    
    # use tqdm to display progress as line count
    progress = tqdm.tqdm(bzreader.stdout, desc="Lines", unit=" lines", unit_scale=True)
    
    # read lines and fan out to files
    try:
        for line in progress:
            # get date from, ex. "2021-05-01 00:00:00", timestamp
            new_date = line.split(b"\t")[7].split(b" ")[0]
            # roll to new file as needed, appending, so existing data not overwritten
            if new_date != date:
                date = new_date
                out_file_name = out_file_name_fmt.format(date.decode("utf-8"))
                if bzwriter is not None:
                    bzwriter.stdin.close()
                    bzwriter.wait()
                    bzwriter = None
                    bzfile.close()
                print("\nwriting", out_file_name)
                progress.refresh()
                bzfile = open(out_file_name, "wb")
                bzwriter = subp.Popen(["bzip2", "--compress"],
                        stdin=subp.PIPE, stdout=bzfile)
            # write the row
            bzwriter.stdin.write(line)
    finally:
        bzreader.terminate() # in case of error
        if bzwriter:
            bzwriter.stdin.close()
            bzwriter.wait()
            bzfile.close()
        bzreader.wait()