Tags: python, pandas, multithreading

I cannot keep lists synchronized when using multithreading in Python


I need some help from this forum. It seems to be a simple question, but I can't figure it out. I want to use multithreading for my email validation and write the results to an Excel sheet, with the header row email, deliverable, host exists (for example: example@test.com | false | true). But the output is not displayed correctly: the wrong result appears next to the corresponding email. I think that when the results are appended to the lists, they are not indexed properly. Because I use multithreading, whichever task finishes first also gets appended first, so the three lists (emails_list, deliverable_list, hostexists_list) do not share the same indexing. Any solution would be appreciated. Below is the code I used.

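import csv
import threading
import time

import pandas as pd
# Verifier comes from the email-verification library in use (its import is not shown in the original post)

maxthreads = 20  # value not given in the original post; 20 matches the answer below

pool_sema = threading.Semaphore(value=maxthreads)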

# make empty list
fields = []
emails_list = []
deliverable_list = []
hostexists_list = []

csvFilename = r'test2.csv'

with open(csvFilename, 'r', encoding="utf-8") as csvFile:
    csvreader = csv.reader(csvFile)
    # extracting field names through first row
    fields = next(csvreader)  # skip the header
    for row in csvreader:
        x, y = [str(value) for value in row]
        inputAddress = str(y)
        emails_list.append(inputAddress)

def task(i):
    pool_sema.acquire()
    try:
        normal_verifier = Verifier(source_addr='example@gmail.com') 
        results = normal_verifier.verify(i)  # i is the email address to verify
        deliverable_list.append(results["deliverable"])
        hostexists_list.append(results["host_exists"])
        time.sleep(3)
    except Exception as e:
        print("Error")
    finally:
        pool_sema.release()

def create_threads(number_of_threads):
    try:
        threads = [threading.Thread(target=task, args=(i,)) for i in emails_list]
        [t.start() for t in threads]
    except Exception as e:
        print("Error")
    finally:
        [t.join() for t in threads]

create_threads((len(emails_list)))

# Create Data frame
# Making number of columns
validating_Email = pd.DataFrame(columns=['Email', 'Deliverable', 'Host exists'])
#Dictionary
data_dictionary = {'Email':emails_list, 'Deliverable':deliverable_list, 'Host exists' :hostexists_list}
# Dictionary to Pandas DataFrame
df = pd.DataFrame.from_dict(data_dictionary)
# Save Data in EXCEL
df.to_excel('gmail_multi_thread_1.xlsx', index = False) 
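A minimal, network-free sketch of the ordering problem described above. The worker and its result strings are hypothetical stand-ins for Verifier.verify, and threading.Event is used only to force the threads to finish in reverse creation order, something that can happen by chance with real network calls. Because the DataFrame pairs the lists purely by position, the emails end up next to other emails' results.

```python
import threading


def demo_out_of_order(emails):
    """Append one fake result per email from its own thread, forcing the
    threads to finish in reverse order of creation."""
    n = len(emails)
    # done[k] is set once the thread for emails[k] has appended its result
    done = [threading.Event() for _ in range(n)]
    appended = []  # shared list filled in completion order, like deliverable_list

    def worker(k):
        # Wait until the thread created after this one has finished
        if k + 1 < n:
            done[k + 1].wait()
        appended.append(f"result-for-{emails[k]}")  # hypothetical verification result
        done[k].set()

    threads = [threading.Thread(target=worker, args=(k,)) for k in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return appended


emails = ["a@test.com", "b@test.com", "c@test.com"]
results = demo_out_of_order(emails)
# Pairing by position (as the DataFrame does) mismatches emails and results
for email, result in zip(emails, results):
    print(email, "->", result)
# a@test.com -> result-for-c@test.com
# b@test.com -> result-for-b@test.com
# c@test.com -> result-for-a@test.com
```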

Solution

  • Threads don't finish their work in the order you create them. The Nth email may be handled by the Mth thread to complete, so its results get appended at the wrong position. You could solve the problem by prefilling the output lists with None and then passing each thread the index into the email list rather than the email address itself, so that every thread writes its results into its own slot.

    But there are other things you can do to clean up this code. You create one thread per email address but limit the number of running threads with a semaphore. Instead, use a thread pool with the desired thread count. And instead of building intermediate lists, feed the CSV data directly into the thread pool and pull the results into the DataFrame as it's created. Because pool.map returns its results in the same order as its inputs, each email stays on the same row as its own results.

    import csv
    import multiprocessing.pool

    import pandas as pd
    # Verifier comes from the same email-verification library used in the question

    maxthreads = 20
    csvFilename = r'test2.csv'

    def task(email_addr):
        try:
            normal_verifier = Verifier(source_addr='example@gmail.com')
            results = normal_verifier.verify(email_addr)  # email_addr is the address to verify
            return email_addr, results["deliverable"], results["host_exists"]
        except Exception as e:
            print(f"Error {e} for {email_addr}")
            return email_addr, "", ""

    # processes= sets the number of worker threads in a ThreadPool
    with multiprocessing.pool.ThreadPool(processes=maxthreads) as pool:
        with open(csvFilename, newline="", encoding="utf-8") as csvFile:
            email_reader = csv.reader(csvFile)
            fields = next(email_reader)  # skip the header
            # map returns results in input order, so each row keeps its own email
            df = pd.DataFrame(
                pool.map(task, (row[1] for row in email_reader)),
                columns=['Email', 'Deliverable', 'Host exists'])

    # Save data to Excel (writing .xlsx needs an engine such as openpyxl)
    df.to_excel('gmail_multi_thread_1.xlsx', index=False)
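The first fix mentioned in the answer (prefilled lists plus an index per thread) can be sketched as follows. This is a minimal sketch, not the answer's own code: verify_stub is a hypothetical stand-in for Verifier(...).verify, and verify_all is a hypothetical helper name. Each thread writes only to position i of the output lists, so the order in which threads finish no longer matters.

```python
import threading


def verify_stub(email):
    """Hypothetical stand-in for Verifier(...).verify(email)."""
    return {"deliverable": email.endswith("@example.com"), "host_exists": True}


def verify_all(emails, max_threads=5):
    n = len(emails)
    # Prefill one slot per email so position i always belongs to emails[i]
    deliverable_list = [None] * n
    hostexists_list = [None] * n
    pool_sema = threading.Semaphore(max_threads)

    def task(i):  # i is an index into emails, not the address itself
        with pool_sema:  # acquire/release as in the question, but exception-safe
            try:
                results = verify_stub(emails[i])
                deliverable_list[i] = results["deliverable"]
                hostexists_list[i] = results["host_exists"]
            except Exception as e:
                print(f"Error {e} for {emails[i]}")  # slots stay None on failure

    threads = [threading.Thread(target=task, args=(i,)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return deliverable_list, hostexists_list


emails_list = ["a@example.com", "b@test.com", "c@example.com"]
deliverable_list, hostexists_list = verify_all(emails_list)
print(list(zip(emails_list, deliverable_list, hostexists_list)))
# [('a@example.com', True, True), ('b@test.com', False, True), ('c@example.com', True, True)]
```

Any email whose verification raised keeps None in its slots, which makes failures easy to spot in the resulting sheet.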
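To check the ordering claim behind the thread-pool version without touching the network, here is a small sketch. fake_verify and verify_in_order are hypothetical names, and the sleeps give earlier emails longer delays so they tend to finish last. ThreadPool.map still returns the rows in input order. concurrent.futures.ThreadPoolExecutor.map gives the same input-order guarantee if you prefer that module.

```python
import time
from multiprocessing.pool import ThreadPool


def fake_verify(job):
    """Hypothetical stand-in for Verifier.verify: sleep for the given delay,
    then return an (email, deliverable, host_exists) row."""
    email, delay = job
    time.sleep(delay)
    return email, email.endswith("@example.com"), True


def verify_in_order(emails, workers=4):
    # Earlier emails get longer delays, so they are likely to finish last
    jobs = [(email, 0.05 * (len(emails) - i)) for i, email in enumerate(emails)]
    with ThreadPool(processes=workers) as pool:
        # map blocks until every job is done and returns results in input order
        return pool.map(fake_verify, jobs)


rows = verify_in_order(["a@example.com", "b@test.com", "c@example.com"])
for row in rows:
    print(row)
# ('a@example.com', True, True)
# ('b@test.com', False, True)
# ('c@example.com', True, True)
```

These rows have the same shape as the tuples returned by task in the answer, so they could be passed straight to pd.DataFrame with the three column names.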