python pandas dataframe dask dask-dataframe

Compare a column against a dictionary in Dask


I have a dictionary:

dict = {10: 1, 50: 2, 200: 3, 500: 4}

And a Dask DataFrame:

+---+---+
|  a|  b|
+---+---+
|  1| 24|
|  1| 49|
|  2|125|
|  3|400|
+---+---+

I want to group by column a and take the minimum of column b. After that, I want to find which dict key is closest to b and create a new column holding that key's dict value.

As an example, when b = 24 the closest key is 10, so I want to assign its value, 1. This is the result I am expecting:

+---+---+-------+
|  a|  b|closest|
+---+---+-------+
|  1| 24|      1|
|  1| 49|      2|
|  2|125|      3|
|  3|400|      4|
+---+---+-------+
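For b = 24 the distances to the keys 10, 50, 200 and 500 are 14, 26, 176 and 476, so key 10 wins. Note that b = 125 is exactly 75 away from both 50 and 200, so the expected value 3 only comes out if ties go to the larger key; that tie rule is inferred from the table above, not stated in the question. A minimal plain-Python sketch of the rule under that assumption (the helper name closest_value is hypothetical):

```python
mapping = {10: 1, 50: 2, 200: 3, 500: 4}


def closest_value(b, mapping):
    """Return mapping[k] for the key k nearest to b (hypothetical helper)."""
    # Compare by distance first; on equal distance, -k makes the larger key win
    # (assumed tie rule, chosen so that b = 125 maps to 3 as in the table)
    best_key = min(mapping, key=lambda k: (abs(k - b), -k))
    return mapping[best_key]


for b in [24, 49, 125, 400]:
    print(b, closest_value(b, mapping))
```

Without the -k term, min would return the first of the tied keys in dict order (50), giving 2 for b = 125.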

I have found something similar in PySpark. I have not been able to make it run, but it apparently runs for other people, so I am sharing it for reference.

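import builtins

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        (1, 24),
        (1, 49),
        (2, 125),
        (3, 400)
    ],
    ["a", "b"]
)

mapping = {10: 1, 50: 2, 200: 3, 500: 4}

def func(value, mapping):
    # builtins.min/abs guard against pyspark.sql.functions being star-imported
    closest_key = (
        value if value in mapping else builtins.min(
            mapping.keys(), key=lambda k: builtins.abs(k - value)
        )
    )
    return mapping.get(closest_key)

# A plain Python function cannot be called on a column name directly;
# wrapping it in a UDF makes Spark evaluate it once per row
closest_udf = F.udf(lambda v: func(v, mapping), IntegerType())

df = (
    df.groupby('a')
      .agg(F.min('b').alias('b'))
      .withColumn('closest', closest_udf('b'))
)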


From what I understand, the Spark version computes the value row by row, and I have not been able to replicate that in Dask.


Solution

  • Here is a solution using pure Dask, in case you don't want to rely on other packages.

    Sample data to test with:

    import pandas as pd
    import dask.dataframe as dd
    import numpy as np
    
    _dict = {10: 1, 50: 2, 200: 3, 500: 4}
    df = pd.DataFrame({"a": [1,1,2,3], "b":[24,49,125,400]})
    ddf = dd.from_pandas(df, npartitions=2)
    

    Function:

    def get_closest(value: int, boundaries: dict) -> int:
        keys = np.array(list(boundaries.keys()))
        # Key with the smallest absolute distance to value
        # (argmin returns the first such key on a tie)
        closest = keys[np.abs(keys - value).argmin()]
        score = boundaries[closest]
    
        return score
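get_closest is called once per value through Python. A vectorised alternative, a swapped-in technique rather than part of this answer, computes all distances at once with NumPy broadcasting. The helper name closest_values is hypothetical, and it assumes ties go to the larger key, which is what the expected table implies for b = 125.

```python
import numpy as np


def closest_values(values, boundaries):
    """Map each value to boundaries[nearest key] (hypothetical helper).

    Assumes ties are broken toward the larger key.
    """
    keys = np.array(sorted(boundaries))               # ascending keys
    vals = np.array([boundaries[k] for k in keys])    # values aligned with keys
    values = np.asarray(values)
    # Distance matrix of shape (len(values), len(keys))
    dist = np.abs(values[:, None] - keys[None, :])
    # argmin returns the first minimum, so scanning the keys in reverse
    # picks the largest of several equidistant keys
    idx = len(keys) - 1 - dist[:, ::-1].argmin(axis=1)
    return vals[idx]


print(closest_values([24, 49, 125, 400], {10: 1, 50: 2, 200: 3, 500: 4}))
```

In Dask this could be applied partition by partition, for example ddf['b'].map_partitions(lambda s: pd.Series(closest_values(s, _dict), index=s.index, name='closest'), meta=('closest', 'int64')), which avoids a Python call per row.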
    

    Then we apply it:

    ddf['closest'] = ddf['b'].apply(
        get_closest,
        args=(_dict,),              # extra positional arguments for get_closest
        meta=('closest', 'int64')   # (name, dtype) of the resulting Series
    )