pythonfftfrequency-analysisgoertzel-algorithm

Generalized Goertzel algorithm for better peaks detection than FFT: shifted frequencies with real data?


I have translated the algorithm of the Generalized Goertzel technique in Python from the Matlab code that can be found here.

I have trouble using it on real data and only on real data: generating a testing "synthetic" signal with 5 sin components the Goertzel returns correct frequencies, obviously with better accuracy than FFT; both the techniques are aligned.

However, if I provide market data on N samples the FFT gives the lowest frequency f = 1/N as expected; Goertzel returns all frequencies higher than 1. The time frame of the data is 1 hour, but the data is unlabeled with timestamps, it could be also seconds, so my expectation is that the two ways of calculating the frequency transform should return, apart from different accuracies, the same harmonics on the frequency domain.

Why am I getting the lowest frequency in one method but greater than 1 using another method with real data?

import numpy as np

def goertzel_general_shortened(x, indvec, maxes_tollerance = 100):
    # Check input arguments
    if len(indvec) < 1:
        raise ValueError('Not enough input arguments')
    if not isinstance(x, np.ndarray) or x.size == 0:
        raise ValueError('X must be a nonempty numpy array')
    if not isinstance(indvec, np.ndarray) or indvec.size == 0:
        raise ValueError('INDVEC must be a nonempty numpy array')
    if np.iscomplex(indvec).any():
        raise ValueError('INDVEC must contain real numbers')
    
    lx = len(x)
    x = x.reshape(lx, 1)  # forcing x to be a column vector
    
    # Initialization
    no_freq = len(indvec)
    y = np.zeros((no_freq,), dtype=complex)
    
    # Computation via second-order system
    for cnt_freq in range(no_freq):
        # Precompute constants
        pik_term = 2 * np.pi * indvec[cnt_freq] / lx
        cos_pik_term2 = 2 * np.cos(pik_term)
        cc = np.exp(-1j * pik_term)  # complex constant
        
        # State variables
        s0 = 0
        s1 = 0
        s2 = 0
        
        # Main loop
        for ind in range(lx - 1):
            s0 = x[ind] + cos_pik_term2 * s1 - s2
            s2 = s1
            s1 = s0
        
        # Final computations
        s0 = x[lx - 1] + cos_pik_term2 * s1 - s2
        y[cnt_freq] = s0 - s1 * cc
        
        # Complex multiplication substituting the last iteration
        # and correcting the phase for potentially non-integer valued
        # frequencies at the same time
        y[cnt_freq] = y[cnt_freq] * np.exp(-1j * pik_term * (lx - 1))
    
    return y

Here are the charts of the FFT and Goertzel transform for the synthetic testing 5 components signal

enter image description here

here the Goertzel one

enter image description here

the original frequencies were

signal_frequencies = [30.5, 47.4, 80.8, 120.7, 133]

Instead, if I try to download market data

data = yf.download("SPY", start="2022-01-01", end="2023-12-31", interval="1h")

and try to transform the data['Close'] of SPY, this is what I get with the FFT transform with N = 800 samples

enter image description here

and the rebuilt signal on the first 2 components (not so good)

enter image description here

and this is what I get with the Goertzel transform

enter image description here

Note that the first peaks on FFT are below 0.005, for Goertzel above 1.

This is the way in which I tested the FFT on SPY data

import yfinance as yf
import numpy as np
import pandas as pd
import plotly.graph_objs as go
from plotly.subplots import make_subplots

