I made my own filesystem in the fsspec library and I am trying to read in dask dataframes from this filesystem object to open the dataframe file. However I am getting an error when I try to do this. My guess is that the dask workers don't have a good copy of the filesystem. Here is some test code
from fsspec.implementations.local import LocalFileSystem
from fsspec import AbstractFileSystem, register_implementation
class MyLocalFileSystem(AbstractFileSystem):
def __init__(self,*args,**kwargs):
super().__init__(*args,**kwargs)
self.rawfs=LocalFileSystem(*args,**kwargs)
def __wrap(method_name):
return lambda self,*args,**kwargs: getattr(self.rawfs,method_name)(*args,**kwargs)
_open = __wrap("_open")
info = __wrap("info")
ls = __wrap("ls")
del __wrap
register_implementation("mfs",MyLocalFileSystem,clobber=True)
import dask.dataframe as dd
import pandas as pd
from tempfile import NamedTemporaryFile
from dask.distributed import Client
with NamedTemporaryFile(mode='wt') as f, Client(): #works if I remove `, Client() (and no other client is running)`
f.write("A\n0")
f.flush()
print("pd, localfs",pd.read_csv(f.name).size)
print("dd, localfs",dd.read_csv(f.name).size.compute())
print("pd, myfs",pd.read_csv(f"mfs://{f.name}").size)
# print("workaround",dd.from_pandas(pd.read_csv(f"mfs://{f.name}"),npartitions=1).size.compute())
print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
and I get an error
2024-06-08 14:48:33,328 - distributed.worker - WARNING - Compute Failed
Key: ('size-chunk-232fa012bb421939d7011c3af11ac4a7-ea46e61534d2be2ea62e2fe234f0d607', 0)
Function: execute_task
args: ((subgraph_callable-92b603c9f28a44f7e623972919b6934a, [(<function read_block_from_file at 0x75336666f420>, <OpenFile '/tmp/tmpob6dxgol'>, 0, 3, b'\n'), None, True, True]))
kwargs: {}
Exception: 'AttributeError("\'MyLocalFileSystem\' object has no attribute \'rawfs\'")'
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In[15], line 33
31 print("pd, myfs",pd.read_csv(f"mfs://{f.name}").size)
32 # print("workaround",dd.from_pandas(pd.read_csv(f"mfs://{f.name}"),npartitions=1).size.compute())
---> 33 print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
File /usr/lib/python3.12/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
351 def compute(self, **kwargs):
352 """Compute this dask collection
353
354 This turns a lazy Dask collection into its in-memory equivalent.
(...)
373 dask.compute
374 """
--> 375 (result,) = compute(self, traverse=False, **kwargs)
376 return result
File /usr/lib/python3.12/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
658 postcomputes.append(x.__dask_postcompute__())
660 with shorten_traceback():
--> 661 results = schedule(dsk, keys, **kwargs)
663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /usr/lib/python3.12/site-packages/dask/bytes/core.py:191, in read_block_from_file()
190 def read_block_from_file(lazy_file, off, bs, delimiter):
--> 191 with copy.copy(lazy_file) as f:
192 if off == 0 and bs is None:
193 return f.read()
File /usr/lib/python3.12/site-packages/fsspec/core.py:103, in __enter__()
100 def __enter__(self):
101 mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 103 f = self.fs.open(self.path, mode=mode)
105 self.fobjects = [f]
107 if self.compression is not None:
File /usr/lib/python3.12/site-packages/fsspec/spec.py:1293, in open()
1291 else:
1292 ac = kwargs.pop("autocommit", not self._intrans)
-> 1293 f = self._open(
1294 path,
1295 mode=mode,
1296 block_size=block_size,
1297 autocommit=ac,
1298 cache_options=cache_options,
1299 **kwargs,
1300 )
1301 if compression is not None:
1302 from fsspec.compression import compr
Cell In[15], line 10, in <lambda>()
9 def __wrap(method_name):
---> 10 return lambda self,*args,**kwargs: getattr(self.rawfs,method_name)(*args,**kwargs)
AttributeError: 'MyLocalFileSystem' object has no attribute 'rawfs'
Is this supposed to work? How can I get this working?
Versions: python: 3.12.3 (main, Apr 23 2024, 09:16:07) [GCC 13.2.1 20240417] dask: 2024.4.1 fsspec: 2024.3.1
After some suggestions by mdurant, I tried some new stuff and printed out more stuff. He suggested passing in rawfs, which I tried to do with the following code:
from fsspec.implementations.local import LocalFileSystem
from fsspec import AbstractFileSystem, register_implementation
import dask.dataframe as dd
import pandas as pd
from tempfile import NamedTemporaryFile
from dask.distributed import Client
def setup():
class MyLocalFileSystem(AbstractFileSystem):
def __init__(self,*args,rawfs=None,**kwargs):
super().__init__(*args,**kwargs)
print(rawfs or "no rawfs")
self.rawfs=rawfs or LocalFileSystem(*args,**kwargs)
def __wrap(method_name):
def wrapped(self,*args,**kwargs):
print("has rawfs?",hasattr(self,'rawfs'),self,method_name,args,kwargs)
return getattr(self.rawfs,method_name)(*args,**kwargs)
return wrapped
_open = __wrap("_open")
info = __wrap("info")
ls = __wrap("ls")
del __wrap
register_implementation("mfs",MyLocalFileSystem,clobber=True)
print('setup run')
setup()
with NamedTemporaryFile(mode='wt') as f, Client() as client:
print(client)
f.write("A\n0")
f.flush()
# client.run(setup)
# print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
print("dd, myfs",dd.read_csv(f"mfs://{f.name}",storage_options=dict(rawfs=LocalFileSystem())).size.compute()) #error
And here I get the output
setup run
<Client: 'tcp://127.0.0.1:39165' processes=7 threads=28, memory=31.11 GiB>
<fsspec.implementations.local.LocalFileSystem object at 0x7643560ddb20>
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x764354683ad0> info ('/tmp/tmpgb0u08ud',) {}
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x764354683ad0> info ('/tmp/tmpgb0u08ud',) {}
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x764354683ad0> _open ('/tmp/tmpgb0u08ud',) {'mode': 'rb', 'block_size': None, 'autocommit': True, 'cache_options': None}
<fsspec.implementations.local.LocalFileSystem object at 0x76435451bb90>
<fsspec.implementations.local.LocalFileSystem object at 0x76435451bb90>
<fsspec.implementations.local.LocalFileSystem object at 0x779ee00c37a0>
has rawfs? False <__main__.MyLocalFileSystem object at 0x779ee00c3770> _open ('/tmp/tmpgb0u08ud',) {'mode': 'rb', 'block_size': None, 'autocommit': True, 'cache_options': None}
2024-06-10 21:08:21,826 - distributed.worker - WARNING - Compute Failed
Key: ('size-chunk-2f0130c30a88c68f95b5bfe3e88628c5-217eb1639a5f8a0a111f4ea3e7068b71', 0)
Function: execute_task
args: ((subgraph_callable-718c799e1dd760f63b6c2cb4a7729284, [(<function read_block_from_file at 0x779ee01bf420>, <OpenFile '/tmp/tmpgb0u08ud'>, 0, 3, b'\n'), None, True, True]))
kwargs: {}
Exception: 'AttributeError("\'MyLocalFileSystem\' object has no attribute \'rawfs\'")'
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In[3], line 37
34 f.flush()
35 # client.run(setup)
36 # print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
---> 37 print("dd, myfs",dd.read_csv(f"mfs://{f.name}",storage_options=dict(rawfs=LocalFileSystem())).size.compute()) #error
File /usr/lib/python3.12/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
351 def compute(self, **kwargs):
352 """Compute this dask collection
353
354 This turns a lazy Dask collection into its in-memory equivalent.
(...)
373 dask.compute
374 """
--> 375 (result,) = compute(self, traverse=False, **kwargs)
376 return result
File /usr/lib/python3.12/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
658 postcomputes.append(x.__dask_postcompute__())
660 with shorten_traceback():
--> 661 results = schedule(dsk, keys, **kwargs)
663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /usr/lib/python3.12/site-packages/dask/bytes/core.py:191, in read_block_from_file()
190 def read_block_from_file(lazy_file, off, bs, delimiter):
--> 191 with copy.copy(lazy_file) as f:
192 if off == 0 and bs is None:
193 return f.read()
File /usr/lib/python3.12/site-packages/fsspec/core.py:103, in __enter__()
100 def __enter__(self):
101 mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 103 f = self.fs.open(self.path, mode=mode)
105 self.fobjects = [f]
107 if self.compression is not None:
File /usr/lib/python3.12/site-packages/fsspec/spec.py:1293, in open()
1291 else:
1292 ac = kwargs.pop("autocommit", not self._intrans)
-> 1293 f = self._open(
1294 path,
1295 mode=mode,
1296 block_size=block_size,
1297 autocommit=ac,
1298 cache_options=cache_options,
1299 **kwargs,
1300 )
1301 if compression is not None:
1302 from fsspec.compression import compr
Cell In[3], line 18, in wrapped()
16 def wrapped(self,*args,**kwargs):
17 print("has rawfs?",hasattr(self,'rawfs'),self,method_name,args,kwargs)
---> 18 return getattr(self.rawfs,method_name)(*args,**kwargs)
AttributeError: 'MyLocalFileSystem' object has no attribute 'rawfs'
and next I tried
client.run(setup)
print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute())
instead of
print("dd, myfs",dd.read_csv(f"mfs://{f.name}",storage_options=dict(rawfs=LocalFileSystem())).size.compute())
and I get
setup run
<Client: 'tcp://127.0.0.1:39955' processes=7 threads=28, memory=31.11 GiB>
no rawfs
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x76435449a3c0> info ('/tmp/tmpjqcw7153',) {}
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x76435449a3c0> info ('/tmp/tmpjqcw7153',) {}
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x76435449a3c0> _open ('/tmp/tmpjqcw7153',) {'mode': 'rb', 'block_size': None, 'autocommit': True, 'cache_options': None}
no rawfs
no rawfs
setup run
no rawfs
has rawfs? False <__main__.MyLocalFileSystem object at 0x7381fb4535f0> _open ('/tmp/tmpjqcw7153',) {'mode': 'rb', 'block_size': None, 'autocommit': True, 'cache_options': None}
setup run
setup run
setup run
setup run
setup run
setup run
2024-06-10 21:09:01,153 - distributed.worker - WARNING - Compute Failed
Key: ('size-chunk-d64b9d4bac0b21c55395551178057d16-dcfe0b01af30226d1945c64ee5b65773', 0)
Function: execute_task
args: ((subgraph_callable-c34921dd377ac1fd9d899f4136f2f0c6, [(<function read_block_from_file at 0x7381fb545d00>, <OpenFile '/tmp/tmpjqcw7153'>, 0, 3, b'\n'), None, True, True]))
kwargs: {}
Exception: 'AttributeError("\'MyLocalFileSystem\' object has no attribute \'rawfs\'")'
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In[4], line 36
34 f.flush()
35 client.run(setup)
---> 36 print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
37 # print("dd, myfs",dd.read_csv(f"mfs://{f.name}",storage_options=dict(rawfs=LocalFileSystem())).size.compute()) #error
File /usr/lib/python3.12/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
351 def compute(self, **kwargs):
352 """Compute this dask collection
353
354 This turns a lazy Dask collection into its in-memory equivalent.
(...)
373 dask.compute
374 """
--> 375 (result,) = compute(self, traverse=False, **kwargs)
376 return result
File /usr/lib/python3.12/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
658 postcomputes.append(x.__dask_postcompute__())
660 with shorten_traceback():
--> 661 results = schedule(dsk, keys, **kwargs)
663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /usr/lib/python3.12/site-packages/dask/bytes/core.py:191, in read_block_from_file()
190 def read_block_from_file(lazy_file, off, bs, delimiter):
--> 191 with copy.copy(lazy_file) as f:
192 if off == 0 and bs is None:
193 return f.read()
File /usr/lib/python3.12/site-packages/fsspec/core.py:103, in __enter__()
100 def __enter__(self):
101 mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 103 f = self.fs.open(self.path, mode=mode)
105 self.fobjects = [f]
107 if self.compression is not None:
File /usr/lib/python3.12/site-packages/fsspec/spec.py:1293, in open()
1291 else:
1292 ac = kwargs.pop("autocommit", not self._intrans)
-> 1293 f = self._open(
1294 path,
1295 mode=mode,
1296 block_size=block_size,
1297 autocommit=ac,
1298 cache_options=cache_options,
1299 **kwargs,
1300 )
1301 if compression is not None:
1302 from fsspec.compression import compr
Cell In[4], line 18, in wrapped()
16 def wrapped(self,*args,**kwargs):
17 print("has rawfs?",hasattr(self,'rawfs'),self,method_name,args,kwargs)
---> 18 return getattr(self.rawfs,method_name)(*args,**kwargs)
AttributeError: 'MyLocalFileSystem' object has no attribute 'rawfs'
I probably didn't do what mdurant intended me to do, and it seems my FileSystem doesn't get its property set after the third instance. Am I doing it wrong?
Ok, based on a comment from mdurant I am going to try to put the code into a file and send it to workers. Here is the code I tried:
import dask.dataframe as dd, importlib, re
from tempfile import NamedTemporaryFile
from dask.distributed import Client
startup_code="""
from fsspec.implementations.local import LocalFileSystem
from fsspec import AbstractFileSystem, register_implementation
class MyLocalFileSystem(AbstractFileSystem):
def __init__(self,*args,**kwargs):
super().__init__(*args,**kwargs)
self.rawfs=LocalFileSystem(*args,**kwargs)
def __wrap(method_name):
return lambda self,*args,**kwargs: getattr(self.rawfs,method_name)(*args,**kwargs)
_open = __wrap("_open")
info = __wrap("info")
ls = __wrap("ls")
del __wrap
register_implementation("mfs",MyLocalFileSystem,clobber=True)
"""
with NamedTemporaryFile(mode='wt',suffix=".py") as startup_code_file, Client() as client, NamedTemporaryFile(mode='wt') as f:
startup_code_file.write(startup_code)
startup_code_file.flush()
client.upload_file(startup_code_file.name)
importlib.import_module(re.search(r'/([^/]*)\.py',startup_code_file.name).group(1))
f.write("A\n0")
f.flush()
print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute())
and this works. I get the output dd, myfs 1
.
It is unusual... I would have thought that your __init__()
method does get called on the workers while deserialising the OpenFile object.
However, a more typical approach than setting an attribute and hoping it appears on the remote object, is to include the object as an argument:
class MyLocalFileSystem(AbstractFileSystem):
def __init__(self,*args, fs=None, **kwargs):
super().__init__(*args, **kwargs)
self.rawfs = fs
I think this more likely to work
Also, you may consider calling register_implementation
on the workers (via client.run()
), but I don't think this should be necessary for this workflow.
In general, it is best to keep your class definitions in files rather than dynamic; and then you can distribute those files to workers either by having a shared filesystem, or using client.upload_file
.