
Count words from a list within array columns without invoking a shuffle

This follows on from the post "pyspark: count number of occurrences of distinct elements in lists", where the OP asked how to get counts of the distinct items in array columns. What if I already know the vocabulary in advance and want a count vector of preset length, with one slot per vocabulary entry?

So let's say my vocabulary is

vocab = ['A', 'B', 'C', 'D', 'E']

and my data looks like this (altered from the other post)

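import pandas as pd
from pyspark.sql import functions as F

data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03'],
        'flat': ['A;A;B', 'D;B;E;B;B', 'B;A']}

data['date'] = pd.to_datetime(data['date'])

data = pd.DataFrame(data)
# assumes an active SparkSession named `spark`
df = spark.createDataFrame(data)
new_frame = df.withColumn("list", F.split("flat", ";"))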

and ultimately this is what I want:

|               date| flat      | counts              |
|2014-01-01 00:00:00|A;A;B      |[2, 1, 0, 0, 0]      |
|2014-01-02 00:00:00|D;B;E;B;B  |[0, 3, 0, 1, 1]      |
|2014-01-03 00:00:00|B;A        |[1, 1, 0, 0, 0]      |

Here is a working solution that seems inefficient, adapted from the solution to the prior post:

from pyspark.sql import functions as F
df.withColumn("list", F.split("flat", ";"))\
  .withColumn("distinct_items", F.array_distinct("list"))\
  .withColumn("occurrences", F.expr("""transform(distinct_items, x -> aggregate(list, 0, (acc, t) -> acc + IF(t = x, 1, 0)))"""))\
  .withColumn("count_map", F.map_from_arrays("distinct_items", "occurrences"))\
  .withColumn("counts", F.array(*[
      # look up each vocab entry in the map; 0 when it is absent from the row
      F.coalesce(F.col("count_map")[v], F.lit(0))
      for v in vocab
  ])).drop("occurrences", "distinct_items").show()

Can I do this without creating a map and then building arrays from that map? In practice I need to run this on a large table with many columns, so I would like to avoid groupBy/agg-style operations.


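  • Very nice question. Your intuition is entirely correct: shuffle can be avoided in this case. Each count depends only on values within its own row, so higher-order array functions are enough (F.transform with a Python lambda needs Spark 3.1+).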

    from pyspark.sql import functions as F
    vocab = ['A', 'B', 'C', 'D', 'E']
    df = spark.createDataFrame([('A;A;B',), ('D;B;E;B;B',), ('B;A',),], ['flat'])
    voc_arr = F.array([F.lit(x) for x in vocab])
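    # For each vocab entry v: flag the matching items, drop the False flags, count what is left
    df = df.withColumn('count', F.transform(
        voc_arr,
        lambda v: F.size(F.array_remove(
            F.transform(F.split('flat', ';'), lambda f: f == v), False))
    ))
    df.show()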
    # +---------+---------------+
    # |     flat|          count|
    # +---------+---------------+
    # |    A;A;B|[2, 1, 0, 0, 0]|
    # |D;B;E;B;B|[0, 3, 0, 1, 1]|
    # |      B;A|[1, 1, 0, 0, 0]|
    # +---------+---------------+
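
To see why no shuffle is involved, it may help to look at the same logic in plain Python: every output vector is computed from one row's string and the fixed vocabulary, and nothing is shared across rows. This is only a reference sketch for checking expected values, not something to run on the large table; the helper name `count_vocab` is made up for illustration.

```python
from typing import List


def count_vocab(flat: str, vocab: List[str], sep: str = ";") -> List[int]:
    """Return one count per vocabulary entry, in vocabulary order."""
    items = flat.split(sep)
    # Mirrors the nested transform: for each vocab word, count the items equal to it.
    return [sum(1 for item in items if item == v) for v in vocab]


vocab = ['A', 'B', 'C', 'D', 'E']
rows = ['A;A;B', 'D;B;E;B;B', 'B;A']

# Each row is processed independently of every other row.
counts = [count_vocab(r, vocab) for r in rows]
for flat, c in zip(rows, counts):
    print(flat, c)
```

The vectors it prints can be compared against the `count` column from the Spark version on a small sample.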