I'm taking a stab at converting a ThinkScript study to Python for the first time. I think my logic is right, but I am missing something, because the two plots for the indicator don't match.
Trying to convert the ThinkScript for the VPNIndicator to a Python implementation. Looking for someone knowledgeable in both languages to contribute here.
To start, the indicator plot in ThinkorSwim looks like this (bottom):
I'm trying to replicate that plot using mplfinance (matplotlib finance), but first I need to translate the study from ThinkScript to Python, which I've attempted here:
import mplfinance as mpf
import pandas as pd
import numpy as np
import talib
def VPN_Indicator(df, params):
    """Add the VPN indicator columns to ``df`` and return it.

    This is a translation of the ThinkScript ``VPNIndicator`` study; the
    original ThinkScript statements are kept as comments above the lines that
    implement them.

    Args:
        df: DataFrame with ``High``, ``Low``, ``Close`` and ``Volume``
            columns. It is modified in place: the helper columns (``H-L``,
            ``H-C1``, ``C1-L``, ``TrueRange``, ``WildersATR``, ``Diff``,
            ``VP_Helper``, ``VP``, ``VN_Helper``, ``VN``, ``RollingVol``) and
            the plotted columns (``VPN``, ``VPNAvg``, ``CriticalLevel``) are
            added to it.
        params: Mapping with the keys ``length``, ``emaLength``,
            ``averageLength``, ``factor``, ``criticalValue`` and
            ``averageType``. ``averageType`` is matched case-insensitively
            and may be ``simple``/``sma``, ``exponential``/``ema`` or
            ``weighted``/``wma``.

    Returns:
        The same ``df`` object with the columns listed above added.

    Raises:
        ValueError: If ``averageType`` is not one of the supported names.
            This is checked before ``df`` is touched, so a bad parameter
            never leaves a half-populated frame without ``VPNAvg``.
    """
    # Map each accepted averageType spelling to the TA-Lib function name.
    talib_average_names = {
        'simple': 'SMA', 'sma': 'SMA',
        'exponential': 'EMA', 'ema': 'EMA',
        'weighted': 'WMA', 'wma': 'WMA',
    }
    average_type = str(params['averageType']).lower()
    if average_type not in talib_average_names:
        raise ValueError(
            "Unsupported averageType %r; expected one of %s"
            % (params['averageType'], sorted(talib_average_names))
        )

    length = params['length']
    factor = params['factor']

    # def atr = WildersAverage(TrueRange(high, close, low), length);
    prev_close = df['Close'].shift()
    df['H-L'] = df['High'] - df['Low']
    df['H-C1'] = df['High'] - prev_close
    df['C1-L'] = prev_close - df['Low']
    # On the first row the two prev-close terms are NaN; max() skips them,
    # so the true range there falls back to High - Low.
    df['TrueRange'] = df[['H-L', 'H-C1', 'C1-L']].max(axis=1)
    # NOTE(review): this recursion is seeded with the first TrueRange value;
    # ThinkorSwim's WildersAverage may seed differently -- confirm if the
    # early part of the plot does not match.
    df['WildersATR'] = df['TrueRange'].ewm(alpha=1.0 / length, adjust=False).mean()

    # def diff = hlc3 - hlc3[1];
    # shift() looks one bar back (like ThinkScript's [1]), so there is no
    # look-ahead here.
    hlc3 = (df['High'] + df['Low'] + df['Close']) / 3
    df['Diff'] = hlc3 - hlc3.shift()

    # def vp = Sum(if diff > factor * atr then volume else 0, length);
    threshold = factor * df['WildersATR']
    df['VP_Helper'] = np.where(df['Diff'] > threshold, df['Volume'], 0)
    df['VP'] = df['VP_Helper'].rolling(length).sum()

    # def vn = Sum(if diff < -factor * atr then volume else 0, length);
    df['VN_Helper'] = np.where(df['Diff'] < -threshold, df['Volume'], 0)
    df['VN'] = df['VN_Helper'].rolling(length).sum()

    # plot VPN = ExpAverage(100 * (vp - vn) / Sum(volume, length), emaLength);
    df['RollingVol'] = df['Volume'].rolling(length).sum()
    raw_vpn = 100 * (df['VP'] - df['VN']) / df['RollingVol']
    df['VPN'] = talib.EMA(raw_vpn, timeperiod=params['emaLength'])

    # plot VPNAvg = MovingAverage(averageType, VPN, averageLength);
    moving_average = getattr(talib, talib_average_names[average_type])
    df['VPNAvg'] = moving_average(df['VPN'], timeperiod=params['averageLength'])

    # plot CriticalLevel = criticalValue;
    df['CriticalLevel'] = params['criticalValue']

    # The ThinkScript colour settings (DefineColor / AssignValueColor /
    # SetDefaultColor) are presentation only and are not translated.
    return df
