Tags: arrays, pyspark, struct, explode

create a subset array-of-struct column without exploding


New to PySpark. I have this example dataframe:

df = spark.createDataFrame(
    (("7dc88", "D21", 14.14, 2, 10, [["msgA", 15, "a"],["msgB", 9, "g"],["msgC", 6, "z"],["msgD", 4, "m"],["msgE", 1, "e"]]),
     ("1c36a3", "D21", 32.14, 18, 45, [["msgA", 45, "n"],["msgB", 30, "q"],["msgC", 24, "h"],["msgD", 19, "y"],["msgE", 11, "c"]])),
     "uniqueId : string, tag : string, score : float, time0 : int, time1 : int, msgs : array<struct<msg : string, time : int, sysid : string>>")


+--------+---+-----+-----+-----+---------------------------------------------------------------------------+
|uniqueId|tag|score|time0|time1|msgs                                                                       |
+--------+---+-----+-----+-----+---------------------------------------------------------------------------+
|7dc88   |D21|14.14|2    |10   |[{msgA, 15, a}, {msgB, 9, g}, {msgC, 6, z}, {msgD, 4, m}, {msgE, 1, e}]    |
|1c36a3  |D21|32.14|18   |45   |[{msgA, 45, n}, {msgB, 30, q}, {msgC, 24, h}, {msgD, 19, y}, {msgE, 11, c}]|
+--------+---+-----+-----+-----+---------------------------------------------------------------------------+

The msgs column is an array of struct (msg, time, sysid). (In my real use case, the message struct has more elements, some of which are nested structs; each uniqueId could have a couple hundred messages, and there are millions of uniqueIds.)

The time0 and time1 columns specify an inclusive time range for the desired subset of msgs. I need to create a new column msgs_subset containing only those messages, ordered by time:

+--------+---+-----+-----+-----+------------------------------------------------------------+
|uniqueId|tag|score|time0|time1|msgs_subset                                                 |
+--------+---+-----+-----+-----+------------------------------------------------------------+
|7dc88   |D21|14.14|2    |10   |[{msgD, 4, m}, {msgC, 6, z}, {msgB, 9, g}]                  |
|1c36a3  |D21|32.14|18   |45   |[{msgD, 19, y}, {msgC, 24, h}, {msgB, 30, q}, {msgA, 45, n}]|
+--------+---+-----+-----+-----+------------------------------------------------------------+

I'm able to create the msgs_subset column with this code:

from pyspark.sql import functions as F

df_msg_subset = (
    df
    # one row per message
    .withColumn("msg_explode", F.explode(F.col("msgs")))
    .withColumn("msg_time", F.col("msg_explode.time"))
    # keep only messages inside [time0, time1]
    .filter((F.col("msg_time") >= F.col('time0')) & (F.col("msg_time") <= F.col('time1')))
    .sort(F.col("uniqueId"), F.col("msg_time"))
    # collapse back to one row per uniqueId, collecting the surviving messages
    .groupBy(list(set(df.columns) - {'msgs'})).agg(F.collect_list('msg_explode').alias('msgs_subset'))
)

QUESTION: Can this be done without exploding? I'm assuming that exploding (creating additional rows) only to then groupBy/collapse them again is expensive. How can I do the same with PySpark functions (no UDFs) operating directly on msgs? It feels like it would be something like this, but I'm stuck on the create_subset_column function.

def create_subset_column(msgs, time0, time1):
    return F.sort_array(F.filter(...))

df_msg_subset = (
    df
    .withColumn("msgs_subset", create_subset_column("msgs", "time0", "time1"))
    .drop("msgs")
)

I'd appreciate any help/pointers.

AN EXAMPLE DF WHERE TIMES ARE EPOCH_MILLISEC

df = spark.createDataFrame(
    (
        ("7dc88", "D21", 14.14, 1642970972787, 1642970985027, [
            ["msgA", 1642970990067, "a"],
            ["msgB", 1642970985027, "g"],
            ["msgC", 1642970978077, "z"],
            ["msgD", 1642970972787, "m"],
            ["msgE", 1642970960897, "e"],
        ]),
        ("1c36a3", "D21", 32.14, 1642971056787, 1642971074107, [
            ["msgA", 1642971080687, "n"],
            ["msgB", 1642971074107, "q"],
            ["msgC", 1642971068777, "h"],
            ["msgD", 1642971062157, "y"],
            ["msgE", 1642971056787, "c"],
        ])
    ),
    "uniqueId:string, tag:string, score:float, time0:long, time1:long, msgs:array<struct<msg:string, time:long, sysid:string>>"
)

Solution

  • Does this help?

    df = spark.createDataFrame(
        (("7dc88", "D21", 14.14, 2, 10, [["msgA", 15, "a"],["msgB", 9, "g"],["msgC", 6, "z"],["msgD", 4, "m"],["msgE", 1, "e"]]),
         ("1c36a3", "D21", 32.14, 18, 45, [["msgA", 45, "n"],["msgB", 30, "q"],["msgC", 24, "h"],["msgD", 19, "y"],["msgE", 11, "c"]])),
         "uniqueId : string, tag : string, score : float, time0 : int, time1 : int, msgs : array<struct<msg : string, time : int, sysid : string>>")
    
    Original:
    +--------+---+-----+-----+-----+---------------------------------------------------------------------------+
    |uniqueId|tag|score|time0|time1|msgs                                                                       |
    +--------+---+-----+-----+-----+---------------------------------------------------------------------------+
    |7dc88   |D21|14.14|2    |10   |[{msgA, 15, a}, {msgB, 9, g}, {msgC, 6, z}, {msgD, 4, m}, {msgE, 1, e}]    |
    |1c36a3  |D21|32.14|18   |45   |[{msgA, 45, n}, {msgB, 30, q}, {msgC, 24, h}, {msgD, 19, y}, {msgE, 11, c}]|
    +--------+---+-----+-----+-----+---------------------------------------------------------------------------+
    
         
    from pyspark.sql.functions import array_sort, expr

    # keep only messages inside [time0, time1]
    df = (df.withColumn('msgs', expr("filter(msgs, msg -> ((msg.time >= time0) and (msg.time <= time1)))"))
            # wrap each struct so time is the leading sort key, sort, then unwrap
            .withColumn("msgs", array_sort(expr("transform(msgs, x -> struct(x.time, x as original))")))
            .withColumn("msgs", expr("transform(msgs, x -> x.original)")))
    
    df.show(truncate=False)
    
    Transformed:
    +--------+---+-----+-----+-----+------------------------------------------------------------+
    |uniqueId|tag|score|time0|time1|msgs                                                        |
    +--------+---+-----+-----+-----+------------------------------------------------------------+
    |7dc88   |D21|14.14|2    |10   |[{msgD, 4, m}, {msgC, 6, z}, {msgB, 9, g}]                  |
    |1c36a3  |D21|32.14|18   |45   |[{msgD, 19, y}, {msgC, 24, h}, {msgB, 30, q}, {msgA, 45, n}]|
    +--------+---+-----+-----+-----+------------------------------------------------------------+
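
    The same filter / wrap-sort-unwrap idea can also be written with the DataFrame-API higher-order functions (F.filter, F.transform, F.array_sort; the lambda forms need Spark 3.1+), which slots into the create_subset_column shape from the question. A minimal, unbenchmarked sketch (the field names "t" and "original" are just illustrative):

    from pyspark.sql import functions as F

    def create_subset_column(msgs, time0, time1):
        # keep only messages whose time falls inside [time0, time1]
        filtered = F.filter(
            F.col(msgs),
            lambda m: (m["time"] >= F.col(time0)) & (m["time"] <= F.col(time1)),
        )
        # wrap each struct so time becomes the leading field, sort on it, then unwrap
        wrapped = F.transform(filtered, lambda m: F.struct(m["time"].alias("t"), m.alias("original")))
        return F.transform(F.array_sort(wrapped), lambda w: w["original"])

    df_msg_subset = (
        df
        .withColumn("msgs_subset", create_subset_column("msgs", "time0", "time1"))
        .drop("msgs")
    )

    This should give the same msgs_subset ordering as the expr() version above.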