I have a huge list of strings called term_list that I process one-by-one in a function called run_mappers(). One of the args is a csv_writer object. Inside the function I append results to a list called from_mapper, and I write that list to a CSV file using the csv_writer object. In my scouring for help, I read that the multiprocessing module is not recommended for CSV writing because it pickles its arguments and csv_writer objects can't be pickled (I can't find the reference for this now among the billion open tabs on my desktop, but a quick check below the function confirms it). I am not sure if multiprocessing is best suited for my task anyway.
def run_mappers(individual_string, other_args, csv_writer):
    # long processing code goes here, ending up with processed_result
    from_mapper.append(processed_result)
    csv_writer.writerow(processed_result)
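A quick sanity check does confirm that a csv writer can't be pickled:

import csv
import io
import pickle

writer = csv.writer(io.StringIO())
pickle.dumps(writer)  # raises TypeError: cannot pickle '_csv.writer' object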
I want to speed up processing of this huge list, but am trying to control memory usage by splitting the list into batches to process (term_list_batch; a sketch of the chunking helper appears after the mock data at the end). So I try:
def parallelize_mappers(term_list_batch, other_args, csv_writer):
    future_to_term = {}
    terms_left = len(term_list_batch)
    with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
        future_to_term = {
            executor.submit(run_mappers, term, other_args, csv_writer): term
            for term in term_list_batch
        }
        try:
            for future in concurrent.futures.as_completed(future_to_term, timeout=180):  # timeout after 3 min
                term = future_to_term[future]
                try:
                    result = future.result()
                    # Process result if needed
                except Exception as exc:
                    print(f"Job {term} generated an exception: {exc}")
                finally:
                    terms_left -= 1
                    if terms_left % 10 == 0:
                        gc.collect()
                        time.sleep(2)
        except concurrent.futures.TimeoutError:
            print("Timeout occurred while processing futures")
            for future in future_to_term:
                if not future.done():
                    future.cancel()
When I get a TimeoutError, my process just hangs and I'm not sure what to do to keep moving forward through my huge term_list. I don't want to terminate the program; I just want to keep moving through term_list, or move on to the next batch. If a thread fails or something, I just want to skip that term (or toss the whole thread) and continue processing term_list so I can write as many results to the file as possible.
Amongst my many attempts to troubleshoot, I tried something like the version below, but am posting the one above as my best shot since it crunched through a few hundred terms before stalling on me. Other attempts just died outright, hit RuntimeErrors, deadlocked threads, etc.
For reference, another attempt is below:
def parallelize_mappers(term_list_batch, other_args, csv_writer):
    timeout = 120
    terminate_flag = threading.Event()
    # Create a thread for each term
    threads = []
    for term in term_list_batch:
        thread = threading.Thread(target=run_mappers, args=(term, other_args, csv_writer, terminate_flag))
        threads.append(thread)
        thread.start()
    # Wait for all threads to complete or timeout
    for thread in threads:
        thread.join(timeout)
        # If the thread is still alive, it has timed out
        if thread.is_alive():
            print("Thread {} timed out. Terminating...".format(thread.name))
            terminate_flag.set()  # Set the flag to terminate the thread
Then I added a while not terminate_flag.is_set() check to the run_mappers() function before executing the rest of the processing code. But this is just unbearably slow. Thank you in advance.
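Roughly, the modified run_mappers() looked like this (reconstructing from memory; not my exact code):

def run_mappers(individual_string, other_args, csv_writer, terminate_flag):
    while not terminate_flag.is_set():
        # long processing code goes here, ending up with processed_result
        from_mapper.append(processed_result)
        csv_writer.writerow(processed_result)
        return  # each term is processed at most once; the flag only skips work that hasn't started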
Mock code to reproduce, with the term_list to process, is below:
term_list = ['Dementia',
'HER2-positive Breast Cancer',
'Stroke',
'Hemiplegia',
'Type 1 Diabetes',
'Hypospadias',
'IBD',
'Eating',
'Gastric Cancer',
'Lung Cancer',
'Carcinoid',
'Lymphoma',
'Psoriasis',
'Fallopian Tube Cancer',
'Endstage Renal Disease',
'Healthy',
'HRV',
'Recurrent Small Lymphocytic Lymphoma',
'Gastric Cancer Stage III',
'Amputations',
'Asthma',
'Lymphoma',
'Neuroblastoma',
'Breast Cancer',
'Healthy',
'Asthma',
'Carcinoma, Breast',
'Fractures',
'Psoriatic Arthritis',
'ALS',
'HIV',
'Carcinoma of Unknown Primary',
'Asthma',
'Obesity',
'Anxiety',
'Myeloma',
'Obesity',
'Asthma',
'Nursing',
'Denture, Partial, Removable',
'Dental Prosthesis Retention',
'Obesity',
'Ventricular Tachycardia',
'Panic Disorder',
'Schizophrenia',
'Pain',
'Smallpox',
'Trauma',
'Proteinuria',
'Head and Neck Cancer',
'C14',
'Delirium',
'Paraplegia',
'Sarcoma',
'Favism',
'Cerebral Palsy',
'Pain',
'Signs and Symptoms, Digestive',
'Cancer',
'Obesity',
'FHD',
'Asthma',
'Bipolar Disorder',
'Healthy',
'Ayerza Syndrome',
'Obesity',
'Healthy',
'Focal Dystonia',
'Colonoscopy',
'ART',
'Interstitial Lung Disease',
'Schistosoma Mansoni',
'IBD',
'AIDS',
'COVID-19',
'Vaccines',
'Beliefs',
'SAH',
'Gastroenteritis Escherichia Coli',
'Immunisation',
'Body Weight',
'Nonalcoholic Steatohepatitis',
'Nonalcoholic Fatty Liver Disease',
'Prostate Cancer',
'Covid19',
'Sarcoma',
'Stroke',
'Liver Diseases',
'Stage IV Prostate Cancer',
'Measles',
'Caregiver Burden',
'Adherence, Treatment',
'Fracture of Distal End of Radius',
'Upper Limb Fracture',
'Smallpox',
'Sepsis',
'Gonorrhea',
'Respiratory Syncytial Virus Infections',
'HPV',
'Actinic Keratosis']
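For completeness, term_list_batch above is just a chunk of term_list; a hypothetical helper along these lines produces the batches (not my exact code):

def batched(seq, batch_size):
    # Yield successive batch_size-sized slices of seq
    for start in range(0, len(seq), batch_size):
        yield seq[start:start + batch_size]

for term_list_batch in batched(term_list, 50):
    parallelize_mappers(term_list_batch, other_args, csv_writer)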
The way I see it, you want to parallelize run_mappers() because this function might take a long time to finish. The CSV-writing part does not need to run in parallel because it finishes relatively quickly.
The first step is to redesign run_mappers() NOT to take a CSV writer as a parameter. Instead, this function should return the processed_result. The function might raise an exception, in which case we ignore the result for that thread. To be useful, I will write the errors out to err.txt:
import csv
import logging
import random
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s | %(levelname)s | %(message)s",
)

term_list = [
    "Dementia",
    # ... omitted for brevity
    "Actinic Keratosis",
]

def run_mappers(individual_string, other_args):
    # Simulate long processing time to get processed_result
    time.sleep(random.randint(1, 2))
    processed_result = [individual_string.strip(), other_args]
    # Simulate an exception
    if random.randint(1, 20) == 5:
        logging.error("%r -> failed", individual_string)
        raise ValueError(individual_string)
    logging.debug("%r -> %r", individual_string, processed_result)
    return processed_result

def main():
    """Entry"""
    # run_mappers takes a long time, so this part is done in parallel
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(run_mappers, term, "other-args")
            for term in term_list
        ]

    # Writing to CSV does not need to be done in parallel because
    # it is relatively quick. newline="" avoids blank rows on Windows.
    logging.info("Writing to CSV")
    with open("out.csv", "w", newline="") as stream, open("err.txt", "w") as err:
        writer = csv.writer(stream)
        for future in futures:
            if future.exception():
                err.write(f"{future.exception()}\n")
            else:
                writer.writerow(future.result())
    logging.info("Done CSV")

if __name__ == "__main__":
    main()
Output
2024-03-02 09:49:00,335 | DEBUG | 'HER2-positive Breast Cancer' -> ['HER2-positive Breast Cancer', 'other-args']
2024-03-02 09:57:55,174 | ERROR | 'Breast Cancer' -> failed
2024-03-02 09:49:11,366 | DEBUG | 'HPV' -> ['HPV', 'other-args']
...
2024-03-02 09:49:11,377 | DEBUG | 'Sepsis' -> ['Sepsis', 'other-args']
2024-03-02 09:49:11,377 | INFO | Writing to CSV
2024-03-02 09:49:11,378 | INFO | Done CSV
Notes
- I do not know what your actual run_mappers() code or other_args look like, so I fake them.
- You can replace ThreadPoolExecutor with ProcessPoolExecutor and compare the timing to see which solution works more efficiently; a sketch of the swap is below.
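The swap is just a different executor class (a sketch assuming the same run_mappers and term_list as above; with processes, every argument and result must be picklable, and the pool must be created under the __main__ guard):

from concurrent.futures import ProcessPoolExecutor

def main():
    """Entry"""
    # Processes sidestep the GIL for CPU-bound work, at the cost of
    # pickling every argument and result between processes.
    with ProcessPoolExecutor() as executor:
        futures = [
            executor.submit(run_mappers, term, "other-args")
            for term in term_list
        ]
    # ... then write results and errors to CSV exactly as in main() above

if __name__ == "__main__":
    main()  # the guard is required: worker processes re-import this module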