def analyze_and_plot(data, num_samples, start_date, num_harmonics):

    num_harmonics = num_harmonics *1
    # Seleziona i dati nell'intervallo specificato
    original_data = data
    data = data[data.index >= start_date]

    # Estrai i campioni desiderati
    data = data.head(num_samples)

    # Calcola la FFT dei valori "Close"
    fft_result = np.fft.fft(data["Close"].values)
    frequency_range = np.fft.fftfreq(len(fft_result))


    print("Frequencies: ")
    print(frequency_range)
    print("N Frequencies: ")
    print(len(frequency_range))


    print("First frequencies magnitude: ")
    print(np.abs(fft_result[0:num_harmonics]))


    # Trova le armoniche dominanti
    # top_harmonics = np.argsort(np.abs(fft_result))[::-1][:num_harmonics]
    top_harmonics = np.argsort(np.abs(fft_result[0:400]))[::-1][1:(num_harmonics + 1)] # skip first one

    print("Top harmonics: ")
    print(top_harmonics)

    # top_harmonics = [1, 4]#, 8, 5, 9]



    # Creazione del grafico per lo spettro
    spectrum_trace = go.Scatter(x=frequency_range, y=np.abs(fft_result), mode='lines', name='FFT Spectrum')
    fig_spectrum = go.Figure(spectrum_trace)
    fig_spectrum.update_layout(title="FFT Spectrum", xaxis=dict(title="Frequency"), yaxis=dict(title="Magnitude"))

    # Calcola la ricostruzione basata sulle prime N armoniche
    reconstructed_signal = np.zeros(len(data))

    time = np.linspace(0, num_samples, num_samples, endpoint=False)
    # print('time')
    # print(time)
    for harmonic_index in top_harmonics[:num_harmonics]:
      amplitude = np.abs(fft_result[harmonic_index]) #.real
      phase = np.angle(fft_result[harmonic_index])
      frequency = frequency_range[harmonic_index]
      reconstructed_signal += amplitude * np.cos(2 * np.pi * frequency * time + phase)


    # print('first reconstructed_signal len')
    # print(len(reconstructed_signal))

    zeros = np.zeros(len(original_data) - 2*len(data))
    reconstructed_signal = np.concatenate((reconstructed_signal, reconstructed_signal), axis = 0)

    # print('second reconstructed_signal len')
    # print(len(reconstructed_signal))

    reconstructed_signal = np.concatenate((reconstructed_signal, zeros), axis = 0)

    original_data['reconstructed_signal'] = reconstructed_signal


    # print('reconstructed_signal len')
    # print(len(reconstructed_signal))

    # print('original_data len')
    # print(len(original_data))

    # print('reconstructed_signal[300:320]')
    # print(reconstructed_signal[290:320])

    # print('original_data[300:320]')
    # print(original_data[290:320][['Close', 'reconstructed_signal']])


    # reconstructed_signal = np.fft.ifft(fft_result[top_harmonics[:num_harmonics]])

    # print('reconstructed_signal')
    # print(reconstructed_signal)

    # Converte i valori complessi in valori reali per la ricostruzione
    # reconstructed_signal_real = reconstructed_signal.real

    # Creazione del secondo grafico con due subplot
    fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1, subplot_titles=("Original Close", "Reconstructed Close"))

    # Aggiungi il grafico di "Close" originale al primo subplot
    fig.add_trace(go.Scatter(x=original_data.index, y=original_data["Close"], mode="lines", name="Original Close"), row=1, col=1)


    # Aggiungi il grafico della ricostruzione al secondo subplot
    fig.add_trace(go.Scatter(x=original_data.index, y=original_data['reconstructed_signal'] , mode="lines", name="Reconstructed Close"), row=2, col=1)

    # Aggiorna il layout del secondo grafico
    fig.update_xaxes(title_text="Time", row=2, col=1)
    fig.update_yaxes(title_text="Value", row=1, col=1)
    fig.update_yaxes(title_text="Value", row=2, col=1)

    # Aggiorna il layout generale
    fig.update_layout(title="Close Analysis and Reconstruction")

    # Visualizza il grafico dello spettro
    fig_spectrum.show()

    # fig.update_layout(xaxis = dict(type="category"))

    # Aggiorna il layout dell'asse X per includere tutti i dati
    # fig.update_xaxes(range=[original_data.index.min(), original_data.index.max()], row=2, col=1)

    fig.update_xaxes(type="category", row=1, col=1)
    fig.update_xaxes(type="category", row=2, col=1)

    # Visualizza il secondo grafico con i subplot
    fig.show()

# Esempio di utilizzo
data = yf.download("SPY", start="2022-01-01", end="2023-12-31", interval="1h")
analyze_and_plot(data, num_samples=800, start_date="2023-01-01", num_harmonics=2)

as well as the test of SPY data on Goertzel

import yfinance as yf
import numpy as np
import pandas as pd
import plotly.graph_objs as go
from plotly.subplots import make_subplots