# Settings read by VPN_Indicator: window lengths, the ATR multiple used as the
# move threshold, the horizontal critical level, and the VPNAvg average type.
params = dict(
    length=30,
    emaLength=3,
    averageLength=30,
    factor=0.1,
    criticalValue=10,
    averageType="simple",
)
# Load a 1-minute dataset, keep the last 2000 rows, index it by timestamp and
# rename the columns to the capitalised names selected further down
df = pd.read_csv("SPY.csv").tail(2000)
df = df.assign(time=pd.to_datetime(df['time'])).set_index('time')
df = df.rename(columns={
    'open': 'Open',
    'high': 'High',
    'low': "Low",
    "close": "Close",
    "volume": "Volume",
})
df = VPN_Indicator(df, params)

# Draw the three indicator lines together in panel 2, alongside the candles
# and the volume that mpf.plot is asked to show
apds = [
    mpf.make_addplot(df[column], panel=2, color='g')
    for column in ('CriticalLevel', 'VPN', 'VPNAvg')
]
mpf.plot(
    df[['Open', 'High', 'Low', 'Close', 'Volume']],
    addplot=apds,
    figscale=1.2,
    volume=True,
)
... which results in a plot that looks like this:
... which is close, but the peaks don't line up with the ThinkorSwim plot. Can someone who knows both languages point out where my translation might be off? Thanks!
Try using the following to calculate the ATR. It gives the same output as ThinkorSwim (TOS).
import numpy as np
def ema(arr, periods=14, weight=1, init=None):
    """Exponentially smooth ``arr`` with ``alpha = weight / (periods + weight - 1)``.

    ``weight=1`` gives ``alpha = 1 / periods`` (Wilder's smoothing) and
    ``weight=2`` gives ``alpha = 2 / (periods + 1)``.

    The recurrence is ``y[i] = alpha * x[i] + (1 - alpha) * y[i - 1]``. The
    value ``y[-1]`` is ``init`` when one is given; otherwise it is the first
    non-NaN input, so the first smoothed value equals that input.

    Args:
        arr: 1-D sequence of numbers; leading NaNs are skipped.
        periods: Smoothing length; also the number of warm-up outputs that
            are masked with NaN.
        weight: Numerator of ``alpha`` (see above).
        init: Optional seed used as the value preceding the first sample.
            ``0`` is a valid seed.

    Returns:
        A 1-D float array of the same length as ``arr``. Positions of the
        leading NaNs and the first ``periods`` smoothed values are NaN. An
        empty or all-NaN input yields an all-NaN array of the same length.
    """
    arr = np.asarray(arr, dtype=float)
    valid = np.flatnonzero(~np.isnan(arr))
    if valid.size == 0:
        # Nothing to smooth; avoid indexing into an empty result.
        return np.full(arr.shape[0], np.nan)
    leading_na = int(valid[0])
    values = arr[leading_na:]
    n = values.shape[0]

    alpha = weight / (periods + (weight - 1))
    decay = 1 - alpha
    # Compare against None, not truthiness, so a seed of 0 is honoured.
    prev = values[0] if init is None else init

    # The closed form below divides by decay**k. Cap k so that decay**k stays
    # above ~1e-200 and the division can neither overflow nor hit zero; for
    # degenerate decay values fall back to one sample per step.
    if 0 < decay < 1:
        chunk = max(1, int(np.log(1e-200) / np.log(decay)))
    else:
        chunk = 1

    out = np.empty(n)
    for start in range(0, n, chunk):
        block = values[start:start + chunk]
        size = block.shape[0]
        pows = decay ** np.arange(size + 1)
        # y[i] = prev * decay**(i+1) + decay**i * sum_{j<=i} alpha*x[j]/decay**j
        weighted = np.cumsum(alpha * block / pows[:-1])
        out[start:start + size] = prev * pows[1:] + pows[:-1] * weighted
        # Carry the last smoothed value into the next chunk as its seed.
        prev = out[start + size - 1]

    # Mask the warm-up period, then restore the original leading NaNs.
    out[:periods] = np.nan
    return np.concatenate((np.full(leading_na, np.nan), out))
def atr(highs, lows, closes, periods=14, ema_weight=1):
    """Return the average true range of a price series.

    The true range of each bar after the first is the largest of
    ``|high - previous close|``, ``|low - previous close|`` and
    ``high - low``. That series is smoothed with ``ema`` and a NaN is put in
    front so the result lines up with the inputs.

    Args:
        highs: Bar highs.
        lows: Bar lows.
        closes: Bar closes.
        periods: Smoothing length passed to ``ema``.
        ema_weight: Passed to ``ema`` as ``weight``.

    Returns:
        Array with one entry per input bar; the first entry is NaN.
    """
    high = np.array(highs)
    low = np.array(lows)
    close = np.array(closes)

    # The first bar has no previous close, so the range starts at bar 1.
    prev_close = close[:-1]
    candidates = np.vstack([
        np.abs(high[1:] - prev_close),
        np.abs(low[1:] - prev_close),
        (high - low)[1:],
    ])
    true_range = candidates.max(axis=0)

    smoothed = ema(true_range, periods=periods, weight=ema_weight)
    return np.concatenate([[np.nan], smoothed])