I have a large number of bz2-compressed files (30GB each) without any header. I can easily split them into 500M chunks with the following pipeline:
bzcat logging.abc_gps.bz2 | pv | split -b 500M -d -a 4 --filter='bzip2 > $FILE.csv.bz2' - splitted_file-
But I cannot add the header ['a' 'b' 'c' 'd' 'e' 'f' 'timestamp'] that I want to include in each of the split bz2 files.
More importantly, I want to split each bz2 file not by size (500M) but per day (for example, splitted_file_2021-01-01.csv.bz2 and splitted_file_2021-01-02.csv.bz2), based on the timestamp in the data.
Data is tab-delimited text, like below (no header, need to add them):
19252547212 1 3041 2 1 74.18 1.8504 2021-05-01 00:00:00
19252547213 1 5055 2 1 0 0 2021-05-01 00:00:00
19252547214 1 5073 1 1 53.81 0.1836 2021-05-01 00:00:00
You can use the bz2 module to open bz2-compressed files and treat them as regular file objects. There is a minor performance advantage to reading and writing in binary mode. Assuming your data is either ASCII or UTF-8 and no tab characters need to be escaped in the data, you can read the file line by line, opening and writing the outputs as new timestamps appear.
import bz2
import os

outfile = None
date = b""
with bz2.open("file") as fileobj:
    for line in fileobj:
        # get date from, ex. "2021-05-01 00:00:00", timestamp
        # (index 7 is the 8th tab-separated field, as in the sample rows)
        new_date = line.split(b"\t")[7].split(b" ")[0]
        # roll to new file as needed, appending, so existing data not overwritten
        if new_date != date:
            date = new_date
            # close the previous day's file before opening the next one
            if outfile:
                outfile.close()
            new_file = f"splitted_file_{new_date.decode('utf-8')}.csv.bz2"
            exists = os.path.exists(new_file)
            outfile = bz2.open(new_file, "ab")
            # only a newly created file gets the header
            if not exists:
                outfile.write(b"\t".join([b'a', b'b', b'c', b'd', b'e', b'f', b'timestamp']) + b"\n")
        # write the row
        outfile.write(line)
if outfile:
    outfile.close()
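The append mode ("ab") used above relies on the fact that a bz2 file may hold several compressed streams back to back, and that Python's bz2 module reads them as one continuous file. A minimal sketch to illustrate this, assuming a throwaway temporary directory and a made-up file name (demo.csv.bz2) with two made-up columns:

```python
import bz2
import os
import tempfile

# Hypothetical file name, created in a throwaway directory.
path = os.path.join(tempfile.mkdtemp(), "demo.csv.bz2")

# First open: the file does not exist yet, so write a header and one row.
with bz2.open(path, "ab") as f:
    f.write(b"a\tb\n")
    f.write(b"1\t2\n")

# Second open: appends a new compressed stream after the first one.
with bz2.open(path, "ab") as f:
    f.write(b"3\t4\n")

# Reading back yields the rows of both streams, in order.
with bz2.open(path, "rb") as f:
    rows = f.read().splitlines()

print(rows)
```

This is why the script can reopen a day's file later without losing or corrupting what was already written.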
You may be able to speed this up with a pipeline. Hand both the decompression and the compression to separate bzip2 processes, which will run in parallel on different cores. Instead of a shell pipeline, you can create the pipes and files in the script itself. Assuming bzip2 exists on your system, you could do the following. I added the tqdm module to print progress along the way.
#!/usr/bin/env python3
import subprocess as subp
from pathlib import Path
import sys
import tqdm

# TODO: Better command line
try:
    in_file_name = Path(sys.argv[1])
except IndexError:
    print("usage: unbzcsv.py filename")
    sys.exit(1)

# build the format string used for generating output file names
out_file_name_fmt = "{}-{{}}.{}".format(*in_file_name.name.split(".", maxsplit=1))
# header row from the question, written at the top of every output file
header = b"\t".join([b'a', b'b', b'c', b'd', b'e', b'f', b'timestamp']) + b"\n"
date = b""
bzwriter = None
bzfile = None

# run bzip2 to decompress to stdout
bzreader = subp.Popen(["bzip2", "--decompress", "--stdout", in_file_name],
                      stdin=subp.DEVNULL, stdout=subp.PIPE)

# use tqdm to display progress as line count
progress = tqdm.tqdm(bzreader.stdout, desc="Lines", unit=" lines", unit_scale=True)

# read lines and fan out to files
try:
    for line in progress:
        # get date from, ex. "2021-05-01 00:00:00", timestamp
        new_date = line.split(b"\t")[7].split(b" ")[0]
        # roll to a new file when the date changes; "wb" overwrites an
        # existing file, so this assumes the input is ordered by date
        if new_date != date:
            date = new_date
            out_file_name = out_file_name_fmt.format(date.decode("utf-8"))
            # finish the previous day's compressor before starting a new one
            if bzwriter is not None:
                bzwriter.stdin.close()
                bzwriter.wait()
                bzwriter = None
                bzfile.close()
            print("\nwriting", out_file_name)
            progress.refresh()
            bzfile = open(out_file_name, "wb")
            bzwriter = subp.Popen(["bzip2", "--compress"],
                                  stdin=subp.PIPE, stdout=bzfile)
            bzwriter.stdin.write(header)
        # write the row
        bzwriter.stdin.write(line)
finally:
    bzreader.terminate()  # in case of error
    if bzwriter:
        bzwriter.stdin.close()
        bzwriter.wait()
        bzfile.close()
    bzreader.wait()
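After a run, it can be worth sanity-checking a few output files. The following is only a sketch: `summarize` and its `has_header` parameter are hypothetical names, and it assumes tab-separated rows with the timestamp in the 8th field, as in the sample data.

```python
import bz2

def date_of(line):
    # Date part of the timestamp, assumed to be the 8th tab-separated field.
    return line.split(b"\t")[7].split(b" ")[0]

def summarize(path, has_header=True):
    """Return (first_line, dates) for one split file."""
    dates = set()
    with bz2.open(path, "rb") as f:
        first_line = f.readline()
        # Without a header, the first line is a data row and counts too.
        if not has_header and first_line:
            dates.add(date_of(first_line))
        for line in f:
            dates.add(date_of(line))
    return first_line, dates
```

For a file that was split correctly, `dates` should contain exactly one value, and `first_line` should be the header row if one was written.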