
How can I divide each column of a dataframe by the values in another dataframe's column?


I have a time-series dataframe, and I count the rows for each date with the following code:

count_dates = df_inner.groupBy('dates').count().orderBy('dates')

This code gives me a new dataframe with two columns:

dates          count
2021-03-01         5
2021-03-02        44
2021-03-03         3
2021-03-04         2
2021-03-05         1

This is another data frame:

name    2021-03-01  2021-03-02  2021-03-03    2021-03-04     2021-03-05
A           40          42             30         1                8
B           80           3             54         2                7
C           10           0             52         2                8

In other words, each date in the first dataframe is a column in the second one. How can I divide each column of the second dataframe by the matching value in the count column of the first?

Expected output:

name    2021-03-01  2021-03-02  2021-03-03    2021-03-04     2021-03-05
A           40/5        42/44        30/3         1/2              8/1
B           80/5        3/44         54/3         2/2              7/1
C           10/5        0/44         52/3         2/2              8/1

Solution

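  • Unpivot the second dataframe so that each date becomes a row, join it to the first dataframe on dates, divide, and then pivot back to the wide layout.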

    Example dfs:

    from pyspark.sql import SparkSession, functions as F
    spark = SparkSession.builder.getOrCreate()
    df1 = spark.createDataFrame(
        [('2021-03-01', 5),
         ('2021-03-02', 44),
         ('2021-03-03', 3),
         ('2021-03-04', 2),
         ('2021-03-05', 1)],
        ['dates', 'count']
    )
    df2 = spark.createDataFrame(
        [('A', 40, 42, 30, 1, 8),
         ('B', 80, 3, 54, 2, 7),
         ('C', 10, 0, 52, 2, 8)],
        ['name', '2021-03-01', '2021-03-02', '2021-03-03', '2021-03-04', '2021-03-05']
    )
    

    Script:

    # Unpivoting df2: stack() turns each (value, label) pair into its own row
    cols_to_unpivot = [f"`{c}`, '{c}'" for c in df2.columns if c != 'name']
    stack_string = ', '.join(cols_to_unpivot)
    df2 = df2.select(
        'name',
        F.expr(f'stack({len(cols_to_unpivot)}, {stack_string}) as (val, dates)')
    )
    
    # Joining (a full join keeps dates present in only one dataframe; their ratios come out null)
    df_joined = df2.join(df1, 'dates', 'full')
    
    # Pivoting the result
    df = df_joined.groupBy('name').pivot('dates').agg(F.first(F.col('val') / F.col('count')))
    
    df.show()
    # +----+----------+-------------------+------------------+----------+----------+
    # |name|2021-03-01|         2021-03-02|        2021-03-03|2021-03-04|2021-03-05|
    # +----+----------+-------------------+------------------+----------+----------+
    # |   B|      16.0|0.06818181818181818|              18.0|       1.0|       7.0|
    # |   C|       2.0|                0.0|17.333333333333332|       1.0|       8.0|
    # |   A|       8.0| 0.9545454545454546|              10.0|       0.5|       8.0|
    # +----+----------+-------------------+------------------+----------+----------+
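
When the counts dataframe is small (one row per date), an alternative is to collect it to the driver and divide each date column directly, which avoids the unpivot, join, and pivot steps. The following is only a sketch under stated assumptions: the helper names `ratio_exprs` and `divide_by_counts` are hypothetical, the `dates` values are assumed to render as strings that match the column names of `df2`, and every date column of `df2` is assumed to have a non-zero count in `df1`.

```python
def ratio_exprs(columns, counts, key="name"):
    """Build Spark SQL expressions of the form `col` / count AS `col`.

    columns: column names of the wide dataframe (df2 in the example)
    counts:  dict mapping a date string to its count
    key:     identifier column that is passed through unchanged
    """
    exprs = [f"`{key}`"]
    for c in columns:
        if c == key:
            continue
        # Backticks are needed because the column names contain hyphens.
        exprs.append(f"`{c}` / {counts[c]} AS `{c}`")
    return exprs


def divide_by_counts(df2, df1):
    """Divide every date column of df2 by the matching count in df1.

    Assumption: df1 is small enough to collect to the driver.
    """
    counts = {str(row["dates"]): row["count"] for row in df1.collect()}
    return df2.selectExpr(*ratio_exprs(df2.columns, counts))
```

With the example dataframes above, `divide_by_counts(df2, df1).show()` should give the same quotients as the pivot-based script, although the row order may differ. A date column in `df2` that has no entry in `df1` raises a `KeyError` in this sketch instead of producing nulls.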