I have two dataframes called df and ranges:
import pandas as pd

data = {
    'group': ['A', 'B', 'A', 'C', 'B'],
    'start': [10, 20, 15, 30, 25],
    'end': [50, 40, 60, 70, 45],
    'val1': [5, 10, 11, 12, 6],
    'val2': [5, 2, 1, 1, 0],
}
df = pd.DataFrame(data)

data = {
    'group': ['A', 'B', 'C'],
    'start': [0, 5, 25],
    'end': [50, 7, 35],
}
ranges = pd.DataFrame(data)
My goal is to aggregate the rows in df based on whether they fall within the matching range defined in ranges. For each of the val1, val2 columns, I would like the min, max, mean and sum of that column within its aggregation group.

The catch is that I need to do this for something like 5,000 ranges in ranges and 500,000 rows in df, so I'd like a fast but (relatively) memory-efficient solution. I'm open to solutions using similar frameworks such as vaex.
Expected output, where range_id is just a way to identify the groups (assuming they are not unique):

  range_id val1                 val2
           min  max  mean  sum  min  max  mean  sum
0        0    5    5   5.0    5    5    5   5.0    5
IIUC, I would pre-filter the dataframe with map and boolean indexing, then perform a classical groupby.agg. This should keep the masks and intermediate (filtered) DataFrame minimal for memory efficiency, and minimize the size of the input for groupby.
# columns to aggregate
cols = ['val1', 'val2']

# ensure data is numeric
df[cols] = df[cols].astype(int)

# optional, just to avoid having to `set_index` twice
tmp = ranges.set_index('group')

# pre-filter the rows for memory efficiency,
# then perform a groupby.agg
out = (df[df['start'].ge(df['group'].map(tmp['start']))
          & df['end'].le(df['group'].map(tmp['end']))]
       .groupby('group', as_index=False)[cols]
       .agg(['min', 'max', 'mean', 'sum'])
      )
Output:

  group val1                 val2
        min  max  mean  sum  min  max  mean  sum
0     A   5    5   5.0    5    5    5   5.0    5
Intermediate before the groupby:

  group  start  end  val1  val2
0     A     10   50     5     5
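The two map calls are what keep this cheap: they align each row's group label with that range's start/end bounds, so the mask is computed without materializing a merge. A small illustration on the toy data, assuming df and tmp as defined above:

# per-row range bounds obtained via map (toy data from the question)
print(df['group'].map(tmp['start']).tolist())  # [0, 5, 0, 25, 5]
print(df['group'].map(tmp['end']).tolist())    # [50, 7, 50, 35, 7]
# only row 0 satisfies start >= 0 and end <= 50, which matches the
# intermediate DataFrame shown above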
@sammywemmy proposed a variation of my solution: instead of computing all the aggregations simultaneously in groupby.agg, you can compute them individually and combine them with concat. This is faster and potentially a bit more memory-efficient.
from itertools import product

cols = ['val1', 'val2']
tmp = ranges.set_index('group')

grouped = (df[df['start'].ge(df['group'].map(tmp['start']))
              & df['end'].le(df['group'].map(tmp['end']))]
           .groupby('group'))

aggs = ['min', 'mean', 'max', 'sum']
bunch = product(cols, aggs)

contents = []
for col, _agg in bunch:
    outcome = grouped[col].agg(_agg)
    outcome.name = (col, _agg)
    contents.append(outcome)

out = pd.concat(contents, axis=1)
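On the toy data this yields the same numbers as the groupby.agg version, with group as the index. A quick sanity check, assuming grouped, cols and out from the snippet above:

# the concat variant should agree with a direct groupby.agg
direct = grouped[cols].agg(['min', 'mean', 'max', 'sum'])
print((out.to_numpy() == direct.to_numpy()).all())  # expected: True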
Example generating function:
import numpy as np

def init(N):
    df = pd.DataFrame({'group': np.random.randint(0, 20, N),
                       'start': np.random.randint(0, 100, N),
                       'end': np.random.randint(0, 100, N),
                       'val1': np.random.randint(0, 100, N),
                       'val2': np.random.randint(0, 100, N),
                      })
    # ensure start <= end
    df[['start', 'end']] = np.sort(df[['start', 'end']], axis=1)

    group = df['group'].unique()
    ranges = pd.DataFrame({'group': group,
                           'start': np.random.randint(0, 110, len(group)),
                           'end': np.random.randint(0, 110, len(group)),
                          })
    # ensure start <= end
    ranges[['start', 'end']] = np.sort(ranges[['start', 'end']], axis=1)
    return df, ranges
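The tables below are line-by-line memory profiles. A minimal driver sketch for reproducing one, assuming the memory_profiler package is installed (pip install memory_profiler) and a candidate such as mozway_pre_filter is defined:

from memory_profiler import profile

# 10M rows, as in the profiles below
df, ranges = init(10_000_000)

# wrapping a candidate in `profile` prints a line-by-line memory table
# like the ones shown below when the function is called
profiled = profile(mozway_pre_filter)
profiled(df, ranges)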
Memory profiles on 10M rows:
# mozway_pre_filter
Line # Mem usage Increment Occurrences Line Contents
=============================================================
49 711.2 MiB 711.2 MiB 1 def mozway_pre_filter(df, ranges):
50 # columns to aggregate
51 711.2 MiB 0.0 MiB 1 cols = ['val1', 'val2']
52
53 # ensure data is numeric
54 863.9 MiB 152.6 MiB 1 df[cols] = df[cols].astype(int)
55
56 # optional, just to avoid having to `set_index` twice
57 863.9 MiB 0.0 MiB 1 tmp = ranges.set_index('group')
58
59 # pre-filter the rows for memory efficiency
60 # then perform a groupby.agg
61 950.9 MiB 11.2 MiB 4 return (df[( df['start'].ge(df['group'].map(tmp['start']))
62 881.6 MiB 9.5 MiB 1 &df['end'].le(df['group'].map(tmp['end'])))]
63 950.7 MiB -66.6 MiB 2 .groupby('group', as_index=False)[cols].agg(['min', 'max', 'mean', 'sum'])
# donkey_merge
Line # Mem usage Increment Occurrences Line Contents
=============================================================
66 884.4 MiB 884.4 MiB 1 def donkey_merge(df, ranges):
67 884.4 MiB 0.0 MiB 1 ranges = ranges.assign(range_id=ranges.index)
68 1484.4 MiB 600.1 MiB 1 df_merged = pd.merge(df, ranges, on='group')
69 1602.8 MiB 109.0 MiB 2 df_filtered = df_merged[(df_merged['start_x'] >= df_merged['start_y'])
70 1494.0 MiB 9.4 MiB 1 & (df_merged['end_x'] <= df_merged['end_y'])]
71 1602.8 MiB 0.0 MiB 2 aggregation_dict = {"val1": ['min', 'max', 'mean', 'sum'],
72 1602.8 MiB 0.0 MiB 1 "val2": ['min', 'max', 'mean', 'sum']}
73 1585.3 MiB -17.6 MiB 1 return df_filtered.groupby('range_id').agg(aggregation_dict).reset_index()
# Nayem_aggregate_range
Line # Mem usage Increment Occurrences Line Contents
=============================================================
19 905.9 MiB 905.9 MiB 1 def Nayem_aggregate_range(df, ranges):
20 905.9 MiB 0.0 MiB 1 results = []
21 961.5 MiB 0.0 MiB 21 for idx, row in ranges.iterrows():
22 961.5 MiB 0.0 MiB 20 mask = (
23 961.5 MiB 55.6 MiB 60 (df['group'] == row['group']) &
24 961.5 MiB 0.0 MiB 20 (df['start'] >= row['start']) &
25 961.5 MiB 0.0 MiB 20 (df['end'] <= row['end'])
26 )
27 961.5 MiB 0.0 MiB 20 filtered_df = df[mask]
28 961.5 MiB 0.0 MiB 20 if not filtered_df.empty:
29 961.5 MiB 0.0 MiB 20 agg_dict = {
30 961.5 MiB 0.0 MiB 20 'val1_min': filtered_df['val1'].min(),
31 961.5 MiB 0.0 MiB 20 'val1_max': filtered_df['val1'].max(),
32 961.5 MiB 0.0 MiB 20 'val1_mean': filtered_df['val1'].mean(),
33 961.5 MiB 0.0 MiB 20 'val1_sum': filtered_df['val1'].sum(),
34 961.5 MiB 0.0 MiB 20 'val2_min': filtered_df['val2'].min(),
35 961.5 MiB 0.0 MiB 20 'val2_max': filtered_df['val2'].max(),
36 961.5 MiB 0.0 MiB 20 'val2_mean': filtered_df['val2'].mean(),
37 961.5 MiB 0.0 MiB 20 'val2_sum': filtered_df['val2'].sum(),
38 }
39 961.5 MiB 0.0 MiB 20 agg_dict['range_id'] = idx
40 961.5 MiB 0.0 MiB 20 results.append(agg_dict)
41 961.5 MiB 0.0 MiB 1 aggregated_df = pd.DataFrame(results)
42 961.5 MiB 0.0 MiB 1 aggregated_df = aggregated_df.set_index('range_id')
43 961.5 MiB 0.0 MiB 2 aggregated_df.columns = pd.MultiIndex.from_tuples(
44 961.5 MiB 0.0 MiB 1 [('val1', 'min'), ('val1', 'max'), ('val1', 'mean'), ('val1', 'sum'),
45 ('val2', 'min'), ('val2', 'max'), ('val2', 'mean'), ('val2', 'sum')]
46 )
47 961.5 MiB 0.0 MiB 1 return aggregated_df
# user24714692_merge_agg_
Line # Mem usage Increment Occurrences Line Contents
=============================================================
3 879.9 MiB 879.9 MiB 1 def user24714692_merge_agg_(df, ranges):
4 1429.1 MiB 549.2 MiB 1 mdf = pd.merge(df, ranges, on='group', suffixes=('', '_range'))
5
6 1527.7 MiB 70.3 MiB 2 fdf = mdf[
7 1457.4 MiB 19.1 MiB 2 (mdf['start'] >= mdf['start_range']) &
8 1438.3 MiB 9.2 MiB 1 (mdf['end'] <= mdf['end_range'])
9 ]
10
11 1527.9 MiB 0.3 MiB 3 res = fdf.groupby(['group', 'start_range', 'end_range']).agg({
12 1527.7 MiB 0.0 MiB 1 'val1': ['min', 'max', 'mean', 'sum'],
13 1527.7 MiB 0.0 MiB 1 'val2': ['min', 'max', 'mean', 'sum']
14 1527.9 MiB 0.0 MiB 1 }).reset_index()
15
16 1527.9 MiB 0.0 MiB 14 res.columns = ['_'.join(col).strip() if col[1] else col[0] for col in res.columns.values]
17 1527.9 MiB 0.0 MiB 1 return res
Maximum memory usage