Tags: python, azure, pyspark, logic

PySpark: creating pairing logic


I am working in Azure Synapse and getting used to working with PySpark. I want to create pairing logic between rows in my DataFrame, but I can't get it to work. I have an ID column and a sequence number. For example:

ID seqNum
100 3609
100 3610
100 3616
100 3617
100 3622
100 3623
100 3634
100 3642
100 3643

This is what the code should output:

ID seqNum pairID
100 3609 1
100 3610 1
100 3616 2
100 3617 2
100 3622 3
100 3623 3
100 3634 Null
100 3642 4
100 3643 4

The row with seqNum 3634 should not be paired, because rows only pair when the difference between their sequence numbers is exactly one.

I have logic in Python that seems to work, but it collects the rows to the driver, so I cannot make use of Spark's distributed processing abilities. Can someone help me create the logic in PySpark?


from pyspark.sql import Window
from pyspark.sql.functions import col, lag, lead, lit

# window specification
windowSpec = Window.orderBy("seqNum")

# Add prev and next sequence numbers
df = df.withColumn("prev_seq", lag("seqNum").over(windowSpec))
df = df.withColumn("next_seq", lead("seqNum").over(windowSpec))

# Add differences to the neighbouring sequence numbers
df = df.withColumn("diff_prev", col("seqNum") - col("prev_seq"))
df = df.withColumn("diff_next", col("next_seq") - col("seqNum"))

# Initialise PairID as null
df = df.withColumn("PairID", lit(None).cast("int"))

# Assign PairID based on proximity logic
pair_id = 1
rows = df.orderBy("seqNum").collect()  # Collect rows to the driver in sequence order
paired_indices = set()  # Track already paired rows
result = []

for i, row in enumerate(rows):
    if i in paired_indices:
        continue  # Skip already paired rows

    current = row["seqNum"]
    prev_diff = row["diff_prev"]
    next_diff = row["diff_next"]

    # Pair with the row above if diff_prev == 1 and it is not already paired
    if prev_diff == 1 and (i - 1) not in paired_indices:
        result.append((current, pair_id, rows[i - 1]["seqNum"]))
        result.append((rows[i - 1]["seqNum"], pair_id, current))
        paired_indices.update([i, i - 1])
        pair_id += 1

    # Pair with the row below if diff_next == 1 and it is not already paired
    elif next_diff == 1 and (i + 1) not in paired_indices:
        result.append((current, pair_id, rows[i + 1]["seqNum"]))
        result.append((rows[i + 1]["seqNum"], pair_id, current))
        paired_indices.update([i, i + 1])
        pair_id += 1

    # Otherwise leave the row unpaired
    else:
        result.append((current, None, None))

# to DataFrame
result_df = spark.createDataFrame(result, ["seqNum", "PairID", "Closest"])

Solution

  • If you want pair IDs for only two numbers at a time, you can use the code below.

    Note that the pair IDs will not come out in sequence order, but each pair will still get its own unique number. (A deterministic variant that keeps the order is sketched after the output below.)

    df.withColumn("prev_seq", lag("seqNum").over(windowSpec))\
        .withColumn("next_seq", lead("seqNum").over(windowSpec))\
        .withColumn("pair_with_prev", when((col("seqNum") - col("prev_seq")) == 1, lit(1)).otherwise(lit(0)))\
        .withColumn("pair_with_next", when((col("next_seq") - col("seqNum")) == 1, lit(1)).otherwise(lit(0)))\
        .withColumn("cond1",when((col("pair_with_prev") == 0) & (col("pair_with_next") == 1), randn()).otherwise(lit(0)))\
        .withColumn("cond2",when((col("pair_with_prev") == 1) & (col("pair_with_next") == 0), lag("cond1").over(windowSpec)).otherwise(lit(0)))\
        .withColumn("all_cond",col("cond1")+col("cond2"))\
        .withColumn("res",dense_rank().over(Window.partitionBy("ID").orderBy("all_cond")))\
        .withColumn("res",when(col("all_cond") == 0, lit(None)).otherwise(col("res")))\
        .select("ID","seqNum","res")\
        .display()
    

    Here I check the conditions for pairing with both the previous and the next row.

    If a row has no pair with the previous row but does pair with the next one, a new pair starts there, so I generate a random value for it.

    If a row pairs with the previous row but not with the next one, that is the end of the pair, so I assign it the previous row's random value via lag.

    Next, I compute a dense rank over the combined condition column (all_cond) and set the pair ID to null for rows that belong to no pair.

    output:

    ID seqNum res
    100 3634 null
    100 3616 2
    100 3617 2
    100 3609 3
    100 3610 3
    100 3622 4
    100 3623 4
    100 3642 5
    100 3643 5
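
    For completeness, here is a deterministic variant of the same window-function idea that also numbers the pairs in seqNum order, matching the expected output in the question. Instead of a random value, it keys each pair on the seqNum of the row that opens it, then dense-ranks those keys. This is a minimal sketch under the same assumption as above (runs of adjacent numbers contain at most two rows); in a longer run it would pair rows greedily from the left.

    from pyspark.sql import SparkSession, Window
    from pyspark.sql.functions import coalesce, col, dense_rank, lag, lead, lit, when

    spark = SparkSession.builder.getOrCreate()

    # Sample data from the question
    df = spark.createDataFrame(
        [(100, s) for s in [3609, 3610, 3616, 3617, 3622, 3623, 3634, 3642, 3643]],
        ["ID", "seqNum"],
    )

    w = Window.partitionBy("ID").orderBy("seqNum")

    result = (
        df
        # Distance to the neighbouring sequence numbers (0 at the partition edges)
        .withColumn("diff_prev", coalesce(col("seqNum") - lag("seqNum").over(w), lit(0)))
        .withColumn("diff_next", coalesce(lead("seqNum").over(w) - col("seqNum"), lit(0)))
        # A row opens a pair when it is adjacent to the next row but not to the
        # previous one; its own seqNum becomes the pair key.
        .withColumn(
            "start_key",
            when((col("diff_prev") != 1) & (col("diff_next") == 1), col("seqNum")),
        )
        # A row adjacent to the previous row inherits that row's key; rows that
        # neither open nor close a pair keep a null key.
        .withColumn(
            "pair_key",
            coalesce(col("start_key"),
                     when(col("diff_prev") == 1, lag("start_key").over(w))),
        )
        # Ranking the keys in seqNum order yields pair IDs 1, 2, 3, ... per ID;
        # unpaired rows stay null.
        .withColumn(
            "pairID",
            when(
                col("pair_key").isNotNull(),
                dense_rank().over(
                    Window.partitionBy("ID").orderBy(col("pair_key").asc_nulls_last())
                ),
            ),
        )
        .select("ID", "seqNum", "pairID")
    )

    result.show()

    On the sample data this yields pairID 1 for 3609/3610, 2 for 3616/3617, 3 for 3622/3623, null for 3634, and 4 for 3642/3643, i.e. exactly the expected output from the question.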