python csv dictionary random

How do I randomly shuffle the output of csv.DictReader so that I can perform operations on each value using ThreadPoolExecutor?


I am trying to use ThreadPoolExecutor with 8 workers on the data in a CSV file. When I read the file using csv.DictReader, I get one dictionary per row, with the header names as keys and the row's fields as values.

Now when I run the ThreadPoolExecutor, all 8 workers start with the data in the first row, which is not ideal. I want each worker (thread) to first read the CSV file, randomize the order of the rows, and then go through all of them using a for loop.

import csv
from concurrent.futures import ThreadPoolExecutor

def grab_data(i):
    with open('devices.csv', 'r') as csv_file:
        csv_reader = csv.DictReader(csv_file)
        # Need something here to randomize the print
        print(*csv_reader)

with ThreadPoolExecutor(max_workers=5, thread_name_prefix="API") as executor:
    executor.map(grab_data, range(1, 15))

The current output is the same for every worker:

    {'A': 'FLR', 'B': 'E', 'C': 'SYM', 'D': 'DY', 'E': 'OE'}
    {'A': 'NZZ', 'B': 'QBS', 'C': 'CT', 'D': 'BI', 'E': 'JG'}
    {'A': 'MVU', 'B': 'RQ', 'C': 'IG', 'D': 'XD', 'E': 'AO'}
    {'A': 'XX', 'B': 'GWP', 'C': 'EA', 'D': 'FF', 'E': 'BW'}

For the second worker:
    {'A': 'FLR', 'B': 'E', 'C': 'SYM', 'D': 'DY', 'E': 'OE'}
    {'A': 'NZZ', 'B': 'QBS', 'C': 'CT', 'D': 'BI', 'E': 'JG'}
    {'A': 'MVU', 'B': 'RQ', 'C': 'IG', 'D': 'XD', 'E': 'AO'}
    {'A': 'XX', 'B': 'GWP', 'C': 'EA', 'D': 'FF', 'E': 'BW'}

I want the second worker to randomize the order of the lines, so that its output looks like this:

    {'A': 'MVU', 'B': 'RQ', 'C': 'IG', 'D': 'XD', 'E': 'AO'}
    {'A': 'FLR', 'B': 'E', 'C': 'SYM', 'D': 'DY', 'E': 'OE'}
    {'A': 'XX', 'B': 'GWP', 'C': 'EA', 'D': 'FF', 'E': 'BW'}
    {'A': 'NZZ', 'B': 'QBS', 'C': 'CT', 'D': 'BI', 'E': 'JG'}

Now from here, I will use this data in a for loop:

    for line in csv_reader:
        r = requests.get(
            url=f"https://192.168.1.56:1813/rest/{line['A']}/sh/1/sl/{line['B']}/{line['C']}/status?",
            verify=False,
            auth=('random', 'TestTest'),
        )

That way, each worker is working on different data at any given time, not on the same data.


Solution

  • I am somewhat confused, as the end goal is not clear to me. Why not read the CSV once in the main thread, give each worker its own randomised list of row indices, and then let each worker iterate over that list to access the records in turn? This is relatively trivial, assuming the workers are only reading. The example below is for demonstration purposes only.

    (Doing the same with pandas or DuckDB is likewise trivial.)

    $ cat swarm.py 
    import sys
    import csv
    import random
    from concurrent.futures import ThreadPoolExecutor
    
    def load_csv(filepath):
        with open(filepath, newline='') as f:
            reader = csv.DictReader(f)
            return list(reader)  # list of dicts
    
    data = load_csv(sys.argv[1])
    numRows = len(data)
    
    nDrones = 4
    if len(sys.argv) == 3:
        nDrones = int(sys.argv[2])
    
    # Worker function
    def Worker(rowsToProcess, droneId):
        rows=list(range(rowsToProcess))
        random.shuffle(rows)
        for idx in rows:
            row = data[idx]
            print(f"[Worker {droneId}] Record {idx}: {row}")
        return f"Worker {droneId} finished"
    
    # Run threads
    with ThreadPoolExecutor(max_workers=nDrones) as theHive:
        bees = []
        for i in range(nDrones):
            bees.append(theHive.submit(Worker, numRows, i))
        for bee in bees:
            print(bee.result())
    
    #
    # run it - with 5 threads 
    # 
    $ python swarm.py test.csv 5
    [Worker 0] Record 3: {'AsOfDate': "'22/11/2024'", 'RECNO': 'ROW1', 'Value': '-2500'}
    [Worker 0] Record 0: {'AsOfDate': "'05/11/2024'", 'RECNO': 'ROW1', 'Value': '-4000'}
    [Worker 0] Record 2: {'AsOfDate': "'04/02/2024'", 'RECNO': 'ROW3', 'Value': '-2250'}
    [Worker 0] Record 1: {'AsOfDate': "'04/01/2024'", 'RECNO': 'ROW2', 'Value': '-3000'}
    [Worker 1] Record 2: {'AsOfDate': "'04/02/2024'", 'RECNO': 'ROW3', 'Value': '-2250'}
    [Worker 1] Record 1: {'AsOfDate': "'04/01/2024'", 'RECNO': 'ROW2', 'Value': '-3000'}
    [Worker 2] Record 0: {'AsOfDate': "'05/11/2024'", 'RECNO': 'ROW1', 'Value': '-4000'}
    [Worker 1] Record 0: {'AsOfDate': "'05/11/2024'", 'RECNO': 'ROW1', 'Value': '-4000'}
    [Worker 3] Record 2: {'AsOfDate': "'04/02/2024'", 'RECNO': 'ROW3', 'Value': '-2250'}
    [Worker 2] Record 1: {'AsOfDate': "'04/01/2024'", 'RECNO': 'ROW2', 'Value': '-3000'}
    [Worker 4] Record 3: {'AsOfDate': "'22/11/2024'", 'RECNO': 'ROW1', 'Value': '-2500'}
    [Worker 3] Record 0: {'AsOfDate': "'05/11/2024'", 'RECNO': 'ROW1', 'Value': '-4000'}
    [Worker 2] Record 2: {'AsOfDate': "'04/02/2024'", 'RECNO': 'ROW3', 'Value': '-2250'}
    Worker 0 finished
    [Worker 1] Record 3: {'AsOfDate': "'22/11/2024'", 'RECNO': 'ROW1', 'Value': '-2500'}
    [Worker 2] Record 3: {'AsOfDate': "'22/11/2024'", 'RECNO': 'ROW1', 'Value': '-2500'}
    [Worker 4] Record 2: {'AsOfDate': "'04/02/2024'", 'RECNO': 'ROW3', 'Value': '-2250'}
    Worker 1 finished
    [Worker 3] Record 3: {'AsOfDate': "'22/11/2024'", 'RECNO': 'ROW1', 'Value': '-2500'}
    Worker 2 finished
    [Worker 4] Record 1: {'AsOfDate': "'04/01/2024'", 'RECNO': 'ROW2', 'Value': '-3000'}
    [Worker 3] Record 1: {'AsOfDate': "'04/01/2024'", 'RECNO': 'ROW2', 'Value': '-3000'}
    [Worker 4] Record 0: {'AsOfDate': "'05/11/2024'", 'RECNO': 'ROW1', 'Value': '-4000'}
    Worker 3 finished
    Worker 4 finished
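
If the goal is instead that each row is handled exactly once, by whichever worker owns it, one possible variation is to shuffle the rows once and deal them out into per-worker chunks. The following is only a minimal sketch under that assumption: `SAMPLE_CSV`, `load_rows`, `split_shuffled`, `process_chunk` and `run` are hypothetical names, the in-memory CSV stands in for devices.csv, and `process_chunk` is a placeholder for the real per-row work (for example the requests.get call from the question).

```python
import csv
import io
import random
from concurrent.futures import ThreadPoolExecutor

# Hypothetical sample data standing in for devices.csv (values from the question).
SAMPLE_CSV = """A,B,C,D,E
FLR,E,SYM,DY,OE
NZZ,QBS,CT,BI,JG
MVU,RQ,IG,XD,AO
XX,GWP,EA,FF,BW
"""


def load_rows(csv_file):
    """Read every row of an open CSV file into a list of dicts."""
    return list(csv.DictReader(csv_file))


def split_shuffled(rows, n_workers, seed=None):
    """Shuffle a copy of rows, then deal them round-robin into n_workers chunks."""
    rng = random.Random(seed)  # private generator; seed is only for repeatability
    shuffled = rows[:]         # copy, so the caller's list keeps its order
    rng.shuffle(shuffled)
    return [shuffled[i::n_workers] for i in range(n_workers)]


def process_chunk(worker_id, chunk):
    """Placeholder for the real per-row work; returns (worker_id, A-value) pairs."""
    return [(worker_id, row["A"]) for row in chunk]


def run(rows, n_workers=2, seed=None):
    """Process each row exactly once, spread across n_workers threads."""
    chunks = split_shuffled(rows, n_workers, seed)
    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        # map() pairs each worker id with its own chunk
        results = executor.map(process_chunk, range(n_workers), chunks)
        return [item for chunk_result in results for item in chunk_result]


if __name__ == "__main__":
    rows = load_rows(io.StringIO(SAMPLE_CSV))
    for worker_id, device in run(rows, n_workers=2):
        print(f"[Worker {worker_id}] handled {device}")
```

Unlike swarm.py above, where every worker visits every row in its own random order, here no two workers ever receive the same row. Which behaviour is wanted depends on the end goal, which the question leaves open.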