
Get the closest currency rate


I would like to join two dataframes on the currency pair and the date, taking the exchange rate from the second dataframe. I have tried the approach cited here, but datediff only gives the difference between the dates, so it doesn't give me the right rate.

df1:

from_curr  to_curr  Date        value_to_convert
AED        EUR      2017-03-24  2000
AED        EUR      2017-03-27  189
DZD        EUR      2017-01-12  130
EUR        EUR      2020-01-01  11

df2 (currency_table):

transacti  local  DateTra     rate_exchange
AED        EUR    2017-03-24  -5.123
AED        EUR    2017-03-26  -9.5
DZD        EUR    2017-01-01  -6.12

The output should look like this:

from_curr  to_curr  Date        value_to_convert  value_converted
AED        EUR      2017-03-24  2000              390.39
AED        EUR      2017-03-27  189               19.89
DZD        EUR      2017-01-12  130               21.24
EUR        EUR      2020-01-01  11                11

The only method that works is subtracting the two dates "Date" and "DateTra" and taking the "DateTra" closest to "Date". Could you please propose another method that is much cleaner than subtracting strings?


Solution

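  • You could aggregate your smaller dataframe (df2) so that all the dates and rates of each currency pair are collected into one array cell. Then join the dataframes, pick the rate you need from the array and do the division. The scripts below use F.transform and F.filter with Python lambdas, which need Spark 3.1 or later.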

    Inputs:

    from pyspark.sql import functions as F
    df1 = spark.createDataFrame(     
        [('AED', 'EUR', '2017-03-24', 2000),
         ('AED', 'EUR', '2017-03-27', 189),
         ('DZD', 'EUR', '2017-01-12', 130),
         ('EUR', 'EUR', '2020-01-01', 11)],
        ['from_curr', 'to_curr', 'Date', 'value_to_convert'])
    df2 = spark.createDataFrame(
        [('AED', 'EUR', '2017-03-24', -5.123),
         ('AED', 'EUR', '2017-03-26', -9.5),
         ('DZD', 'EUR', '2017-01-01', -6.12)],
        ['transacti', 'local', 'DateTra', 'rate_exchange'])
    

    Script which gets the rate of the closest day (which could be in the future):

    # Collect every (DateTra, rate_exchange) pair of a currency pair into one array column
    df2 = df2.groupBy('transacti', 'local').agg(
        F.collect_list(F.struct('DateTra', 'rate_exchange')).alias('_vals')
    )
    # Structs sort field by field: smallest date gap first, then the later date on a tie
    rate = F.array_sort(F.transform(
        '_vals',
        lambda x: F.struct(
            F.abs(F.datediff('Date', x.DateTra)).alias('diff'),
            (-F.unix_timestamp(x.DateTra, 'yyyy-MM-dd')).alias('DateTra'),
            F.abs(x.rate_exchange).alias('rate_exchange')
        )
    ))[0]['rate_exchange']
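    # Left join keeps df1 rows without a matching pair in df2 (here EUR to EUR)
    df = (df1
        .join(df2, (df1.from_curr == df2.transacti) & (df1.to_curr == df2.local), 'left')
        .select(
            df1['*'],
            # With no rate available, fall back to the unchanged value when both currencies match
            F.coalesce(
                F.col('value_to_convert') / rate,
                F.when(df1.from_curr == df1.to_curr, df1.value_to_convert)
            ).alias('value_converted')
        )
    )
    df.show()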
    # +---------+-------+----------+----------------+------------------+
    # |from_curr|to_curr|      Date|value_to_convert|   value_converted|
    # +---------+-------+----------+----------------+------------------+
    # |      AED|    EUR|2017-03-24|            2000| 390.3962521959789|
    # |      AED|    EUR|2017-03-27|             189|19.894736842105264|
    # |      EUR|    EUR|2020-01-01|              11|              11.0|
    # |      DZD|    EUR|2017-01-12|             130|21.241830065359476|
    # +---------+-------+----------+----------------+------------------+
    

    Script which gets the most recent rate, but not from the future:

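    # Start again from the original df2 (Inputs); sort each pair's rates by DateTra, newest first
    df2 = df2.groupBy('transacti', 'local').agg(
        F.sort_array(F.collect_list(F.struct('DateTra', 'rate_exchange')), False).alias('_vals')
    )

    # Keep the rates dated on or before Date; the first one left is the most recent
    rate = F.abs(F.filter('_vals', lambda x: x.DateTra <= F.col('Date'))[0]['rate_exchange'])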
    df = (df1
        .join(df2, (df1.from_curr == df2.transacti) & (df1.to_curr == df2.local), 'left')
        .select(
            df1['*'],
            F.coalesce(
                F.col('value_to_convert') / rate,
                F.when(df1.from_curr == df1.to_curr, df1.value_to_convert)
            ).alias('value_converted')
        )
    )
    df.show()
    # +---------+-------+----------+----------------+------------------+
    # |from_curr|to_curr|      Date|value_to_convert|   value_converted|
    # +---------+-------+----------+----------------+------------------+
    # |      AED|    EUR|2017-03-24|            2000| 390.3962521959789|
    # |      AED|    EUR|2017-03-27|             189|19.894736842105264|
    # |      EUR|    EUR|2020-01-01|              11|              11.0|
    # |      DZD|    EUR|2017-01-12|             130|21.241830065359476|
    # +---------+-------+----------+----------------+------------------+
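
Both scripts return the same values on the sample data, so the difference between the two rules is easy to miss. The following is a minimal plain-Python sketch of the two selection rules, useful for sanity-checking Spark results on a few rows. It is not part of the Spark solution: the names RATES, closest_rate, latest_rate_not_after and convert are hypothetical, and the rates are assumed to be stored as absolute values, as the scripts do with F.abs.

```python
from datetime import date

# Sample rates from df2, keyed by (transacti, local).
# Assumption: rates are kept as absolute values, mirroring F.abs in the scripts.
RATES = {
    ("AED", "EUR"): [(date(2017, 3, 24), 5.123), (date(2017, 3, 26), 9.5)],
    ("DZD", "EUR"): [(date(2017, 1, 1), 6.12)],
}


def closest_rate(pair, on):
    """Rate whose date is nearest to `on`; a tie goes to the later date."""
    candidates = RATES.get(pair, [])
    if not candidates:
        return None
    # Same ordering as the sorted struct: (absolute day gap, negated date).
    best = min(candidates, key=lambda c: (abs((on - c[0]).days), -c[0].toordinal()))
    return best[1]


def latest_rate_not_after(pair, on):
    """Most recent rate dated on or before `on`, or None if there is none."""
    past = [c for c in RATES.get(pair, []) if c[0] <= on]
    return max(past, key=lambda c: c[0])[1] if past else None


def convert(value, pair, on, pick):
    """Divide by the picked rate; identical currencies fall back to the value itself."""
    rate = pick(pair, on)
    if rate is not None:
        return value / rate
    return float(value) if pair[0] == pair[1] else None


if __name__ == "__main__":
    pair = ("AED", "EUR")
    # 2017-03-25 is one day away from both AED rates, so the two rules disagree.
    print(closest_rate(pair, date(2017, 3, 25)))           # 9.5
    print(latest_rate_not_after(pair, date(2017, 3, 25)))  # 5.123
```

For a Date of 2017-03-25 the closest-day rule picks the 2017-03-26 rate (a tie, resolved towards the later date), while the not-from-the-future rule picks the 2017-03-24 rate.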