pythonthinkscript

VPN Indicator ThinkScript to Python


Taking a stab at converting a ThinkScript to Python for the first time, and I think my logic is right, but I am missing something as the two plots for the indicator don't match.

Trying to convert the ThinkScript for the VPNIndicator to a Python implementation. Looking for someone knowledgeable in both languages to contribute here.

To start, the indicator plot in ThinkorSwim looks like this (bottom):

ThinkorSwimPlot

So I'm trying to replicate that plot using mplfinance (matplotlib finance), but first I need to translate the ThinkScript to Python, which I've attempted here:

import mplfinance as mpf
import pandas as pd 
import numpy as np
import talib

def VPN_Indicator(df, params):
    """Append the VPN (Volume Positive Negative) indicator columns to ``df``.

    Python translation of the thinkScript ``VPNIndicator`` study; the
    thinkScript statement each step mirrors is quoted in the comments.

    Args:
        df: DataFrame with ``High``, ``Low``, ``Close`` and ``Volume``
            columns. It is modified in place: the intermediate columns
            (``H-L``, ``H-C1``, ``C1-L``, ``TrueRange``, ``WildersATR``,
            ``Diff``, ``VP_Helper``, ``VP``, ``VN_Helper``, ``VN``,
            ``RollingVol``) and the plotted columns (``VPN``, ``VPNAvg``,
            ``CriticalLevel``) are added to it.
        params: Mapping with the keys ``length``, ``emaLength``,
            ``averageLength``, ``factor``, ``criticalValue`` and
            ``averageType``. ``averageType`` is matched case-insensitively
            and may be ``simple``/``sma``, ``exponential``/``ema`` or
            ``weighted``/``wma``.

    Returns:
        The same ``df`` object, with the columns listed above added.

    Raises:
        ValueError: If ``averageType`` is not one of the supported names.
            Raised before ``df`` is modified.
    """
    # Resolve the VPNAvg smoother first. An unsupported averageType used to be
    # skipped silently, which left 'VPNAvg' missing and only failed later, in
    # whatever code tried to read or plot that column.
    talib_averages = {
        'simple': 'SMA', 'sma': 'SMA',
        'exponential': 'EMA', 'ema': 'EMA',
        'weighted': 'WMA', 'wma': 'WMA',
    }
    average_key = str(params['averageType']).strip().lower()
    if average_key not in talib_averages:
        supported = ', '.join(sorted(talib_averages))
        raise ValueError(
            f"Unsupported averageType {params['averageType']!r}; "
            f"expected one of: {supported}"
        )

    length = params['length']
    factor = params['factor']

    # def atr = WildersAverage(TrueRange(high,  close,  low), length);
    df['H-L'] = df['High'] - df['Low']
    df['H-C1'] = df['High'] - df['Close'].shift()
    df['C1-L'] = df['Close'].shift() - df['Low']
    # On the first row there is no previous close, so the two close-based
    # ranges are NaN; max(axis=1) skips NaN and falls back to High - Low.
    df['TrueRange'] = df[['H-L', 'H-C1', 'C1-L']].max(axis=1)
    # Wilder's smoothing expressed as an EMA with alpha = 1 / length.
    df['WildersATR'] = df['TrueRange'].ewm(alpha=1.0 / length, adjust=False).mean()

    # def diff = hlc3 - hlc3[1];
    # shift() moves values one row forward in time, i.e. it reads the previous
    # bar (hlc3[1]); nothing here looks ahead.
    hlc3 = (df['High'] + df['Low'] + df['Close']) / 3
    df['Diff'] = hlc3 - hlc3.shift()

    # def vp = Sum(if diff > factor * atr then volume else 0, length);
    df['VP_Helper'] = np.where(df['Diff'] > factor * df['WildersATR'], df['Volume'], 0)
    df['VP'] = df['VP_Helper'].rolling(length).sum()

    # def vn = Sum(if diff < -factor * atr then volume else 0, length);
    df['VN_Helper'] = np.where(df['Diff'] < -factor * df['WildersATR'], df['Volume'], 0)
    df['VN'] = df['VN_Helper'].rolling(length).sum()

    # plot VPN = ExpAverage(100 * (vp - vn) / Sum(volume, length), emaLength);
    df['RollingVol'] = df['Volume'].rolling(length).sum()
    df['VPN'] = talib.EMA(100 * (df['VP'] - df['VN']) / df['RollingVol'], timeperiod=params['emaLength'])

    # plot VPNAvg = MovingAverage(averageType, VPN, averageLength);
    smoother = getattr(talib, talib_averages[average_key])
    df['VPNAvg'] = smoother(df['VPN'], timeperiod=params['averageLength'])

    # plot CriticalLevel = criticalValue;
    df['CriticalLevel'] = params['criticalValue']

    # Styling statements from the thinkScript study, kept for reference only;
    # they do not affect the computed values:
    # VPN.DefineColor("Above", Color.UPTICK);
    # VPN.DefineColor("Below", Color.DOWNTICK);
    # VPN.AssignValueColor(if VPN > CriticalLevel then VPN.Color("Above") else VPN.Color("Below"));
    # VPNAvg.SetDefaultColor(GetColor(7));
    # CriticalLevel.SetDefaultColor(GetColor(1));

    return df


