apache-spark, pyspark, apache-spark-sql

How to create row_index for a Spark dataframe using Window.partitionBy()?


I have a DataFrame with a single column, as shown below.

Type
'BAT'
'BAT'
'BALL'
'BAT'
'BALL'
'BALL'

I have added a new column called 'const' to the above DataFrame:

from pyspark.sql import functions as F
df = df.withColumn('const',F.lit(1))

How do I perform a cumulative sum using Window.partitionBy() on the 'const' column and create a new row_id column?

Expected Output

Type  row_id
'BAT'   1
'BAT'   2
'BALL'  3
'BAT'   4
'BALL'  5
'BALL'  6

I also don't want to use RDDs; everything should stay in DataFrames for performance reasons.

EDIT


Solution

  • If you just want a row index without taking the values into account, then use:

    from pyspark.sql import functions as F
    df = df.withColumn('row_id',F.monotonically_increasing_id())
    

    This will create a unique index for each row. Note that the generated IDs are unique and increasing, but not necessarily consecutive, since they depend on how the data is partitioned.
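
    As a minimal, self-contained sketch (assuming a local SparkSession and the sample data from the question):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [('BAT',), ('BAT',), ('BALL',), ('BAT',), ('BALL',), ('BALL',)],
        ['Type'])
    # unique, increasing IDs; as noted above, not necessarily consecutive
    df = df.withColumn('row_id', F.monotonically_increasing_id())
    df.show()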

    If you want to take your values into account and give duplicate values the same index, then use rank:

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    w = Window().partitionBy().orderBy("type")
    df = df.withColumn('row_id',F.rank().over(w))
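
    With the sample data, duplicate values then share the same row_id, and rank leaves gaps after ties:

    df.orderBy('row_id').show()
    # the three 'BALL' rows all get row_id 1 and the three 'BAT' rows all
    # get row_id 4 (rank skips the ranks used up by tied rows)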
    

    You can of course achieve the same with sum or row_number, but I think the two methods above are better.

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    # running sum of 'const' from the first row up to the current row
    w = Window().partitionBy().orderBy().rowsBetween(Window.unboundedPreceding, Window.currentRow)
    df = df.withColumn('row_id',F.sum("const").over(w))
    

    or

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    w = Window().partitionBy().orderBy("const")
    df = df.withColumn('row_id',F.row_number().over(w))
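
    For completeness, here is an end-to-end sketch of the 'const' + cumulative sum approach from the question (assuming a local SparkSession and the same sample data as above). Since every row has the same 'const' value, the ordering inside the window is arbitrary: the numbering is sequential (1..6) but is not guaranteed to follow the original input order.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [('BAT',), ('BAT',), ('BALL',), ('BAT',), ('BALL',), ('BALL',)],
        ['Type'])
    df = df.withColumn('const', F.lit(1))

    # cumulative sum of the constant column over a single window partition
    w = Window.partitionBy().orderBy('const').rowsBetween(
        Window.unboundedPreceding, Window.currentRow)
    df = df.withColumn('row_id', F.sum('const').over(w)).drop('const')
    df.show()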