python, dataframe, apache-spark, pyspark, apache-spark-sql

Monotonically increasing id order


The spec of monotonically_increasing_id says

The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.

So I assume there is some ordering, otherwise "increasing" has no meaning. So the question is: what does "increasing" mean here? Or is it simply a badly named unique ID?


Solution

  • I have not yet come across a use case where the "increasing" aspect of monotonically_increasing_id actually matters. I just use it to generate unique row IDs. E.g., when there is no explicit ID column and I have to explode some column, I need to be able to tell later which rows came from a single original row.
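
    A minimal sketch of that explode use case (the data and column names are made up for illustration): tag each original row with an ID before exploding, so every exploded row can be traced back to the row it came from.

    from pyspark.sql import functions as F
    
    # hypothetical example: an array column with no natural row ID
    df = spark.createDataFrame(
        [(['a', 'b'],), (['c'],), (['d', 'e', 'f'],)],
        ['letters']
    ).withColumn('row_id', F.monotonically_increasing_id())
    
    # after exploding, rows that share a 'row_id' came from the same original row
    exploded = df.withColumn('letter', F.explode('letters'))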

    You're correct that monotonically_increasing_id has an order. But you cannot really control it. The order is just the order in which rows are written into a node.

    E.g., when we explicitly create a dataframe (or read it from a single CSV file), we know the original order in which rows will be written into nodes. However, we don't necessarily know how many nodes we will have and where we will have the cut-off.

    from pyspark.sql import functions as F
    
    # attach an id in the order in which the rows are written into the nodes
    df1 = spark.createDataFrame([(1,), (3,), (2,), (5,), (4,), (6,)]) \
        .withColumn('mono_id_1', F.monotonically_increasing_id())
    df1.show()
    # +---+----------+
    # | _1| mono_id_1|
    # +---+----------+
    # |  1|         0|
    # |  3|         1|
    # |  2|         2|
    # |  5|8589934592|
    # |  4|8589934593|
    # |  6|8589934594|
    # +---+----------+
    

    Here you see that the order was preserved exactly as we specified it. There is a monotonic increase within each node (there are two nodes in this case, as we see a cut-off after the first 3 rows). So the ID increases only based on the write order within a node.
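
    If you want to see this per-node structure explicitly, you can put the partition ID next to the generated ID (spark_partition_id is a standard Spark SQL function). The documentation notes that the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits, which is why the second node's IDs start at 8589934592 (2**33). A small sketch on the df1 from above:

    # show which partition each row sits in, next to its generated id
    df1.withColumn('partition', F.spark_partition_id()).show()
    
    # the id itself can be decomposed: partition index in the upper bits,
    # position within the partition in the lower 33 bits
    print(8589934593 >> 33)               # 1 -> second partition
    print(8589934593 & ((1 << 33) - 1))   # 1 -> second row within that partition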

    However, after a few transformations that involve data shuffling (joins, grouping / aggregations, distinct, window functions, etc.), your data will move from one node to another, and you can no longer control the order in which rows are written into nodes. A simple example is repartition, which also shuffles the data between nodes.

    # repartition shuffles the rows; the new ids reflect the new write order
    df2 = df1.repartition(3) \
        .withColumn('mono_id_2', F.monotonically_increasing_id())
    df2.show()
    # +---+----------+-----------+
    # | _1| mono_id_1|  mono_id_2|
    # +---+----------+-----------+
    # |  3|         1|          0|
    # |  4|8589934593|          1|
    # |  1|         0| 8589934592|
    # |  5|8589934592| 8589934593|
    # |  2|         2|17179869184|
    # |  6|8589934594|17179869185|
    # +---+----------+-----------+
    

    With this example data, we can predict how Spark will move the data, but you cannot explicitly control it. And in real-world scenarios, when you have a couple of hundred executors performing a join, it is not even predictable how rows will be moved and written: some rows may be more difficult to process, some executor may not be performing well, and so on.
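
    If what you actually need is an ID that follows an explicit, controllable order (and is consecutive), a common alternative is row_number over a window with an explicit orderBy. Note that a window without a partitioning clause pulls all rows into a single partition, so unlike monotonically_increasing_id it does not scale to large data. A minimal sketch:

    from pyspark.sql import functions as F, Window
    
    # consecutive ids following an explicit sort order; the global window
    # forces all data through one partition, so use it only on small data
    w = Window.orderBy('_1')
    df1.withColumn('row_num', F.row_number().over(w)).show()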