Tags: python, multiprocessing, shared-memory, pyarrow, memoryview

BufferError: memoryview has 1 exported buffer when trying to close a shared memory segment where I put a dataframe with pyarrow dtypes


I'm trying to put dataframes of different types into multiprocessing shared memory. When I use dataframes that contain Python types I have no problem, but when I use pyarrow types I have problems closing the shared memory segments. I have a function to put a dataframe into shared memory and another to get it back. Two problems appear when I use Arrow types like string[pyarrow] inside a dataframe; the errors are shown below.

My functions to put the dataframe into shared memory and to read it back out are these:

> import sys
> import pandas as pd
> import pyarrow as pa
> from multiprocessing.shared_memory import SharedMemory
> 
> def put_df(data, logTime=None, logSize=None, usingArrow=False):
>     if(usingArrow):
>         table = pa.Table.from_pandas(data).combine_chunks()
>         record_batch = table.to_batches(max_chunksize = sys.maxsize)[0]
>     else:
>         record_batch = pa.RecordBatch.from_pandas(data)
>     # Determine the size of buffer to request from the shared memory
>     mock_sink = pa.MockOutputStream()
>     stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema)
>     stream_writer.write_batch(record_batch)
>     stream_writer.close()
>     data_size = mock_sink.size()
>     
>     sm_put = SharedMemory(create=True, size=data_size)
>     buffer = pa.py_buffer(sm_put.buf)
> 
>     # Write the PyArrow RecordBatch to SharedMemory
>     stream = pa.FixedSizeBufferWriter(buffer)
>     stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
>     stream_writer.write_batch(record_batch)
>     stream_writer.close()
> 
>     del stream_writer
>     del stream
>     del buffer
>     if(usingArrow):
>         del table
>     del record_batch
> 
>     sm_put.close()
>     return sm_put.name 
> 
> def get_df(sm_get_name, logTime=None, logSize=None, usingArrow=False):
>     sm_get = SharedMemory(name = sm_get_name, create = False)
>     buffer = pa.BufferReader(sm_get.buf)
> 
>     # Convert object back into an Arrow RecordBatch
>     reader = pa.RecordBatchStreamReader(buffer)
>     record_batch = reader.read_next_batch()
> 
>     # Convert back into Pandas
>     if(usingArrow):
>         data = record_batch.to_pandas(types_mapper = pd.ArrowDtype)
>     else:
>         data = record_batch.to_pandas()
>            
>     del buffer
>     del reader
>     del record_batch
> 
>     sm_get.close()
>     sm_get.unlink()
> 
>     return data
> 
> if __name__ == '__main__':
>     names = [ "0", "1"]
>     types = { "0": "string[pyarrow]", "1": "string[pyarrow]" }
>     # `file` is the path to the CSV file being loaded
>     df = pd.read_csv(open(file, "r"), index_col=False, header=None, names=names, dtype=types, engine="c")
>     sm_get_name = put_df(df, usingArrow=True)
>     data = get_df(sm_get_name=sm_get_name, usingArrow=True)

I've seen that when I try to del the pyarrow objects (like table or record_batch) the memory is not released instantly (I'm checking it with the memory_profiler library). Could this be the problem, and if so, how can I solve it?

I'm using pyarrow 11.0.0 and pandas 1.5.3.

The errors are these:

  • Error that appears when I call both sm_put.close() and sm_get.close()
  • Error that appears when I only call sm_put.close(), or when I don't call any close() method
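For reference, the failure mode itself can be reproduced with a tiny standalone sketch (an illustration I put together, not the exact traceback from my code): closing a SharedMemory while something else still exports its buffer raises exactly this BufferError.

    from multiprocessing.shared_memory import SharedMemory

    sm = SharedMemory(create=True, size=16)
    view = memoryview(sm.buf)   # something else still exports the underlying buffer
    try:
        sm.close()              # BufferError: memoryview has 1 exported buffer
    except BufferError as e:
        print(e)
    view.release()              # drop the extra view...
    sm.close()                  # ...and now close() and unlink() succeed
    sm.unlink()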

The memory profile when I use pd.ArrowDtype inside dataframes makes me think there are memory leaks that could be causing the BufferError:

(Screenshot: memory profile when using dataframes with pd.ArrowDtype)


Solution

  • I'm not able to completely reproduce your example. My understanding is that put_df is doing the right thing. But in get_df, the record_batch is holding on to a view of the buffer / shared memory (in an effort to avoid copying data).

    It means that you shouldn't still be holding on to the record batch (or the reader) when you close the shared memory.

    I got it to work, but it's not very different from your solution. All I did was make sure the RecordBatchStreamReader is closed.

    def get_df(sm_get_name):
        sm_get = SharedMemory(name = sm_get_name, create = False)
        buffer = pa.BufferReader(sm_get.buf)
    
        # Close the reader via the context manager so it drops its reference
        # to the shared memory buffer.
        with pa.RecordBatchStreamReader(buffer) as reader:
            record_batch = reader.read_next_batch()
            df = record_batch.to_pandas()
            del record_batch
        del reader
        del buffer
    
        sm_get.close()
        sm_get.unlink()
    
        return df
    

    It's also possible that using pd.ArrowDtype doesn't work because it means the shared memory data isn't copied at all, whereas the standard to_pandas implementation copies the data from Arrow to NumPy format.
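    If you want to keep pd.ArrowDtype, one workaround (just a sketch, not something I've tested against your exact setup; get_df_arrow is a hypothetical name) is to copy the batch out of the shared memory into process-private memory before building the dataframe, for example by round-tripping it through an in-memory IPC stream:

    def get_df_arrow(sm_get_name):
        sm_get = SharedMemory(name=sm_get_name, create=False)
        buffer = pa.BufferReader(sm_get.buf)

        with pa.RecordBatchStreamReader(buffer) as reader:
            record_batch = reader.read_next_batch()
            # Serialize the batch into a fresh in-memory buffer; this copies
            # the data out of the shared memory mapping.
            sink = pa.BufferOutputStream()
            with pa.ipc.new_stream(sink, record_batch.schema) as writer:
                writer.write_batch(record_batch)
            owned_buffer = sink.getvalue()
            del record_batch
        del reader
        del buffer

        sm_get.close()
        sm_get.unlink()

        # The ArrowDtype-backed dataframe now references only the private copy.
        with pa.ipc.open_stream(owned_buffer) as reader:
            return reader.read_next_batch().to_pandas(types_mapper=pd.ArrowDtype)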

    PS:

    the memory is not released instantly (I'm checking it with the memory_profiler library)

    Python doesn't immediately release memory that has been freed back to the OS, so don't expect the process memory to decrease. Instead, just make sure that the memory doesn't keep increasing when you call this function several times.
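    A quick way to check that (a sketch, reusing your put_df with usingArrow=True, the get_df above, and the same df as in your __main__ block) is to run the round trip in a loop and watch that the numbers stay flat, instead of expecting them to drop after every call:

    import pyarrow as pa

    for i in range(20):
        name = put_df(df, usingArrow=True)
        data = get_df(name)
        # Arrow's own allocator statistics should come back to roughly the
        # same value every iteration, even if the process RSS reported by
        # memory_profiler never shrinks.
        print(i, pa.total_allocated_bytes())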