I am trying to speed up a process I have been doing for a long time. Currently I download all the files, then convert them all to CSV, then use Bokeh to create an interactive chart for looking at the data. I would like to start converting the first file to CSV as soon as it finishes downloading, while the remaining downloads continue, and then start building the interactive chart as soon as each CSV is created, while files are still downloading and new CSVs are still being made.
Is this possible in Python?
The data files range from 100 to 500 MB each, and there are generally about 40 to process daily. The whole process takes about 15 minutes, and if this new approach could cut that by a third it would help greatly.
I'm not sure whether multiprocessing/multithreading or async/await would help.
You need to build a pipeline (the classic producer-consumer pattern).
First, use the standard-library queue module and set up a separate queue.Queue for each processing stage. That allows files to flow through the pipeline as soon as they're ready.
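As a quick illustration of the handoff mechanics that the code further down relies on (the filename here is just a placeholder): put() adds work, get() takes it, task_done() marks it finished, and join() blocks until everything that was put on the queue has been marked done.

import queue

q = queue.Queue()
q.put("file_001.dat")    # a producer hands work to the stage
item = q.get()           # a worker takes the next piece of work
print("processing", item)
q.task_done()            # the worker marks that piece as finished
q.join()                 # returns once every put() has a matching task_done()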
Then use ThreadPoolExecutor from concurrent.futures. Threads are well suited to I/O-bound work because a thread that is blocked waiting on a download (or on disk) releases the GIL, so the rest of the program keeps running.
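For example, here is a minimal sketch of concurrent downloads on their own, assuming the files come over plain HTTP and can be fetched with urllib.request.urlretrieve (the URL list is made up):

from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlretrieve

urls = ["http://example.com/a.dat", "http://example.com/b.dat"]  # made-up URLs

def download(url):
    filename = url.rsplit("/", 1)[-1]
    urlretrieve(url, filename)  # this thread blocks on network I/O; others keep going
    return filename

with ThreadPoolExecutor(max_workers=4) as executor:
    for name in executor.map(download, urls):
        print("finished", name)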
In each stage, run a few worker threads that pull items from that stage's input queue and push results onto the next stage's queue.
If you wire everything up correctly, as soon as a file is downloaded it's immediately available for conversion while the other downloads continue, and each converted CSV is immediately handed off for chart creation.
Put together, it could look something like this (the three stubbed methods stand in for your own download, convert, and chart code):
import queue
from concurrent.futures import ThreadPoolExecutor

SENTINEL = object()  # marker telling a worker that no more items are coming

class ProcessingPipeline:
    def __init__(self, urls, download_dir, csv_dir, chart_dir, max_workers):
        self.urls = urls
        self.download_dir = download_dir
        self.csv_dir = csv_dir
        self.chart_dir = chart_dir
        self.max_workers = max_workers
        self.download_queue = queue.Queue()
        self.convert_queue = queue.Queue()
        self.chart_queue = queue.Queue()

    # Below should be your functions that process the data
    def download_file(self, url): ...      # return the downloaded file's path
    def convert_to_csv(self, path): ...    # return the created CSV's path
    def create_chart(self, csv_path): ...  # build the bokeh chart

    def worker(self, in_queue, out_queue, func):
        # Pull from this stage's queue, process, hand off to the next stage.
        while True:
            item = in_queue.get()
            if item is SENTINEL:
                in_queue.task_done()
                break
            result = func(item)
            if out_queue is not None:
                out_queue.put(result)
            in_queue.task_done()

    def run(self):
        n = self.max_workers
        for url in self.urls:
            self.download_queue.put(url)
        for _ in range(n):  # one stop marker per download worker
            self.download_queue.put(SENTINEL)
        # n threads per stage, so all three stages can run at the same time
        with ThreadPoolExecutor(max_workers=n * 3) as executor:
            for _ in range(n):
                executor.submit(self.worker, self.download_queue,
                                self.convert_queue, self.download_file)
                executor.submit(self.worker, self.convert_queue,
                                self.chart_queue, self.convert_to_csv)
                executor.submit(self.worker, self.chart_queue,
                                None, self.create_chart)
            # wait for each queue to drain, then stop the next stage's workers
            self.download_queue.join()
            for _ in range(n):
                self.convert_queue.put(SENTINEL)
            self.convert_queue.join()
            for _ in range(n):
                self.chart_queue.put(SENTINEL)
            self.chart_queue.join()

if __name__ == "__main__":
    pipeline = ProcessingPipeline(
        urls=["http://example.com/file1", "http://example.com/file2"],  # your real URLs
        download_dir="your_data",
        csv_dir="your_files",
        chart_dir="your_charts",
        max_workers=3,
    )
    pipeline.run()
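One caveat of the sketch above: if a worker raises an exception, its item never gets task_done() and join() will hang, so wrap func(item) in try/finally in real code. On your multiprocessing question: threads are the right default here because downloading and writing files is I/O-bound. If profiling shows the CSV conversion is CPU-bound enough to saturate a core, you could swap just that stage to a ProcessPoolExecutor, along these lines (convert_to_csv is a stand-in for your own conversion function):

from concurrent.futures import ProcessPoolExecutor

def convert_to_csv(path):  # must be a top-level function so it can be pickled
    ...                    # your existing conversion code goes here
    return path.replace(".dat", ".csv")

if __name__ == "__main__":
    paths = ["data_01.dat", "data_02.dat"]  # hypothetical downloaded files
    with ProcessPoolExecutor() as executor:
        for csv_path in executor.map(convert_to_csv, paths):
            print("converted", csv_path)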
Hope this helps!