python pyarrow apache-arrow

How do I convert a CSV file to an Apache Arrow IPC file with dictionary encoding?


I am trying to use pyarrow to convert a CSV file to an Apache Arrow IPC file with dictionary encoding turned on. The following appears to convert the CSV to an Arrow IPC file:

import pyarrow as pa
import pyarrow.csv  # makes the pa.csv submodule available

file = "./in.csv"
arrowFile = "./out.arrow"
with pa.OSFile(arrowFile, 'wb') as arrow:
    with pa.csv.open_csv(file) as reader:
        with pa.RecordBatchFileWriter(arrow, reader.schema) as writer:
            for batch in reader:
                writer.write_batch(batch)

I tried the following to use dictionary encoding:

convert_options = pa.csv.ConvertOptions(auto_dict_encode=True)
with pa.OSFile(arrowFile, 'wb') as arrow:
    with pa.csv.open_csv(file, convert_options=convert_options) as reader:
        with pa.RecordBatchFileWriter(arrow, reader.schema) as writer:
            for batch in reader:
                writer.write_batch(batch)

But I get the following error:

File "pyarrow/ipc.pxi", line 507, in pyarrow.lib._CRecordBatchWriter.write_batch
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Dictionary replacement detected when writing IPC file format. Arrow IPC files only support a single non-delta dictionary for a given field across all batches.

How do I fix the code to use dictionary encoding?


Solution

  • As the error message says, an Arrow IPC file only supports a single non-delta dictionary per field, shared across all batches.

    In your example each batch ends up with a different dictionary, which the IPC file format doesn't support.

    You'd have to load the whole table and call unify_dictionaries, so that every batch uses the same dictionary:

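    import pyarrow as pa


    schema = pa.schema([pa.field("col1", pa.dictionary(pa.int32(), pa.string()))])

    # Two batches with the same schema but different dictionaries
    # (["a", "b"] and ["b", "c"]), like the batches from the CSV reader.
    table = pa.Table.from_batches(
        [
            pa.record_batch({"col1": ["a", "b", "b"]}, schema),
            pa.record_batch({"col1": ["b", "b", "c"]}, schema),
        ],
    )


    with pa.OSFile("data.arrow", "wb") as arrow:
        with pa.RecordBatchFileWriter(arrow, schema) as writer:
            # After unify_dictionaries() every batch shares one dictionary,
            # so the file writer no longer sees a dictionary replacement.
            for batch in table.unify_dictionaries().to_batches():
                writer.write_batch(batch)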