I would like to first write a stream into an Arrow file and then later read it back into a pandas dataframe, with as little memory overhead as possible.
Writing data in batches works perfectly fine:
import pyarrow as pa
import pandas as pd
import random

data = [pa.array([random.randint(0, 1000)]), pa.array(['B']), pa.array(['C'])]
columns = ['A', 'B', 'C']
batch = pa.RecordBatch.from_arrays(data, columns)

with pa.OSFile('test.arrow', 'wb') as f:
    with pa.RecordBatchStreamWriter(f, batch.schema) as writer:
        for i in range(1000 * 1000):
            data = [pa.array([random.randint(0, 1000)]), pa.array(['B']), pa.array(['C'])]
            batch = pa.RecordBatch.from_arrays(data, columns)
            writer.write_batch(batch)
Writing 1 million rows as above is fast and uses about 40 MB of memory during the entire write, which is perfectly fine.
However, reading it back is not: memory consumption climbs to about 2 GB before producing the final dataframe, which is only about 118 MB.
I tried this:
with pa.input_stream('test.arrow') as f:
    reader = pa.BufferReader(f.read())
    table = pa.ipc.open_stream(reader).read_all()
    df1 = table.to_pandas(split_blocks=True, self_destruct=True)
and this, with the same memory overhead:
with open('test.arrow', 'rb') as f:
    df1 = pa.ipc.open_stream(f).read_pandas()
Dataframe size:
print(df1.info(memory_usage='deep'))
Data columns (total 3 columns):
 #   Column  Non-Null Count    Dtype
---  ------  --------------    -----
 0   A       1000000 non-null  int64
 1   B       1000000 non-null  object
 2   C       1000000 non-null  object
dtypes: int64(1), object(2)
memory usage: 118.3 MB
None
What I need is either a way to fix the memory usage with pyarrow, or a suggestion for another format that I can write to incrementally and then read back in full into a pandas dataframe without too much memory overhead.
Your example is writing many RecordBatches of a single row each. Such a RecordBatch carries some overhead in addition to the data itself (the schema, potential padding/alignment), and is therefore not efficient for storing only a single row.
When reading the file with read_all() or read_pandas(), it first recreates all those RecordBatches before converting them into a single Table. All that per-batch overhead adds up, and this is what you are seeing.
The recommended size for a RecordBatch is of course dependent on the exact use case, but a typical size is 64k to 1M rows.
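As an illustration (a sketch, not the only way to do it), you could buffer rows in plain Python lists and only emit a RecordBatch every chunk_size rows; the chunk_size value and the buffering scheme below are assumptions for the example, not something pyarrow requires:

import pyarrow as pa
import random

schema = pa.schema([('A', pa.int64()), ('B', pa.string()), ('C', pa.string())])
chunk_size = 64 * 1024  # illustrative batch size; tune for your use case

def flush(writer, a, b, c):
    # Build one multi-row RecordBatch from the buffered values and write it
    writer.write_batch(pa.RecordBatch.from_arrays(
        [pa.array(a, pa.int64()), pa.array(b, pa.string()), pa.array(c, pa.string())],
        schema=schema))

with pa.OSFile('test.arrow', 'wb') as f:
    with pa.RecordBatchStreamWriter(f, schema) as writer:
        a, b, c = [], [], []
        for i in range(1000 * 1000):
            a.append(random.randint(0, 1000))
            b.append('B')
            c.append('C')
            if len(a) == chunk_size:
                flush(writer, a, b, c)
                a, b, c = [], [], []
        if a:  # write the remaining rows as a final, smaller batch
            flush(writer, a, b, c)

Reading this back with read_pandas() should then only have to stitch together a handful of batches instead of a million one-row ones.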
To see the effect of padding each buffer to 64 bytes (https://arrow.apache.org/docs/format/Columnar.html#buffer-alignment-and-padding), let's compare the total allocated bytes with the actual bytes represented by the RecordBatch:
import pyarrow as pa

batch = pa.RecordBatch.from_arrays(
    [pa.array([1]), pa.array(['B']), pa.array(['C'])],
    ['A', 'B', 'C']
)

# The size of the data stored in the RecordBatch:
# 8 for the integer (int64), 9 for each string array
# (8 for the length-2 offset buffer (int32), 1 for the single string byte)
>>> batch.nbytes
26

# The size of the data actually allocated by Arrow
# (5 * 64 for 5 buffers, each padded to 64 bytes)
>>> pa.total_allocated_bytes()
320
So you can see that this padding alone already gives a big overhead for such a tiny RecordBatch.
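To show the flip side (an illustrative sketch; run it as a standalone script so that pa.total_allocated_bytes() only reflects this one batch, and note the 64k row count is an arbitrary choice for the comparison), the same three columns stored as a single 64k-row batch waste almost nothing on padding:

import pyarrow as pa

n = 64 * 1024  # arbitrary batch size for the comparison
batch = pa.RecordBatch.from_arrays(
    [pa.array(list(range(n))), pa.array(['B'] * n), pa.array(['C'] * n)],
    ['A', 'B', 'C']
)

# Real data: roughly 8*n bytes for the int64 column, plus
# 4*(n+1) offset bytes and n string bytes for each of the two string columns.
print(batch.nbytes)
# Allocated: the same five buffers, each rounded up to a 64-byte multiple,
# so only a little larger than batch.nbytes.
print(pa.total_allocated_bytes())

With batches of this size, the padding and the per-batch schema overhead become negligible compared to the data itself.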