I am trying to use ThreadPoolExecutor with 8 workers on the data in a CSV file. When I read the file with csv.DictReader, I get one dictionary per row, with the header names as keys and the row's fields as values.
Now when I run the ThreadPoolExecutor, all 8 workers start with the data in the first row, which is not ideal. I want each worker (thread) to first read the CSV file, randomize the order of the rows, and then go through all of them in a for loop.
import csv
from concurrent.futures import ThreadPoolExecutor

def grab_data(i):
    with open('devices.csv', 'r') as csv_file:
        csv_reader = csv.DictReader(csv_file)
        # Need something here to randomize the print
        print(*csv_reader)

with ThreadPoolExecutor(max_workers=5, thread_name_prefix="API") as executor:
    executor.map(grab_data, range(1, 15))
The current output is the same for every worker:
{'A': 'FLR', 'B': 'E', 'C': 'SYM', 'D': 'DY', 'E': 'OE'}
{'A': 'NZZ', 'B': 'QBS', 'C': 'CT', 'D': 'BI', 'E': 'JG'}
{'A': 'MVU', 'B': 'RQ', 'C': 'IG', 'D': 'XD', 'E': 'AO'}
{'A': 'XX', 'B': 'GWP', 'C': 'EA', 'D': 'FF', 'E': 'BW'}
For the second worker:
{'A': 'FLR', 'B': 'E', 'C': 'SYM', 'D': 'DY', 'E': 'OE'}
{'A': 'NZZ', 'B': 'QBS', 'C': 'CT', 'D': 'BI', 'E': 'JG'}
{'A': 'MVU', 'B': 'RQ', 'C': 'IG', 'D': 'XD', 'E': 'AO'}
{'A': 'XX', 'B': 'GWP', 'C': 'EA', 'D': 'FF', 'E': 'BW'}
I want the second worker to randomize the rows so that its output looks like this:
{'A': 'MVU', 'B': 'RQ', 'C': 'IG', 'D': 'XD', 'E': 'AO'}
{'A': 'FLR', 'B': 'E', 'C': 'SYM', 'D': 'DY', 'E': 'OE'}
{'A': 'XX', 'B': 'GWP', 'C': 'EA', 'D': 'FF', 'E': 'BW'}
{'A': 'NZZ', 'B': 'QBS', 'C': 'CT', 'D': 'BI', 'E': 'JG'}
From here, I will use this data in a for loop:
for line in csv_reader:
    r = requests.get(
        url=f"https://192.168.1.56:1813/rest/{line['A']}/sh/1/sl/{line['B']}/{line['C']}/status?",
        verify=False,
        auth=('random', 'TestTest'),
    )
That way, the workers are all working on different data at any given time rather than on the same data.
I am somewhat confused, as the end goal is not clear to me. Why not read the CSV in the main thread, generate a randomised list for each worker, and then let each worker iterate over its list to access the records in turn? This is relatively trivial, assuming the workers are only reading the data. The example below is for demonstration purposes only.
(Doing the same with pandas or duckdb is likewise trivial.)
$ cat swarm.py
import sys
import csv
import random
from concurrent.futures import ThreadPoolExecutor

def load_csv(filepath):
    with open(filepath, newline='') as f:
        reader = csv.DictReader(f)
        return list(reader)  # list of dicts

data = load_csv(sys.argv[1])
numRows = len(data)
nDrones = 4
if len(sys.argv) == 3:
    nDrones = int(sys.argv[2])

# Worker function
def Worker(rowsToProcess, droneId):
    rows = list(range(rowsToProcess))
    random.shuffle(rows)
    for idx in rows:
        row = data[idx]
        print(f"[Worker {droneId}] Record {idx}: {row}")
    return f"Worker {droneId} finished"

# Run threads
with ThreadPoolExecutor(max_workers=nDrones) as theHive:
    bees = []
    for i in range(nDrones):
        bees.append(theHive.submit(Worker, numRows, i))
    for bee in bees:
        print(bee.result())
#
# run it - with 5 threads
#
$ python swarm.py test.csv 5
[Worker 0] Record 3: {'AsOfDate': "'22/11/2024'", 'RECNO': 'ROW1', 'Value': '-2500'}
[Worker 0] Record 0: {'AsOfDate': "'05/11/2024'", 'RECNO': 'ROW1', 'Value': '-4000'}
[Worker 0] Record 2: {'AsOfDate': "'04/02/2024'", 'RECNO': 'ROW3', 'Value': '-2250'}
[Worker 0] Record 1: {'AsOfDate': "'04/01/2024'", 'RECNO': 'ROW2', 'Value': '-3000'}
[Worker 1] Record 2: {'AsOfDate': "'04/02/2024'", 'RECNO': 'ROW3', 'Value': '-2250'}
[Worker 1] Record 1: {'AsOfDate': "'04/01/2024'", 'RECNO': 'ROW2', 'Value': '-3000'}
[Worker 2] Record 0: {'AsOfDate': "'05/11/2024'", 'RECNO': 'ROW1', 'Value': '-4000'}
[Worker 1] Record 0: {'AsOfDate': "'05/11/2024'", 'RECNO': 'ROW1', 'Value': '-4000'}
[Worker 3] Record 2: {'AsOfDate': "'04/02/2024'", 'RECNO': 'ROW3', 'Value': '-2250'}
[Worker 2] Record 1: {'AsOfDate': "'04/01/2024'", 'RECNO': 'ROW2', 'Value': '-3000'}
[Worker 4] Record 3: {'AsOfDate': "'22/11/2024'", 'RECNO': 'ROW1', 'Value': '-2500'}
[Worker 3] Record 0: {'AsOfDate': "'05/11/2024'", 'RECNO': 'ROW1', 'Value': '-4000'}
[Worker 2] Record 2: {'AsOfDate': "'04/02/2024'", 'RECNO': 'ROW3', 'Value': '-2250'}
Worker 0 finished
[Worker 1] Record 3: {'AsOfDate': "'22/11/2024'", 'RECNO': 'ROW1', 'Value': '-2500'}
[Worker 2] Record 3: {'AsOfDate': "'22/11/2024'", 'RECNO': 'ROW1', 'Value': '-2500'}
[Worker 4] Record 2: {'AsOfDate': "'04/02/2024'", 'RECNO': 'ROW3', 'Value': '-2250'}
Worker 1 finished
[Worker 3] Record 3: {'AsOfDate': "'22/11/2024'", 'RECNO': 'ROW1', 'Value': '-2500'}
Worker 2 finished
[Worker 4] Record 1: {'AsOfDate': "'04/01/2024'", 'RECNO': 'ROW2', 'Value': '-3000'}
[Worker 3] Record 1: {'AsOfDate': "'04/01/2024'", 'RECNO': 'ROW2', 'Value': '-3000'}
[Worker 4] Record 0: {'AsOfDate': "'05/11/2024'", 'RECNO': 'ROW1', 'Value': '-4000'}
Worker 3 finished
Worker 4 finished
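
If the goal is instead that no two workers ever handle the same row (the last sentence of the question can be read that way), one option is to shuffle the rows once in the main thread and let the pool hand them out one at a time. The following is only a minimal sketch under that assumption: SAMPLE_CSV, load_rows and handle_row are hypothetical names, the in-memory CSV stands in for devices.csv, and handle_row merely builds the URL path where the real code would call requests.get.

```python
import csv
import io
import random
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory stand-in for devices.csv so the sketch runs on its own.
SAMPLE_CSV = """A,B,C,D,E
FLR,E,SYM,DY,OE
NZZ,QBS,CT,BI,JG
MVU,RQ,IG,XD,AO
XX,GWP,EA,FF,BW
"""

def load_rows(csv_file):
    """Read every row into a list of dicts (a DictReader can only be iterated once)."""
    return list(csv.DictReader(csv_file))

def handle_row(row):
    """Placeholder for the per-row work, e.g. the requests.get call from the question."""
    return f"/rest/{row['A']}/sh/1/sl/{row['B']}/{row['C']}/status"

rows = load_rows(io.StringIO(SAMPLE_CSV))
random.shuffle(rows)  # one shared random order, decided in the main thread

with ThreadPoolExecutor(max_workers=5, thread_name_prefix="API") as executor:
    # map() passes each row to exactly one call, so no row is handled twice
    results = list(executor.map(handle_row, rows))

for path in results:
    print(path)
```

With this layout every row is processed exactly once across the whole pool, whereas swarm.py above makes every worker walk through all rows in its own random order. Which of the two fits depends on what the workers are meant to do.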