python, pandas, dataframe, pandas-groupby, pandas-rolling

Compare two columns based on last N rows in a pandas DataFrame


I want to group by "ts_code" and, for each row, look at the last N rows of that group, take the max of one column (high) and the min of another column (low) restricted to the rows after that max, and express the difference as a percentage. Specifically,

df

ts_code high low
0   A   20  10
1   A   30  5
2   A   40  20
3   A   50  10
4   A   20  30
5   B   50  10
6   B   30  5
7   B   40  20
8   B   10  10
9   B   20  30

Goal

Below is my expected result

   ts_code  high  low  l3_high_low_pct_chg  l4_high_low_pct_chg
0        A    20   10                   NA                   NA
1        A    30    5                   NA                   NA
2        A    40   20                  0.5                   NA
3        A    50   10                  0.8                  0.8
4        A    20   30                  0.8                  0.8
5        B    50   10                   NA                   NA
6        B    30    5                   NA                   NA
7        B    40   20                  0.9                   NA
8        B    10   10                 0.75                  0.9
9        B    20   30                 0.75                 0.75

lN_high_low_pct_chg (for example l3_high_low_pct_chg) = 1 - (the min value of the low column after the peak of high) / (the max value of the high column), computed over the last N rows of each group, for every row.
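
For example, with N=3 at index 3 (group A) the window is rows 1-3: the max of high is 50 (at index 3), the min of low from that peak onward is 10, so l3_high_low_pct_chg = 1 - 10/50 = 0.8.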

Attempt and problem

df['l3_highest']=df.groupby('ts_code')['high'].transform(lambda x: x.rolling(3).max())
df['l3_lowest']=df.groupby('ts_code')['low'].transform(lambda x: x.rolling(3).min())
df['l3_high_low_pct_chg']=1-df['l3_lowest']/df['l3_highest']

But this fails: at index 2, for example, l3_lowest comes out as 5, not 20, because the rolling min looks at the whole window instead of only the rows after the peak of high. I don't know how to calculate the percentage only after the peak.

For the last 4 rows: at index=8 the max high is 50 and the min low after that peak is 5, so l4_high_low_pct_chg = 1 - 5/50 = 0.9; at index=9 the max high is 40 and the min low after the peak is 10, so l4_high_low_pct_chg = 1 - 10/40 = 0.75.


Solution

  • Grouping by 'ts_code' is just an ordinary groupby(). The tricky part is that Rolling.apply() works on one column at a time, so it is awkward to use when the calculation needs data from multiple columns. You can use "from numpy_ext import rolling_apply as rolling_apply_ext" as in this example: Pandas rolling apply using multiple columns. However, I just created a function that manually splits each group into n-length sub-DataFrames and applies the calculation to each one. idxmax() finds the index of the peak of the high column, then we take the min() of the low values from that peak onward. The rest is pretty straightforward.

    import numpy as np
    import pandas as pd

    df = pd.DataFrame([['A', 20, 10],
        ['A', 30, 5],
        ['A', 40, 20],
        ['A', 50, 10],
        ['A', 20, 30],
        ['B', 50, 10],
        ['B', 30, 5],
        ['B', 40, 20],
        ['B', 10, 10],
        ['B', 20, 30]],
        columns=['ts_code', 'high', 'low']
    )


    def custom_f(df, n):
        # One result per row; rows without a full n-row window stay NaN.
        s = pd.Series(np.nan, index=df.index)

        def sub_f(df_):
            # Index of the peak of 'high' within the window.
            high_peak_idx = df_['high'].idxmax()
            # Smallest 'low' at or after that peak.
            min_low_after_peak = df_.loc[high_peak_idx:]['low'].min()
            max_high = df_['high'].max()
            return 1 - min_low_after_peak / max_high

        # Slide an n-row window through the group and write each result
        # at the window's last row.
        for i in range(df.shape[0] - n + 1):
            df_ = df.iloc[i:i + n]
            s.iloc[i + n - 1] = sub_f(df_)

        return s


    # .values drops the (ts_code, index) MultiIndex; the rows of df are
    # already ordered by ts_code, so the values line up.
    df['l3_high_low_pct_chg'] = df.groupby("ts_code").apply(custom_f, 3).values
    df['l4_high_low_pct_chg'] = df.groupby("ts_code").apply(custom_f, 4).values

    print(df)
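
    To see what custom_f produces before wiring it into groupby, you can call it on a single group (a quick check on the example data; the values match the expected l3_high_low_pct_chg column for group 'A'):

    # Apply the helper to group 'A' alone with a 3-row window.
    print(custom_f(df[df['ts_code'] == 'A'], 3))
    # 0    NaN
    # 1    NaN
    # 2    0.5
    # 3    0.8
    # 4    0.8
    # dtype: float64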
    

    If you prefer to use the rolling function, this method gives the same output:

    def rolling_f(rolling_df):
        # rolling_df is a single column's window; only its index is used,
        # to look the full rows up in the outer df.
        df_ = df.loc[rolling_df.index]
        high_peak_idx = df_['high'].idxmax()
        min_low_after_peak = df_.loc[high_peak_idx:]['low'].min()
        max_high = df_['high'].max()
        return 1 - min_low_after_peak / max_high

    # groupby().rolling().apply() runs the function once per column ('high'
    # and 'low'), producing two identical result columns, so keep the first.
    df['l3_high_low_pct_chg'] = df.groupby("ts_code").rolling(3).apply(rolling_f).values[:, 0]
    df['l4_high_low_pct_chg'] = df.groupby("ts_code").rolling(4).apply(rolling_f).values[:, 0]

    print(df)
    

    Finally, if you want to do a true rolling-window calculation that avoids any index lookup, you can use the numpy_ext library (https://pypi.org/project/numpy-ext/):

    from numpy_ext import rolling_apply

    def np_ext_f(rolling_df, n):
        def rolling_apply_f(high, low):
            # Each call receives one n-length window of both columns as
            # plain NumPy arrays.
            return 1 - low[np.argmax(high):].min() / high.max()
        try:
            return pd.Series(rolling_apply(rolling_apply_f, n, rolling_df['high'].values, rolling_df['low'].values), index=rolling_df.index)
        except ValueError:
            # Groups shorter than the window: return all-NaN.
            return pd.Series(np.nan, index=rolling_df.index)


    # sort_index(level=1) restores the original row order (level 1 of the
    # (ts_code, index) MultiIndex) so .values lines up with df.
    df['l3_high_low_pct_chg'] = df.groupby('ts_code').apply(np_ext_f, n=3).sort_index(level=1).values
    df['l4_high_low_pct_chg'] = df.groupby('ts_code').apply(np_ext_f, n=4).sort_index(level=1).values

    print(df)
    

    output:

      ts_code  high  low  l3_high_low_pct_chg  l4_high_low_pct_chg
    0       A    20   10                  NaN                  NaN
    1       A    30    5                  NaN                  NaN
    2       A    40   20                 0.50                  NaN
    3       A    50   10                 0.80                 0.80
    4       A    20   30                 0.80                 0.80
    5       B    50   10                  NaN                  NaN
    6       B    30    5                  NaN                  NaN
    7       B    40   20                 0.90                  NaN
    8       B    10   10                 0.75                 0.90
    9       B    20   30                 0.75                 0.75
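
    All three versions fill the same columns. A quick way to double-check that programmatically is to recompute the 3-row column with custom_f and compare it to what the last method left in df (a small sanity check on this example data):

    # NaNs count as equal here thanks to equal_nan=True.
    expected = df.groupby("ts_code").apply(custom_f, 3).values
    print(np.allclose(df['l3_high_low_pct_chg'].values, expected, equal_nan=True))  # True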
    

    For large datasets, the speed of these operations becomes an issue. So, to compare the speed of these different methods, I created a timing function:

    import time

    def timeit(f):
        def timed(*args, **kw):
            ts = time.time()
            result = f(*args, **kw)
            te = time.time()
            print('func:%r took: %2.4f sec' % (f.__name__, te - ts))
            return result
        return timed
    

    Next, let's make a large DataFrame, just by copying the existing dataframe 500 times:

    # Stack 500 copies of the example rows, then rebuild a unique index.
    df = pd.concat([df for x in range(500)], axis=0)
    df = df.reset_index()
    

    Finally, we run the three methods under the timing decorator:

    @timeit
    def method_1():
        df['l52_high_low_pct_chg'] = df.groupby("ts_code").apply(custom_f, 52).values
    method_1()
    
    @timeit
    def method_2():
        df['l52_high_low_pct_chg'] = df.groupby("ts_code").rolling(52).apply(rolling_f).values[:, 0]
    method_2()
    
    @timeit
    def method_3():
        df['l52_high_low_pct_chg'] = df.groupby('ts_code').apply(np_ext_f, n=52).sort_index(level=1).values
    method_3()
    

    Which gives us this output:

    func:'method_1' took: 2.5650 sec
    func:'method_2' took: 15.1233 sec
    func:'method_3' took: 0.1084 sec
    

    So, the fastest method is numpy_ext, which makes sense because it is optimized for vectorized calculations. The second fastest is the custom function, which is fairly efficient because it does part of the work with vectorized operations while still relying on some Pandas lookups. The slowest method by far is the Pandas rolling function.