I am having a hard time understanding how I can improve this code:
from datetime import datetime
import math
import pandas as pd
import numpy as np
def trending(row, label, amt, prepend=False, postpend=False, reverse=True):
    """Return the linear-trend slope of one row's lagged values for ``label``.

    The points are read from ``row[f'{label}_0']`` through
    ``row[f'{label}_{amt - 1}']``, optionally with ``row[label]`` placed
    before and/or after them.

    Args:
        row: Object indexable by column name (e.g. a DataFrame row).
        label: Base column name.
        amt: Number of suffixed columns to read.
        prepend: Put ``row[label]`` before the suffixed values.
        postpend: Put ``row[label]`` after the suffixed values.
        reverse: Reverse the collected points before fitting.

    Returns:
        0 when any collected value is ``None`` or NaN; otherwise whatever
        ``slope`` returns for the collected points.
    """
    points = []
    if prepend:
        points.append(row[label])
    points.extend(row[f'{label}_{i}'] for i in range(amt))
    if postpend:
        points.append(row[label])
    if reverse:
        points.reverse()
    # Test None by identity first so math.isnan never receives None.
    if any(x is None or math.isnan(x) for x in points):
        return 0
    return slope(points)
def slope(arr):
    """Return the slope of the least-squares line through ``arr``.

    The x-coordinates are the positions 0, 1, ..., len(arr) - 1. A sequence
    with fewer than two points yields 0.
    """
    n_points = len(arr)
    if n_points < 2:
        return 0
    # A degree-1 fit yields two coefficients, highest power first.
    gradient, _intercept = np.polyfit(np.arange(n_points), arr, 1)
    return float(gradient)
def trends(df):
    """Add lagged indicator columns and a row-wise ``<name>_trend`` column for each.

    For each lag ``1..look_back`` the indicator columns are shifted down by
    that many rows and appended with the suffix ``_<lag - 1>``. Each trend
    column is then filled by applying ``trending`` to every row.

    Args:
        df: DataFrame containing the columns 'close', 'pvi', 'nvi', 'smi',
            'roc', 'macd', 'histogram', 'percent_b' and 'height'.

    Returns:
        A new DataFrame holding the input columns, then the lagged columns,
        then the trend columns. The elapsed time is printed as a side effect.
    """
    start = datetime.now()
    look_back = 5
    # The two lists differ only in the roc/macd order; both are kept so the
    # resulting column layout stays exactly as before.
    lagged_cols = ['close', 'pvi', 'nvi', 'smi', 'roc', 'macd', 'histogram', 'percent_b', 'height']
    trend_cols = ['close', 'pvi', 'nvi', 'smi', 'macd', 'roc', 'histogram', 'percent_b', 'height']

    # Build every lagged frame first and concatenate once instead of growing
    # the frame inside the loop.
    lagged = [
        df[lagged_cols].add_suffix(f'_{i}').shift(i + 1)
        for i in range(look_back)
    ]
    df = pd.concat([df, *lagged], axis=1)

    # One loop replaces the nine hand-written wrapper functions; the extra
    # positional arguments are forwarded to ``trending`` via ``args``.
    for col in trend_cols:
        df[f'{col}_trend'] = df.apply(
            trending, axis=1, args=(col, look_back, True, False)
        )

    print(f'Trends function took {datetime.now() - start}')
    return df
# Synthetic benchmark input: 10,000 identical rows, one value per column.
columns = ['close', 'macd', 'histogram', 'roc', 'rsi', 'pvi', 'nvi', 'percent_b', 'height', 'smi']
_sample_row = [545.9, 0.3333398862, 0.01673619117, 0.2111060119, 55.95725508, 1.100447539, 0.8652411735, 0.8219623901, 1.808441041, 46.79554862]
data = [list(_sample_row) for _ in range(10000)]
df = pd.DataFrame(data=data, columns=columns)
df = trends(df)
print(df)
It works, but it is awfully slow. I have a dataframe with about 10k rows and it takes about 30s. Basically, what I am trying to accomplish is this:
You can achieve a huge speed-up (200-300x) by applying polyfit to the whole dataframe at once instead of doing it row by row.
I added some code to check correctness; verify the results yourself and remove that code afterwards.
def trending_vectorized(df, label, amt, prepend=False, postpend=False, reverse=True):
    """Return the linear-trend slope of the lagged ``label`` values for every row.

    The points for each row come from the columns ``f'{label}_0'`` through
    ``f'{label}_{amt - 1}'``, optionally with ``label`` itself placed before
    and/or after them. All rows are fitted in a single ``np.polyfit`` call.

    Args:
        df: DataFrame holding ``label`` and its suffixed lag columns.
        label: Base column name.
        amt: Number of suffixed columns to read.
        prepend: Put ``df[label]`` before the suffixed columns.
        postpend: Put ``df[label]`` after the suffixed columns.
        reverse: Reverse the point order before fitting.

    Returns:
        1-D float array with one slope per row. Rows with any missing
        (``None``/NaN) point, and fits that come out as NaN, give 0.
    """
    columns = [f'{label}_{i}' for i in range(amt)]
    if prepend:
        columns.insert(0, label)
    if postpend:
        columns.append(label)
    if reverse:
        columns.reverse()

    # Fewer than two points cannot define a slope; report 0 for every row.
    if len(columns) < 2:
        return np.zeros(len(df))

    # Shape (n_points, n_rows). astype(float) returns a fresh array that is
    # safe to modify, and turns None entries of object columns into NaN.
    values = np.vstack([df[name] for name in columns]).astype(float)

    # Zero out rows that contain a gap so NaN never reaches the solver.
    incomplete = np.isnan(values).any(axis=0)
    values[:, incomplete] = 0.0

    # polyfit fits each column of ``values`` independently; row 0 of the
    # result holds the degree-1 (slope) coefficients.
    slopes = np.polyfit(np.arange(len(columns)), values, 1)[0]
    slopes[incomplete] = 0.0
    slopes[np.isnan(slopes)] = 0.0
    return slopes
