Tags: python, apache-spark, pyspark

Create unique index for each group PySpark


I am working with a relatively large dataframe (close to 1 billion rows) in PySpark. This dataframe is in "long" format, and I would like to have a unique index for each group defined by a groupBy over multiple columns. An example dataframe:

+--------------+-------+---------+------+------+
|classification|   id_1|     id_2|     t|     y|
+--------------+-------+---------+------+------+
|             1| person|    Alice|   0.1| 0.247|
|             1| person|    Alice|   0.2| 0.249|
|             1| person|    Alice|   0.3| 0.255|
|             0| animal|   Jaguar|   0.1| 0.298|
|             0| animal|   Jaguar|   0.2| 0.305|
|             0| animal|   Jaguar|   0.3| 0.310|
|             1| person|    Chris|   0.1| 0.267|
+--------------+-------+---------+------+------+

Here I would like to perform an operation such that I can index each group of ["classification", "id_1", "id_2"]. Example output is:

+--------------+-------+---------+------+------+----+
|classification|   id_1|     id_2|     t|     y| idx|
+--------------+-------+---------+------+------+----+
|             1| person|    Alice|   0.1| 0.247|   1|
|             1| person|    Alice|   0.2| 0.249|   1|
|             1| person|    Alice|   0.3| 0.255|   1|
|             0| animal|   Jaguar|   0.1| 0.298|   2|
|             0| animal|   Jaguar|   0.2| 0.305|   2|
|             0| animal|   Jaguar|   0.3| 0.310|   2|
|             1| person|    Chris|   0.1| 0.267|   3|
+--------------+-------+---------+------+------+----+

I cannot use monotonically_increasing_id() on its own, since I don't want a unique ID per row. What I've done, as a stop-gap, is to create a second dataframe with one unique index per group, then join that dataframe back into the original:

from pyspark.sql import functions as F

df_groups = (
    df
    .select("classification", "id_1", "id_2")
    .dropDuplicates()
    # Note: monotonically_increasing_id() is unique per row but not
    # consecutive, so idx will not be 1, 2, 3, ... as in the example.
    .withColumn(
        "idx", F.monotonically_increasing_id()
    )
)

df = df.join(other=df_groups, on=["classification", "id_1", "id_2"])

This can be a pretty hefty operation, so I'm wondering if there is any native Spark operation that effectively does the same thing.


Solution

  • What you are asking can be accomplished in several different ways! Here I will describe two, which should be enough to decide what works for your specific use case :)

    1. Hashing
    Hashing is a very common way to derive a single value from a collection of values. You can do this with a native Spark function: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.hash.html or with the Python standard library: https://docs.python.org/3/library/hashlib.html ; in either case, you hash the values of the classification, id_1, and id_2 columns together to get one index per group. One caveat: a hash is only effectively unique, since distinct groups can collide in principle, although this is rare in practice.

    2. Concatenating the values together
    The downside of hashing is that a hash tells you nothing about what went into it. That is fine if all you need is a unique index, but if you want to look at the index AND know exactly which group it refers to, hashing is not helpful. Instead, you can build your own human-readable index by gluing the column values together into a single string. Putting a delimiter between the column values makes this more robust (it prevents distinct groups from producing the same string). You can do this with the native PySpark concat function: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.concat.html

    This answer is short and to the point, but I am happy to add or edit context if you have any questions about it :)