Tags: dask, dask-distributed, fsspec

How to use a user-defined fsspec filesystem with dask?


I implemented my own filesystem with the fsspec library, and I am trying to read Dask dataframes from it. However, I get an error when I try. My guess is that the Dask workers don't have a usable copy of the filesystem. Here is some test code:

from fsspec.implementations.local import LocalFileSystem
from fsspec import AbstractFileSystem, register_implementation

class MyLocalFileSystem(AbstractFileSystem):
    def __init__(self,*args,**kwargs):
        super().__init__(*args,**kwargs)
        self.rawfs=LocalFileSystem(*args,**kwargs)

    def __wrap(method_name):
        return lambda self,*args,**kwargs: getattr(self.rawfs,method_name)(*args,**kwargs)
    
    _open = __wrap("_open")
    info = __wrap("info")
    ls = __wrap("ls")

    del __wrap
    
register_implementation("mfs",MyLocalFileSystem,clobber=True)

import dask.dataframe as dd
import pandas as pd

from tempfile import NamedTemporaryFile
from dask.distributed import Client

with NamedTemporaryFile(mode='wt') as f, Client(): # works if I remove `, Client()` (and no other client is running)
    f.write("A\n0")
    f.flush()
    print("pd, localfs",pd.read_csv(f.name).size)
    print("dd, localfs",dd.read_csv(f.name).size.compute())
    print("pd, myfs",pd.read_csv(f"mfs://{f.name}").size)
    # print("workaround",dd.from_pandas(pd.read_csv(f"mfs://{f.name}"),npartitions=1).size.compute())
    print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
    

and I get an error:

2024-06-08 14:48:33,328 - distributed.worker - WARNING - Compute Failed
Key:       ('size-chunk-232fa012bb421939d7011c3af11ac4a7-ea46e61534d2be2ea62e2fe234f0d607', 0)
Function:  execute_task
args:      ((subgraph_callable-92b603c9f28a44f7e623972919b6934a, [(<function read_block_from_file at 0x75336666f420>, <OpenFile '/tmp/tmpob6dxgol'>, 0, 3, b'\n'), None, True, True]))
kwargs:    {}
Exception: 'AttributeError("\'MyLocalFileSystem\' object has no attribute \'rawfs\'")'

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[15], line 33
     31 print("pd, myfs",pd.read_csv(f"mfs://{f.name}").size)
     32 # print("workaround",dd.from_pandas(pd.read_csv(f"mfs://{f.name}"),npartitions=1).size.compute())
---> 33 print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error

File /usr/lib/python3.12/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
    351 def compute(self, **kwargs):
    352     """Compute this dask collection
    353 
    354     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    373     dask.compute
    374     """
--> 375     (result,) = compute(self, traverse=False, **kwargs)
    376     return result

File /usr/lib/python3.12/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658     postcomputes.append(x.__dask_postcompute__())
    660 with shorten_traceback():
--> 661     results = schedule(dsk, keys, **kwargs)
    663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /usr/lib/python3.12/site-packages/dask/bytes/core.py:191, in read_block_from_file()
    190 def read_block_from_file(lazy_file, off, bs, delimiter):
--> 191     with copy.copy(lazy_file) as f:
    192         if off == 0 and bs is None:
    193             return f.read()

File /usr/lib/python3.12/site-packages/fsspec/core.py:103, in __enter__()
    100 def __enter__(self):
    101     mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 103     f = self.fs.open(self.path, mode=mode)
    105     self.fobjects = [f]
    107     if self.compression is not None:

File /usr/lib/python3.12/site-packages/fsspec/spec.py:1293, in open()
   1291 else:
   1292     ac = kwargs.pop("autocommit", not self._intrans)
-> 1293     f = self._open(
   1294         path,
   1295         mode=mode,
   1296         block_size=block_size,
   1297         autocommit=ac,
   1298         cache_options=cache_options,
   1299         **kwargs,
   1300     )
   1301     if compression is not None:
   1302         from fsspec.compression import compr

Cell In[15], line 10, in <lambda>()
      9 def __wrap(method_name):
---> 10     return lambda self,*args,**kwargs: getattr(self.rawfs,method_name)(*args,**kwargs)

AttributeError: 'MyLocalFileSystem' object has no attribute 'rawfs'

Is this supposed to work? How can I get this working?

Versions: Python 3.12.3 (main, Apr 23 2024, 09:16:07) [GCC 13.2.1 20240417], dask 2024.4.1, fsspec 2024.3.1

More attempts

After some suggestions from mdurant, I tried a few new things and added more debug printing. He suggested passing in rawfs, which I tried with the following code:

from fsspec.implementations.local import LocalFileSystem
from fsspec import AbstractFileSystem, register_implementation
import dask.dataframe as dd
import pandas as pd
from tempfile import NamedTemporaryFile
from dask.distributed import Client

def setup():
    class MyLocalFileSystem(AbstractFileSystem):
        def __init__(self,*args,rawfs=None,**kwargs):
            super().__init__(*args,**kwargs)
            print(rawfs or "no rawfs")
            self.rawfs=rawfs or LocalFileSystem(*args,**kwargs)
    
        def __wrap(method_name):
            def wrapped(self,*args,**kwargs):
                print("has rawfs?",hasattr(self,'rawfs'),self,method_name,args,kwargs)

                return getattr(self.rawfs,method_name)(*args,**kwargs)
            return wrapped
        
        _open = __wrap("_open")
        info = __wrap("info")
        ls = __wrap("ls")
    
        del __wrap
        
    register_implementation("mfs",MyLocalFileSystem,clobber=True)
    print('setup run')
setup()

with NamedTemporaryFile(mode='wt') as f, Client() as client:
    print(client)
    f.write("A\n0")
    f.flush()
    # client.run(setup)
    # print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
    print("dd, myfs",dd.read_csv(f"mfs://{f.name}",storage_options=dict(rawfs=LocalFileSystem())).size.compute()) #error
    

And here I get the output:

setup run
<Client: 'tcp://127.0.0.1:39165' processes=7 threads=28, memory=31.11 GiB>
<fsspec.implementations.local.LocalFileSystem object at 0x7643560ddb20>
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x764354683ad0> info ('/tmp/tmpgb0u08ud',) {}
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x764354683ad0> info ('/tmp/tmpgb0u08ud',) {}
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x764354683ad0> _open ('/tmp/tmpgb0u08ud',) {'mode': 'rb', 'block_size': None, 'autocommit': True, 'cache_options': None}
<fsspec.implementations.local.LocalFileSystem object at 0x76435451bb90>
<fsspec.implementations.local.LocalFileSystem object at 0x76435451bb90>
<fsspec.implementations.local.LocalFileSystem object at 0x779ee00c37a0>
has rawfs? False <__main__.MyLocalFileSystem object at 0x779ee00c3770> _open ('/tmp/tmpgb0u08ud',) {'mode': 'rb', 'block_size': None, 'autocommit': True, 'cache_options': None}

2024-06-10 21:08:21,826 - distributed.worker - WARNING - Compute Failed
Key:       ('size-chunk-2f0130c30a88c68f95b5bfe3e88628c5-217eb1639a5f8a0a111f4ea3e7068b71', 0)
Function:  execute_task
args:      ((subgraph_callable-718c799e1dd760f63b6c2cb4a7729284, [(<function read_block_from_file at 0x779ee01bf420>, <OpenFile '/tmp/tmpgb0u08ud'>, 0, 3, b'\n'), None, True, True]))
kwargs:    {}
Exception: 'AttributeError("\'MyLocalFileSystem\' object has no attribute \'rawfs\'")'

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[3], line 37
     34 f.flush()
     35 # client.run(setup)
     36 # print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
---> 37 print("dd, myfs",dd.read_csv(f"mfs://{f.name}",storage_options=dict(rawfs=LocalFileSystem())).size.compute()) #error

File /usr/lib/python3.12/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
    351 def compute(self, **kwargs):
    352     """Compute this dask collection
    353 
    354     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    373     dask.compute
    374     """
--> 375     (result,) = compute(self, traverse=False, **kwargs)
    376     return result

File /usr/lib/python3.12/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658     postcomputes.append(x.__dask_postcompute__())
    660 with shorten_traceback():
--> 661     results = schedule(dsk, keys, **kwargs)
    663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /usr/lib/python3.12/site-packages/dask/bytes/core.py:191, in read_block_from_file()
    190 def read_block_from_file(lazy_file, off, bs, delimiter):
--> 191     with copy.copy(lazy_file) as f:
    192         if off == 0 and bs is None:
    193             return f.read()

File /usr/lib/python3.12/site-packages/fsspec/core.py:103, in __enter__()
    100 def __enter__(self):
    101     mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 103     f = self.fs.open(self.path, mode=mode)
    105     self.fobjects = [f]
    107     if self.compression is not None:

File /usr/lib/python3.12/site-packages/fsspec/spec.py:1293, in open()
   1291 else:
   1292     ac = kwargs.pop("autocommit", not self._intrans)
-> 1293     f = self._open(
   1294         path,
   1295         mode=mode,
   1296         block_size=block_size,
   1297         autocommit=ac,
   1298         cache_options=cache_options,
   1299         **kwargs,
   1300     )
   1301     if compression is not None:
   1302         from fsspec.compression import compr

Cell In[3], line 18, in wrapped()
     16 def wrapped(self,*args,**kwargs):
     17     print("has rawfs?",hasattr(self,'rawfs'),self,method_name,args,kwargs)
---> 18     return getattr(self.rawfs,method_name)(*args,**kwargs)

AttributeError: 'MyLocalFileSystem' object has no attribute 'rawfs'

and next I tried:

client.run(setup)
print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute())

instead of

print("dd, myfs",dd.read_csv(f"mfs://{f.name}",storage_options=dict(rawfs=LocalFileSystem())).size.compute())

and I get:

setup run
<Client: 'tcp://127.0.0.1:39955' processes=7 threads=28, memory=31.11 GiB>
no rawfs
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x76435449a3c0> info ('/tmp/tmpjqcw7153',) {}
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x76435449a3c0> info ('/tmp/tmpjqcw7153',) {}
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x76435449a3c0> _open ('/tmp/tmpjqcw7153',) {'mode': 'rb', 'block_size': None, 'autocommit': True, 'cache_options': None}
no rawfs
no rawfs
setup run
no rawfs
has rawfs? False <__main__.MyLocalFileSystem object at 0x7381fb4535f0> _open ('/tmp/tmpjqcw7153',) {'mode': 'rb', 'block_size': None, 'autocommit': True, 'cache_options': None}
setup run
setup run
setup run
setup run
setup run
setup run

2024-06-10 21:09:01,153 - distributed.worker - WARNING - Compute Failed
Key:       ('size-chunk-d64b9d4bac0b21c55395551178057d16-dcfe0b01af30226d1945c64ee5b65773', 0)
Function:  execute_task
args:      ((subgraph_callable-c34921dd377ac1fd9d899f4136f2f0c6, [(<function read_block_from_file at 0x7381fb545d00>, <OpenFile '/tmp/tmpjqcw7153'>, 0, 3, b'\n'), None, True, True]))
kwargs:    {}
Exception: 'AttributeError("\'MyLocalFileSystem\' object has no attribute \'rawfs\'")'

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[4], line 36
     34 f.flush()
     35 client.run(setup)
---> 36 print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
     37 # print("dd, myfs",dd.read_csv(f"mfs://{f.name}",storage_options=dict(rawfs=LocalFileSystem())).size.compute()) #error

File /usr/lib/python3.12/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
    351 def compute(self, **kwargs):
    352     """Compute this dask collection
    353 
    354     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    373     dask.compute
    374     """
--> 375     (result,) = compute(self, traverse=False, **kwargs)
    376     return result

File /usr/lib/python3.12/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658     postcomputes.append(x.__dask_postcompute__())
    660 with shorten_traceback():
--> 661     results = schedule(dsk, keys, **kwargs)
    663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /usr/lib/python3.12/site-packages/dask/bytes/core.py:191, in read_block_from_file()
    190 def read_block_from_file(lazy_file, off, bs, delimiter):
--> 191     with copy.copy(lazy_file) as f:
    192         if off == 0 and bs is None:
    193             return f.read()

File /usr/lib/python3.12/site-packages/fsspec/core.py:103, in __enter__()
    100 def __enter__(self):
    101     mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 103     f = self.fs.open(self.path, mode=mode)
    105     self.fobjects = [f]
    107     if self.compression is not None:

File /usr/lib/python3.12/site-packages/fsspec/spec.py:1293, in open()
   1291 else:
   1292     ac = kwargs.pop("autocommit", not self._intrans)
-> 1293     f = self._open(
   1294         path,
   1295         mode=mode,
   1296         block_size=block_size,
   1297         autocommit=ac,
   1298         cache_options=cache_options,
   1299         **kwargs,
   1300     )
   1301     if compression is not None:
   1302         from fsspec.compression import compr

Cell In[4], line 18, in wrapped()
     16 def wrapped(self,*args,**kwargs):
     17     print("has rawfs?",hasattr(self,'rawfs'),self,method_name,args,kwargs)
---> 18     return getattr(self.rawfs,method_name)(*args,**kwargs)

AttributeError: 'MyLocalFileSystem' object has no attribute 'rawfs'

I probably didn't do what mdurant intended, and it seems my filesystem doesn't get its rawfs attribute set from the third instance onwards. Am I doing something wrong?

Third try

OK, based on a comment from mdurant, I tried putting the code into a file and sending it to the workers. Here is the code:

import dask.dataframe as dd, importlib, re
from tempfile import NamedTemporaryFile
from dask.distributed import Client

startup_code="""
from fsspec.implementations.local import LocalFileSystem
from fsspec import AbstractFileSystem, register_implementation

class MyLocalFileSystem(AbstractFileSystem):
    def __init__(self,*args,**kwargs):
        super().__init__(*args,**kwargs)
        self.rawfs=LocalFileSystem(*args,**kwargs)
        
    def __wrap(method_name):
        return lambda self,*args,**kwargs: getattr(self.rawfs,method_name)(*args,**kwargs)

    _open = __wrap("_open")
    info = __wrap("info")
    ls = __wrap("ls")

    del __wrap
    
register_implementation("mfs",MyLocalFileSystem,clobber=True)
"""

with NamedTemporaryFile(mode='wt',suffix=".py") as startup_code_file, Client() as client, NamedTemporaryFile(mode='wt') as f:
    startup_code_file.write(startup_code)
    startup_code_file.flush()
    client.upload_file(startup_code_file.name)
    importlib.import_module(re.search(r'/([^/]*)\.py',startup_code_file.name).group(1))
    f.write("A\n0")
    f.flush()
    print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute())

and this works: I get the output `dd, myfs 1`.


Solution

  • It is unusual... I would have thought that your __init__() method does get called on the workers while deserialising the OpenFile object. However, a more typical approach than setting an attribute and hoping it appears on the remote object is to include the object as an argument:

        class MyLocalFileSystem(AbstractFileSystem):
            def __init__(self,*args, fs=None, **kwargs):
                super().__init__(*args, **kwargs)
                self.rawfs = fs
    

    I think this is more likely to work.
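
    For illustration, a minimal sketch of how that argument would then travel with the read (this mirrors the second attempt above, with `fs` in place of `rawfs`; the path is a placeholder):

        import dask.dataframe as dd
        from fsspec.implementations.local import LocalFileSystem

        # storage_options are forwarded to MyLocalFileSystem.__init__ as keyword
        # arguments, so the wrapped filesystem rides along with the URL.
        df = dd.read_csv("mfs:///tmp/data.csv",
                         storage_options=dict(fs=LocalFileSystem()))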

    Also, you may consider calling register_implementation on the workers (via client.run()), but I don't think this should be necessary for this workflow.
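
    For completeness, a sketch of that registration step (this is essentially what the `client.run(setup)` attempt above already does; `client.run` cloudpickles the function and executes it once in every worker process):

        from fsspec import register_implementation

        def register_mfs():
            # MyLocalFileSystem is captured by value from the defining module;
            # each worker registers it under the "mfs" protocol.
            register_implementation("mfs", MyLocalFileSystem, clobber=True)

        client.run(register_mfs)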

    In general, it is best to keep your class definitions in files rather than defining them dynamically; then you can distribute those files to workers either via a shared filesystem or by using client.upload_file, as sketched below.
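
    A minimal sketch of that file-based layout (the module name `myfs.py` is hypothetical; it is the question's class moved into an importable file):

        # myfs.py
        from fsspec import AbstractFileSystem, register_implementation
        from fsspec.implementations.local import LocalFileSystem

        class MyLocalFileSystem(AbstractFileSystem):
            """Thin wrapper delegating to a LocalFileSystem instance."""

            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)
                self.rawfs = LocalFileSystem(*args, **kwargs)

            def _open(self, *args, **kwargs):
                return self.rawfs._open(*args, **kwargs)

            def info(self, *args, **kwargs):
                return self.rawfs.info(*args, **kwargs)

            def ls(self, *args, **kwargs):
                return self.rawfs.ls(*args, **kwargs)

        # Registering at import time means any process that imports this module
        # can resolve "mfs://" URLs.
        register_implementation("mfs", MyLocalFileSystem, clobber=True)

    Then, on the client side:

        import myfs                     # registers "mfs" in the local process
        client.upload_file("myfs.py")   # ships the module to every worker

    which is the packaged-up version of the "third try" above.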