def trends(df):
    """Compute each trend row-wise and vectorized, and check that the two agree.

    Lagged copies of the indicator columns are appended first. Then, for each
    indicator, ``<name>_trend`` is filled by applying ``trending`` row by row
    and ``<name>_trend2`` by ``trending_vectorized``; the two columns are
    compared afterwards.

    Args:
        df: DataFrame containing the columns 'close', 'pvi', 'nvi', 'smi',
            'roc', 'macd', 'histogram', 'percent_b' and 'height'.

    Returns:
        A new DataFrame holding the input columns, the lagged columns, the
        ``_trend`` columns and the ``_trend2`` columns, in that order. The
        elapsed time is printed as a side effect.

    Raises:
        AssertionError: If the two results for any indicator differ beyond
            the tolerance.
    """
    start = datetime.now()
    look_back = 5
    # The two lists differ only in the roc/macd order; both are kept so the
    # resulting column layout stays exactly as before.
    lagged_cols = ['close', 'pvi', 'nvi', 'smi', 'roc', 'macd', 'histogram', 'percent_b', 'height']
    trend_cols = ['close', 'pvi', 'nvi', 'smi', 'macd', 'roc', 'histogram', 'percent_b', 'height']

    lagged = [
        df[lagged_cols].add_suffix(f'_{i}').shift(i + 1)
        for i in range(look_back)
    ]
    df = pd.concat([df, *lagged], axis=1)

    # Reference result: slow, one Python call per row.
    for col in trend_cols:
        df[f'{col}_trend'] = df.apply(
            trending, axis=1, args=(col, look_back, True, False)
        )

    # Candidate result: one polyfit call per indicator.
    for col in trend_cols:
        df[f'{col}_trend2'] = trending_vectorized(df, col, look_back, True, False)

    # Unlike a bare assert, this check is not removed under ``python -O`` and
    # it names the failing column. The tolerances are np.allclose's defaults.
    for col in trend_cols:
        np.testing.assert_allclose(
            df[f'{col}_trend'],
            df[f'{col}_trend2'],
            rtol=1e-05,
            atol=1e-08,
            equal_nan=False,
            err_msg=f'{col}: vectorized trend differs from row-wise trend',
        )

    print(f'Trends function took {datetime.now() - start}')
    return df
I noticed you like clean code, so as a bonus, here is a cleaned-up function:
def trends2(df):
    """Append lagged indicator columns and a vectorized ``<name>_trend`` for each.

    For each lag ``1..5`` the indicator columns are shifted down by that many
    rows and appended with the suffix ``_<lag - 1>``; every trend column is
    then filled by ``trending_vectorized``.

    Args:
        df: DataFrame containing the columns 'close', 'pvi', 'nvi', 'smi',
            'macd', 'roc', 'histogram', 'percent_b' and 'height'.

    Returns:
        A new DataFrame holding the input columns, then the lagged columns,
        then the trend columns.
    """
    look_back = 5
    col_names = ['close', 'pvi', 'nvi', 'smi', 'macd', 'roc', 'histogram', 'percent_b', 'height']

    # Build all lagged frames up front so the concatenation happens once.
    lagged_frames = [
        df[col_names].add_suffix(f'_{lag}').shift(lag + 1)
        for lag in range(look_back)
    ]
    result = pd.concat([df, *lagged_frames], axis=1)

    for name in col_names:
        result[f'{name}_trend'] = trending_vectorized(result, name, look_back, True, False)
    return result