Is there a way to sort data and drop duplicates using pure pyarrow tables? My goal is to retrieve the latest version of each ID based on the maximum update timestamp.
Some extra details: my datasets are normally structured into at least two versions:
The historical dataset includes every updated item from a source, so a single ID can appear once for each change that happened to it (picture a Zendesk or ServiceNow ticket, for example, which can be updated many times).
I then read the historical dataset using filters, convert it into a pandas DF, sort the data, and then drop duplicates on some unique constraint columns.
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

dataset = ds.dataset(history, filesystem=filesystem, partitioning=partitioning)
table = dataset.to_table(filter=filter_expression, columns=columns)
df = table.to_pandas().sort_values(sort_columns, ascending=True).drop_duplicates(unique_constraint, keep="last")
final = pa.Table.from_pandas(df=df, schema=table.schema, preserve_index=False)
# ds.write_dataset(final, filesystem, partitioning)
# I tend to write the final dataset using the legacy dataset so I can make use of the partition_filename_cb - that way I can have one file per date_id. Our visualization tool connects to these files directly
# container/dataset/date_id=20210127/20210127.parquet
pq.write_to_dataset(final, filesystem, partition_cols=["date_id"], use_legacy_dataset=True, partition_filename_cb=lambda x: str(x[-1]).split(".")[0] + ".parquet")
It would be nice to cut out that conversion to pandas and then back to a table, if possible.
Edit March 2022: PyArrow keeps adding functionality, though this feature isn't there yet. My approach now would be:
def drop_duplicates(table: pa.Table, column_name: str) -> pa.Table:
    unique_values = pc.unique(table[column_name])
    unique_indices = [pc.index(table[column_name], value).as_py() for value in unique_values]
    mask = np.full(len(table), False)
    mask[unique_indices] = True
    return table.filter(mask=mask)
//end edit
I saw your question because I had a similar one, and I solved it for my work. Due to IP issues I can't post the whole code, but I'll try to answer as well as I can (I've never done this before).
import pyarrow.compute as pc
import pyarrow as pa
import numpy as np
array = table.column(column_name)
dicts = {dct['values']: dct['counts'] for dct in pc.value_counts(array).to_pylist()}
for key, value in dicts.items():
    ...  # do stuff
I used 'value_counts' to find the unique values and how many times each one occurs (https://arrow.apache.org/docs/python/generated/pyarrow.compute.value_counts.html). Then I iterated over those values. If the count was 1, I selected the row by using
mask = pa.array(np.array(array) == key)
row = table.filter(mask)
and if the count was more than 1, I selected either the first or the last matching row, again using a numpy boolean array as a mask.
After iterating, it was as simple as pa.concat_tables(tables).
Warning: this is a slow process. If you need something quick and dirty, try the "Unique" option (also in the same link I provided).
Edit/extra: you can make it a bit faster and less memory intensive by maintaining a single numpy boolean mask while iterating over the dictionary. At the end you return "table.filter(mask=boolean_mask)". I don't know how to measure the speed, though...
edit2: (sorry for the many edits. I've been doing a lot of refactoring and trying to get it to work faster.)
You can also try something like:
def drop_duplicates(table: pa.Table, col_name: str) -> pa.Table:
    column_array = table.column(col_name)
    mask_x = np.full(table.shape[0], False)
    _, mask_indices = np.unique(np.array(column_array), return_index=True)
    mask_x[mask_indices] = True
    return table.filter(mask=mask_x)