def analyze_and_plot(data, num_samples, start_date, num_harmonics):


    # Seleziona i dati nell'intervallo specificato
    original_data = data
    data = data[data.index >= start_date]

    # Estrai i campioni desiderati
    data = data.head(num_samples)

    # Frequenze desiderate
    frequency_range = np.arange(0, 20, 0.001) 


    # Calcola lo spettro delle frequenze utilizzando la funzione Goertzel
    transform = goertzel_general_shortened(data['Close'].values, frequency_range)

    harmonics_amplitudes = np.abs(transform)

    frequency_range = frequency_range  
    # Creazione del grafico per lo spettro
    spectrum_trace = go.Scatter(x=frequency_range, y=harmonics_amplitudes, mode='lines', name='FFT Spectrum')
    fig_spectrum = go.Figure(spectrum_trace)
    fig_spectrum.update_layout(title="Frequency Spectrum", xaxis=dict(title="Frequency"), yaxis=dict(title="Magnitude"))

    # Visualizza il grafico dello spettro
    fig_spectrum.show()


    peaks_indexes = argrelmax(harmonics_amplitudes, order = 10)[0] # find indexes of peaks
    peak_frequencies = frequency_range[peaks_indexes] 
    peak_amplitudes = harmonics_amplitudes[peaks_indexes]

    print('peaks_indexes')
    print(peaks_indexes[0:30])
    print('peak_frequencies')
    print(peak_frequencies[0:30])
    print('peak_amplitudes')
    print(peak_amplitudes[0:30])

    lower_freq_sort_peak_indexes = np.sort(peaks_indexes)[0:num_harmonics] # lower indexes <--> lower frequencies


    higher_amplitudes_sort_peak_indexes = peaks_indexes[np.argsort(harmonics_amplitudes[peaks_indexes])[::-1]][0:num_harmonics]

    print('higher_amplitudes_sort_peak_indexes')
    print(higher_amplitudes_sort_peak_indexes[0:10])



    # used_indexes = lower_freq_sort_peak_indexes
    used_indexes = higher_amplitudes_sort_peak_indexes

    # Creazione del segnale ricostruito utilizzando i picchi
    time = np.linspace(0, num_samples, num_samples, endpoint=False) 
    reconstructed_signal = np.zeros(len(time), dtype=float)

    print('num_samples')
    print(num_samples)
    print('time[0:20]')
    print(time[0:20])


    print('reconstructed_signal')
    print(reconstructed_signal[0:10])

    for index in used_indexes:

        phase = np.angle(transform[index])  
        amplitude = np.abs(transform[index])
        frequency = frequency_range[index]
        print('phase')
        print(phase)
        print('amplitude')
        print(amplitude)
        print('frequency')
        print(frequency)
        reconstructed_signal += amplitude * np.sin(2 * np.pi * frequency * time + phase)


    # Estrai la parte reale del segnale ricostruito
    reconstructed_signal_real = reconstructed_signal

    print('reconstructed_signal[1]')
    print(reconstructed_signal[1])

    print('reconstructed_signal.shape')
    print(reconstructed_signal.shape)

    zeros = np.zeros(len(original_data) - 2*num_samples)
    reconstructed_signal_real = np.concatenate((reconstructed_signal_real, reconstructed_signal_real), axis = 0)
    print('reconstructed_signal_real.shape')
    print(reconstructed_signal_real.shape)
    reconstructed_signal_real = np.concatenate((reconstructed_signal_real, zeros), axis = 0)
    print('reconstructed_signal_real.shape')
    print(reconstructed_signal_real.shape)
    original_data['reconstructed_signal'] = reconstructed_signal_real



    # Creazione del secondo grafico con due subplot
    fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1, subplot_titles=("Original Close", "Reconstructed Close"))

    # Aggiungi il grafico di "Close" originale al primo subplot
    fig.add_trace(go.Scatter(x=original_data.index, y=original_data["Close"], mode="lines", name="Original Close"), row=1, col=1)


    # Aggiungi il grafico della ricostruzione al secondo subplot
    fig.add_trace(go.Scatter(x=original_data.index, y=original_data['reconstructed_signal'] , mode="lines", name="Reconstructed Close"), row=2, col=1)

    # Aggiorna il layout del secondo grafico
    fig.update_xaxes(title_text="Time", row=2, col=1)
    fig.update_yaxes(title_text="Value", row=1, col=1)
    fig.update_yaxes(title_text="Value", row=2, col=1)

    # Aggiorna il layout generale
    fig.update_layout(title="Close Analysis and Reconstruction")


    # fig.update_layout(xaxis = dict(type="category"))

    # Aggiorna il layout dell'asse X per includere tutti i dati
    # fig.update_xaxes(range=[original_data.index.min(), original_data.index.max()], row=2, col=1)

    fig.update_xaxes(type="category", row=1, col=1)
    fig.update_xaxes(type="category", row=2, col=1)

    # Visualizza il secondo grafico con i subplot
    fig.show()

# Esempio di utilizzo

analyze_and_plot(data, num_samples=800, start_date="2023-01-01", num_harmonics=2)

Edit 30/09/2023

I tried to normalize the SPY data as suggested in the answers, but the problem is still there, here is the resulting chart

enter image description here


Solution

  • I finally found the point. The Goertzel transform basically counts how many times each harmonic stays inside the sampling period returning, in addition, its amplitude and phase. To get the frequency it is necessary to divide by the number of samples, something that probably is implicitly done by the standard FFT libraries. The example with 5 "synthetic" sin waves turned out to be similar between FTT and Goertzel because was sampled in 1 second, so, for example, a 50Hz harmonic had exactly 50 sin waves in one full circle angle that madethe Goertzel transform resulting to be correct too.

    The correction is in the following added second line of code

    transform = goertzel_general_shortened(data, frequency_range)
    
    frequency_range = frequency_range/num_samples
    
    harmonics_amplitudes = np.abs(transform)
    peaks_indexes = argrelmax(harmonics_amplitudes, order = 10)[0] # find indexes of peaks
    peak_frequencies = frequency_range[peaks_indexes]
    peak_periods = 1 / frequency_range[peaks_indexes]
    peak_amplitudes = harmonics_amplitudes[peaks_indexes]
    peak_phases = np.angle(transform[peaks_indexes])
    
    peak_amplitudes = peak_amplitudes*peak_frequencies
    harmonics_amplitudes = harmonics_amplitudes*frequency_range