numpy pyspark

Fast Fourier Transform (FFT) aggregation on Spark DataFrame groupBy


I am trying to compute the FFT over a window, using NumPy's fft with a Spark DataFrame like this:

import numpy as np
from pyspark.sql import functions as func
from pyspark.sql.functions import (
    avg, kurtosis, max, min, percentile_approx, stddev_samp, var_samp,
)

df_grouped = df.groupBy(
    "id",
    "type",
    "mode",
    func.window("timestamp", "10 seconds", "5 seconds"),
).agg(
    percentile_approx("value", 0.25).alias("quantile_1(value)"),
    percentile_approx("magnitude", 0.25).alias("quantile_1(magnitude)"),
    percentile_approx("value", 0.5).alias("quantile_2(value)"),
    percentile_approx("magnitude", 0.5).alias("quantile_2(magnitude)"),
    percentile_approx("value", 0.75).alias("quantile_3(value)"),
    percentile_approx("magnitude", 0.75).alias("quantile_3(magnitude)"),
    avg("value"),
    avg("magnitude"),
    min("value"),
    min("magnitude"),
    max("value"),
    max("magnitude"),
    kurtosis("value"),
    kurtosis("magnitude"),
    var_samp("value"),
    var_samp("magnitude"),
    stddev_samp("value"),
    stddev_samp("magnitude"),
    np.fft.fft("value"),
    np.fft.fft("magnitude"),
    np.fft.rfft("value"),
    np.fft.rfft("magnitude"),
)

Every aggregation function works fine; however, for the FFT I get:

tuple index out of range

and I don't understand why. Do I need to do anything in particular to the values for NumPy's FFT to work? The values are all floats. When I print the column, it looks like this:

[Row(value_0=6.247499942779541), Row(value_0=63.0), Row(value_0=54.54375076293945), Row(value_0=0.7088077664375305), Row(value_0=51.431251525878906), Row(value_0=0.09377499669790268), Row(value_0=0.09707500040531158), Row(value_0=6.308750152587891), Row(value_0=8.503950119018555), Row(value_0=295.8463134765625), Row(value_0=7.938048839569092), Row(value_0=8.503950119018555), Row(value_0=0.7090428471565247), Row(value_0=0.7169944643974304), Row(value_0=0.5659012794494629)]

I am guessing the Spark Row objects might be the issue, but I am unsure how to convert them in this context.


Solution

  • The np.fft.fft function belongs to NumPy and cannot be used directly as a PySpark aggregation. Spark aggregation expressions are Column objects evaluated lazily on the executors, whereas np.fft.fft runs eagerly on the driver. When you write np.fft.fft("value"), NumPy never sees the column's data: it receives only the literal string "value", converts it to a zero-dimensional array, and raises tuple index out of range when it tries to read that array's last axis.

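    You can reproduce the error outside Spark entirely; this minimal snippet shows NumPy failing on the bare column name:

    import numpy as np

    # NumPy converts the string to a zero-dimensional array, and looking up
    # its last axis raises IndexError: tuple index out of range
    np.fft.fft("value")
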
    To apply np.fft.fft to a column like "value", you first need to collect each group's values into an array that NumPy can interpret, and then run the FFT inside a UDF.

    Here's an example that illustrates how to prepare your data and apply np.fft.fft:

    import numpy as np
    from pyspark.sql import functions as F, types as T
    
    # Group the dataframe with a windowed aggregation
    df_grouped = df.groupBy(
        "id",
        "type",
        "mode",
        F.window("timestamp", "10 seconds", "5 seconds"),
    ).agg(
        F.percentile_approx("value", 0.25).alias("quantile_1(value)"),
        ...,  # the remaining aggregations from the question go here
        F.stddev_samp("magnitude"), 
        # Aggregate values and magnitudes into lists for FFT
        F.collect_list("value").alias("values"),
        F.collect_list("magnitude").alias("magnitudes")
    )
    
    # Define a UDF that applies the FFT to an array column. np.fft.fft
    # returns complex coefficients, which Spark cannot serialize, so keep
    # the magnitude (absolute value) of each coefficient as a plain float.
    @F.udf(T.ArrayType(T.FloatType()))
    def fft_udf(array):
        return [float(np.abs(x)) for x in np.fft.fft(array)]
    
    # Apply the FFT UDF to the aggregated array columns
    df_grouped = df_grouped.withColumn("fft_values", fft_udf(F.col("values")))
    df_grouped = df_grouped.withColumn("fft_magnitudes", fft_udf(F.col("magnitudes")))
    
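    To sanity-check the result, you can display the new array column; the window column produced by F.window is named "window" by default:

    df_grouped.select("id", "window", "fft_values").show(truncate=False)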

    Explanation:

    1. Aggregation: collect_list gathers all the "value" and "magnitude" entries of each group into an array, which gives np.fft.fft the whole sequence to work on. Note that collect_list does not guarantee element order after a shuffle, so if the FFT must follow time order, collect (timestamp, value) structs and sort them before applying the UDF.
    2. UDF for FFT: np.fft.fft returns an array of complex numbers, which Spark cannot serialize, so the UDF converts each coefficient to its magnitude as a plain float. If you also need rfft, define a similar UDF around np.fft.rfft, as sketched below.
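
    For completeness, here is a minimal sketch of the analogous rfft UDF the second point mentions (rfft_udf and rfft_values are names chosen here for illustration):

    @F.udf(T.ArrayType(T.FloatType()))
    def rfft_udf(array):
        # np.fft.rfft also returns complex coefficients, so keep magnitudes
        return [float(np.abs(x)) for x in np.fft.rfft(array)]

    df_grouped = df_grouped.withColumn("rfft_values", rfft_udf(F.col("values")))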