# Settings consumed by VPN_Indicator
params = dict(
    length=30,
    emaLength=3,
    averageLength=30,
    factor=0.1,
    criticalValue=10,
    averageType="simple",
)

# Load the 1min dataset, keep its last 2000 rows, index it by timestamp and
# rename the price/volume columns to their capitalised names
df = pd.read_csv("SPY.csv").tail(2000)
df['time'] = pd.to_datetime(df['time'])
df = df.set_index('time').rename(
    columns={'open': 'Open', 'high': 'High', 'low': "Low", "close": "Close", "volume": "Volume"}
)
df = VPN_Indicator(df, params)

# Draw the three indicator series together in panel 2
apds = [
    mpf.make_addplot(df[column], panel=2, color='g')
    for column in ('CriticalLevel', 'VPN', 'VPNAvg')
]

mpf.plot(df[['Open', 'High', 'Low', 'Close', 'Volume']], addplot=apds, figscale=1.2, volume=True)

... which results in a plot that looks like this:

mpfplot

... which is close, but the peaks don't line up with the ThinkOrSwim plot. So I'd like to know, from someone who knows both languages, where my translation might be off. Thanks!


Solution

  • Try using this to calculate ATR. This gives the same output as TOS.

    import numpy as np
    
    def ema(arr, periods=14, weight=1, init=None):
        """Return the exponential moving average of a 1-D sequence.

        The recurrence is ``out[i] = alpha * x[i] + (1 - alpha) * out[i-1]``
        with ``alpha = weight / (periods + weight - 1)``; ``weight=1`` gives
        ``alpha = 1 / periods`` and ``weight=2`` gives ``2 / (periods + 1)``.

        Args:
            arr: 1-D array-like of floats. Leading NaNs are skipped and
                returned as NaN; a NaN after the first valid value makes
                every later output NaN.
            periods: Smoothing length; also the number of warm-up outputs
                (counted from the first valid value) that are set to NaN.
            weight: Numerator of the smoothing factor, see above.
            init: State preceding the first valid value. When ``None`` the
                first valid value itself is used. ``0`` is a valid seed.

        Returns:
            A float NumPy array with the same length as ``arr``. An empty or
            all-NaN input yields an all-NaN array of the same length.
        """
        values = np.asarray(arr, dtype=float)
        valid = np.flatnonzero(~np.isnan(values))
        if valid.size == 0:
            # Nothing to smooth (empty or all-NaN input).
            return np.full(values.shape, np.nan)
        leading_na = int(valid[0])
        data = values[leading_na:]
        n = data.shape[0]

        alpha = weight / (periods + (weight - 1))
        decay = 1 - alpha

        # The closed form below divides by decay**j, which underflows to zero
        # for long inputs. Work in chunks short enough that decay**chunk stays
        # around 1e-200, carrying the last EMA value from chunk to chunk.
        if 0 < decay < 1:
            chunk = int(min(n, max(1.0, np.log(1e-200) / np.log(decay))))
        else:
            # decay == 0 (periods == 1) or a degenerate alpha: step one
            # sample at a time, where the closed form needs no division
            # by a power of decay.
            chunk = 1
        pows = decay ** np.arange(chunk + 1)

        out = np.empty(n)
        # `is None` rather than truthiness, so that init=0 is honoured.
        carry = data[0] if init is None else init
        for start in range(0, n, chunk):
            block = data[start:start + chunk]
            m = block.shape[0]
            # discounted[k] = sum_{j <= k} block[j] * decay**(k - j)
            discounted = np.cumsum(block / pows[:m]) * pows[:m]
            out[start:start + m] = carry * pows[1:m + 1] + alpha * discounted
            carry = out[start + m - 1]

        # Blank the warm-up region, then restore the skipped leading NaNs.
        out[:periods] = np.nan
        return np.concatenate((np.full(leading_na, np.nan), out))
    
    
    def atr(highs, lows, closes, periods=14, ema_weight=1):
        """Return the average true range of a bar series as a NumPy array.

        The true range of each bar after the first is the largest of
        ``|high - previous close|``, ``|low - previous close|`` and
        ``high - low``; those values are then smoothed with ``ema`` using
        ``periods`` and ``ema_weight``.

        Args:
            highs, lows, closes: Equal-length sequences of bar prices.
            periods: Smoothing length passed to ``ema``.
            ema_weight: Passed to ``ema`` as its ``weight`` argument.

        Returns:
            An array as long as the inputs. Its first element is NaN because
            the first bar has no previous close.
        """
        high = np.asarray(highs)
        low = np.asarray(lows)
        prev_close = np.asarray(closes)[:-1]

        gap_up = np.abs(high[1:] - prev_close)
        gap_down = np.abs(low[1:] - prev_close)
        bar_range = (high - low)[1:]
        true_range = np.maximum(np.maximum(gap_up, gap_down), bar_range)

        smoothed = ema(true_range, periods=periods, weight=ema_weight)
        # Pad the front so the result lines up with the input bars.
        return np.concatenate(([np.nan], smoothed))