I have the following dataframe:
+---+-----+-------+
|day| time| result|
+---+-----+-------+
| 1| 6 | 0.5 |
| 1| 7 | 10.2 |
| 1| 8 | 5.7 |
| 2| 6 | 11.0 |
| 2| 10 | 22.3 |
+---+-----+-------+
I would like to normalize the results per day while keeping the time that belongs to each result. I want to use MinMaxScaler, but I assume I would have to cast the values into a dense vector for each day, and then how would I keep the time values?
I would like to normalize the results (...) I want to use MinMaxScaler

These two requirements are mutually exclusive: MinMaxScaler is fitted over the whole dataset and cannot operate on groups. You can use window functions instead:
from pyspark.sql.functions import min, max, col
from pyspark.sql.window import Window
df = spark.createDataFrame(
[(1, 6, 0.5), (1, 7, 10.2), (1, 8, 5.7), (2, 6, 11.0), (2, 10, 22.3)],
("day", "time", "result"))
# compute the min and max of "result" within each day's partition
w = Window.partitionBy("day")
scaled_result = (col("result") - min("result").over(w)) / (max("result").over(w) - min("result").over(w))
df.withColumn("scaled_result", scaled_result).show()
# +---+----+------+------------------+
# |day|time|result| scaled_result|
# +---+----+------+------------------+
# | 1| 6| 0.5| 0.0|
# | 1| 7| 10.2| 1.0|
# | 1| 8| 5.7|0.5360824742268042|
# | 2| 6| 11.0| 0.0|
# | 2| 10| 22.3| 1.0|
# +---+----+------+------------------+
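One caveat: if a day contains only a single row (or all its results are equal), max - min is 0, so the expression above has a zero denominator for that day (which comes back as null under Spark's default non-ANSI division). A minimal sketch of a guard using when/otherwise; the 0.0 fallback is just an assumption, substitute whatever default fits your data:

from pyspark.sql.functions import when

result_range = max("result").over(w) - min("result").over(w)

# assumption: fall back to 0.0 when the per-day range is zero
safe_scaled_result = when(result_range == 0, 0.0).otherwise(
    (col("result") - min("result").over(w)) / result_range)

df.withColumn("scaled_result", safe_scaled_result).show()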
or group, aggregate and join:
minmax_result = df.groupBy("day").agg(min("result").alias("min_result"), max("result").alias("max_result"))
minmax_result.join(df, ["day"]).select(
"day", "time", "result",
((col("result") - col("min_result")) / (col("max_result") - col("min_result"))).alias("scaled_result")
).show()
# +---+----+------+------------------+
# |day|time|result| scaled_result|
# +---+----+------+------------------+
# | 1| 6| 0.5| 0.0|
# | 1| 7| 10.2| 1.0|
# | 1| 8| 5.7|0.5360824742268042|
# | 2| 6| 11.0| 0.0|
# | 2| 10| 22.3| 1.0|
# +---+----+------+------------------+
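As a quick sanity check (reusing df and scaled_result from the window-function version above), every day should end up with a minimum of 0.0 and a maximum of 1.0 after scaling:

scaled = df.withColumn("scaled_result", scaled_result)

# each day should report min 0.0 and max 1.0
scaled.groupBy("day").agg(
    min("scaled_result").alias("min_scaled"),
    max("scaled_result").alias("max_scaled")).show()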