I have a server at 192.168.33.10 where I launched the scheduler:

dask scheduler --host 0.0.0.0

This server is the master, and it holds the file "/var/shared/job_skills.csv". The workers are 192.168.33.11 and 192.168.33.12, launched with:

dask worker 192.168.33.10:8786 --local-directory /var/test --dashboard-address 8787 --host 0.0.0.0 --worker-port 39040 --nanny-port 39042

I want to run a script read_csv.py on the master that distributes the work to the workers: chunk the data, aggregate it, and have every worker return its result so I can print the combined result. The script on the master, "read_csv.py", looks like this:
import dask
import dask.dataframe as dd
from dask.distributed import Client

# Keep plain object dtype instead of converting string columns to pyarrow strings
dask.config.set({"dataframe.convert-string": False})

client = Client("192.168.33.10:8786")

df = dd.read_csv("/var/shared/foo.csv")
df['job_skills'] = df['job_skills'].fillna('')
# One row per individual skill, with surrounding whitespace stripped
df = df["job_skills"].str.split(',').explode().str.strip()
grouped = df.value_counts().compute()
print(grouped)
On the workers this fails with:
2024-02-29 14:30:04,180 - distributed.worker - WARNING - Compute Failed
Key: ('str-strip-956897fad2adeffa85aa604734f0febb', 0)
Function: execute_task
args: ((subgraph_callable-50304a7f18bfe19fd3ff56b4a6d6db4f, 'str', 'strip', (), {}, 'explode-e2622e44a85e1396024ff799e4f97b6e', 'split', {'pat': ',', 'n': -1, 'expand': False}, 'getitem-f2b8974c7433cce59ce3453d7f35940e', 'job_skills', '', 'getitem-b9a23a03a236420b7c31ada8ec6055df', [(<function read_block_from_file at 0x7fd18829f920>, <OpenFile '/var/shared/foo.csv'>, 0, 1398, b'\n'), None, True, True]))
kwargs: {}
Exception: "FileNotFoundError(2, 'No such file or directory')"
How can I resolve this?
Each worker tries to open /var/shared/foo.csv on its own local filesystem, and the file only exists on the master, hence the FileNotFoundError. One way to solve this without copying the file to the workers is to read it over SSH:

df = dd.read_csv("ssh://server:port//var/shared/foo.csv")

Each worker will then read directly from the source, fetching only the bytes of the block it is working on.
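On this cluster the call might look like the sketch below. The username and password are placeholder assumptions; dask forwards storage_options to fsspec's SFTP backend, which needs paramiko installed on the client and on every worker:

import dask.dataframe as dd
from dask.distributed import Client

client = Client("192.168.33.10:8786")

# Hypothetical credentials for the master, which holds the file;
# storage_options is passed through to fsspec's ssh:// (paramiko/SFTP) backend.
df = dd.read_csv(
    "ssh://192.168.33.10//var/shared/foo.csv",
    storage_options={"username": "your_user", "password": "your_password"},
)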
Possible alternatives for others:

- Put the file on shared storage mounted at the same path on every worker (NFS, GlusterFS, etc.), so the original code works unchanged.
- Copy the file to the same path on every worker.
- Host the file somewhere all workers can reach it, such as object storage (s3://, gcs://) or an HTTP server, and point read_csv at that URL.
- If the data fits in memory on one machine, read it there with pandas and hand it to the cluster with dd.from_pandas, as sketched below.
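A minimal sketch of that last option, assuming the file fits in the master's memory and the script runs on the master (the machine that has the file):

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client("192.168.33.10:8786")

# Read locally on the machine that actually has the file ...
pdf = pd.read_csv("/var/shared/foo.csv")

# ... then convert it to a distributed dataframe; partitions are
# serialized and shipped to the workers when the graph runs.
df = dd.from_pandas(pdf, npartitions=4)

skills = df["job_skills"].fillna("").str.split(",").explode().str.strip()
print(skills.value_counts().compute())

This trades scalability for simplicity: the master must hold the whole file in memory, but no shared filesystem or remote protocol is needed.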