I am trying to compute the FFT over a window using NumPy's fft with a Spark DataFrame, like this:
import numpy as np
from pyspark.sql import functions as func
from pyspark.sql.functions import (
    percentile_approx, avg, min, max, kurtosis, var_samp, stddev_samp,
)

df_grouped = df.groupBy(
    "id",
    "type",
    "mode",
    func.window("timestamp", "10 seconds", "5 seconds"),
).agg(
    percentile_approx("value", 0.25).alias("quantile_1(value)"),
    percentile_approx("magnitude", 0.25).alias("quantile_1(magnitude)"),
    percentile_approx("value", 0.5).alias("quantile_2(value)"),
    percentile_approx("magnitude", 0.5).alias("quantile_2(magnitude)"),
    percentile_approx("value", 0.75).alias("quantile_3(value)"),
    percentile_approx("magnitude", 0.75).alias("quantile_3(magnitude)"),
    avg("value"),
    avg("magnitude"),
    min("value"),
    min("magnitude"),
    max("value"),
    max("magnitude"),
    kurtosis("value"),
    kurtosis("magnitude"),
    var_samp("value"),
    var_samp("magnitude"),
    stddev_samp("value"),
    stddev_samp("magnitude"),
    np.fft.fft("value"),
    np.fft.fft("magnitude"),
    np.fft.rfft("value"),
    np.fft.rfft("magnitude"),
)
Every aggregation function works fine; however, for the FFT I get:
tuple index out of range
and I don't understand why. Do I need to do anything in particular to the values for NumPy's fft to work? The values are all floats. When I print the column, it looks like this:
[Row(value_0=6.247499942779541), Row(value_0=63.0), Row(value_0=54.54375076293945), Row(value_0=0.7088077664375305), Row(value_0=51.431251525878906), Row(value_0=0.09377499669790268), Row(value_0=0.09707500040531158), Row(value_0=6.308750152587891), Row(value_0=8.503950119018555), Row(value_0=295.8463134765625), Row(value_0=7.938048839569092), Row(value_0=8.503950119018555), Row(value_0=0.7090428471565247), Row(value_0=0.7169944643974304), Row(value_0=0.5659012794494629)]
I am guessing the Spark Row objects might be the issue, but I am unsure how to convert them in this context.
The np.fft.fft function belongs to the NumPy library and isn't directly usable inside a PySpark aggregation. Aggregation functions like avg or percentile_approx are Spark Column expressions that Spark evaluates itself, whereas np.fft.fft is a plain Python function that requires a list or array of numbers. Inside .agg(), the expression np.fft.fft("value") therefore hands NumPy the literal string "value" rather than the column's data, which is exactly what produces the tuple index out of range error. To apply np.fft.fft to a column like "value", you first need to gather each group's values into an array that np.fft.fft can interpret.
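You can reproduce the error without Spark: NumPy converts a string into a zero-dimensional array, and looking up its last axis fails. (The sample numbers below are made up purely for illustration.)

import numpy as np

# A string becomes a 0-d array; indexing its empty shape raises the error.
np.fft.fft("value")  # IndexError: tuple index out of range

# What np.fft.fft actually expects: a sequence of numbers.
np.fft.fft([6.25, 63.0, 54.54, 0.71])  # returns a complex array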
Here's an example that illustrates how to prepare your data and apply np.fft.fft:
import numpy as np
from pyspark.sql import functions as F, types as T

# Group the dataframe with a windowed aggregation
df_grouped = df.groupBy(
    "id",
    "type",
    "mode",
    F.window("timestamp", "10 seconds", "5 seconds"),
).agg(
    F.percentile_approx("value", 0.25).alias("quantile_1(value)"),
    ...,  # the remaining aggregations from your query
    F.stddev_samp("magnitude"),
    # Aggregate values and magnitudes into lists for FFT
    F.collect_list("value").alias("values"),
    F.collect_list("magnitude").alias("magnitudes"),
)
# Define a UDF to apply the FFT to each array. np.fft.fft returns complex
# numbers, which Spark cannot serialize (and float() on a complex value
# raises TypeError), so keep only the real part here; use abs(x) instead
# if you want the magnitude spectrum.
@F.udf(T.ArrayType(T.DoubleType()))
def fft_udf(array):
    return [float(x.real) for x in np.fft.fft(array)]

# Apply the FFT UDF to the 'values' column
df_grouped = df_grouped.withColumn("fft_values", fft_udf(F.col("values")))
The example uses collect_list on the "value" and "magnitude" columns to gather all values per group into an array, which gives np.fft.fft the whole sequence at once. One caveat: collect_list does not guarantee any ordering within a group, and the FFT is order-sensitive, so if order matters you should sort each group's values by timestamp first. Since np.fft.fft returns a complex array, the UDF converts each element to a plain float for compatibility with PySpark. If you need the real-input transform, define a similar UDF around np.fft.rfft, as sketched below.
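Here is one possible sketch of that rfft variant; the name rfft_udf is just illustrative, and it stores the magnitude of each complex coefficient rather than the real part, assuming the amplitude spectrum is what you're after:

# np.fft.rfft returns the non-negative-frequency terms for real input,
# still as complex numbers, so store their magnitudes as doubles.
@F.udf(T.ArrayType(T.DoubleType()))
def rfft_udf(array):
    return [float(abs(x)) for x in np.fft.rfft(array)]

df_grouped = df_grouped.withColumn("rfft_values", rfft_udf(F.col("values")))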