
Spark MinMaxScaler on dataframe


I have the following dataframe:

+---+-----+-------+
|day| time| result|                 
+---+-----+-------+
|  1| 6   |  0.5  |
|  1| 7   |  10.2 |
|  1| 8   |   5.7 |
|  2| 6   |  11.0 |
|  2| 10  |  22.3 |
+---+-----+-------+

I would like to normalize the results per day while keeping the time that belongs to each result. I want to use MinMaxScaler, and I assume I have to convert the values for each day to a dense vector, but how do I then keep the time values?


Solution

  • I would like to normalize the results per day (...) I want to use MinMaxScaler

    These two requirements are mutually exclusive: MinMaxScaler computes its minimum and maximum over the whole column, so it cannot be used to operate on groups. You can use window functions instead:

    # Note: this import shadows Python's built-in min and max in this scope.
    from pyspark.sql.functions import min, max, col
    from pyspark.sql.window import Window
    
    df = spark.createDataFrame(
        [(1, 6, 0.5), (1, 7, 10.2), (1, 8, 5.7), (2, 6, 11.0), (2, 10, 22.3)], 
        ("day", "time", "result"))
    
    w = Window.partitionBy("day")
    
    scaled_result = (col("result") - min("result").over(w)) / (max("result").over(w) - min("result").over(w))
    
    df.withColumn("scaled_result", scaled_result).show()
    # +---+----+------+------------------+                                            
    # |day|time|result|     scaled_result|
    # +---+----+------+------------------+
    # |  1|   6|   0.5|               0.0|
    # |  1|   7|  10.2|               1.0|
    # |  1|   8|   5.7|0.5360824742268042|
    # |  2|   6|  11.0|               0.0|
    # |  2|  10|  22.3|               1.0|
    # +---+----+------+------------------+
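
The scaled values printed above can be cross-checked without Spark. The following is only a plain-Python sketch of the same per-day formula, (result - min) / (max - min); the helper name `min_max_by_day` is made up for this illustration and is not part of any library. Because the import above shadows `min` and `max`, the sketch refers to the built-ins explicitly.

```python
import builtins
from collections import defaultdict

# Same rows as the example DataFrame: (day, time, result).
rows = [(1, 6, 0.5), (1, 7, 10.2), (1, 8, 5.7), (2, 6, 11.0), (2, 10, 22.3)]


def min_max_by_day(rows):
    """Hypothetical helper: scale result to [0, 1] within each day."""
    # Collect all results per day so the per-day extremes can be looked up.
    by_day = defaultdict(list)
    for day, _time, result in rows:
        by_day[day].append(result)

    scaled_rows = []
    for day, time, result in rows:
        # builtins.min/max are used so a shadowing pyspark import cannot interfere.
        lo = builtins.min(by_day[day])
        hi = builtins.max(by_day[day])
        # Assumes hi > lo; a day with one distinct value would divide by zero here.
        scaled_rows.append((day, time, result, (result - lo) / (hi - lo)))
    return scaled_rows


for row in min_max_by_day(rows):
    print(row)
```

Each input row keeps its time value, which is the property the question asks for: the scaling only adds a column and never collapses the rows of a day.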
    

    Alternatively, group by day, aggregate the minimum and maximum, and join the result back to the original rows:

    minmax_result = df.groupBy("day").agg(min("result").alias("min_result"), max("result").alias("max_result"))
    
    minmax_result.join(df, ["day"]).select(
        "day", "time", "result", 
         ((col("result") - col("min_result")) / (col("max_result") - col("min_result"))).alias("scaled_result")
    ).show()
    # +---+----+------+------------------+                                            
    # |day|time|result|     scaled_result|
    # +---+----+------+------------------+
    # |  1|   6|   0.5|               0.0|
    # |  1|   7|  10.2|               1.0|
    # |  1|   8|   5.7|0.5360824742268042|
    # |  2|   6|  11.0|               0.0|
    # |  2|  10|  22.3|               1.0|
    # +---+----+------+------------------+