python, pandas, pyspark, apache-spark-sql, pyspark-transformer

How to find the max and min timestamp when a value goes below a minimum threshold in PySpark?


I have a table like the one below:

time_in_seconds  value
1                4.5
2                4
3                3
4                5
5                6
6                7
7                6
8                5
9                4.5
10               4.2
11               3
12               3.5
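
For anyone who wants to reproduce this, here is a minimal sketch that builds the sample table as a PySpark DataFrame (the column name time_in_seconds is the one used in the code further down):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
data = [(1, 4.5), (2, 4.0), (3, 3.0), (4, 5.0), (5, 6.0), (6, 7.0),
        (7, 6.0), (8, 5.0), (9, 4.5), (10, 4.2), (11, 3.0), (12, 3.5)]
df = spark.createDataFrame(data, ['time_in_seconds', 'value'])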

I want to find the min time and max time of each contiguous stretch where the value goes below 5.

Expected output:

time_in_seconds  value  min_time  max_time
1                4.5    1         3
2                4      1         3
3                3      1         3
4                5      Null      Null
5                6      Null      Null
6                7      Null      Null
7                6      Null      Null
8                5      Null      Null
9                4.5    9         12
10               4.2    9         12
11               3      9         12
12               3.5    9         12

So far I have filtered the rows where the value is below 5 and taken the min and max time, which gave me 1 and 12 respectively. I am wondering if there is any way to group the rows so that I get the expected results.

Code used:

from pyspark.sql import Window
from pyspark.sql.functions import col, min, max

# global (unpartitioned) window: one min/max over all filtered rows
df1 = df.filter(col('value') < 5)
df1 = (df1.withColumn('min_time', min(col('time_in_seconds')).over(Window.partitionBy()))
          .withColumn('max_time', max(col('time_in_seconds')).over(Window.partitionBy())))
df = df.join(df1, ['time_in_seconds', 'value'], 'left')
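
Since the min and max here are taken over the whole filtered DataFrame, every row below 5 ends up with the same pair of values (1 and 12) instead of one pair per contiguous stretch. A quick sanity check of that, as a sketch reusing the same df:

from pyspark.sql.functions import col, min as min_, max as max_

df.filter(col('value') < 5) \
  .agg(min_('time_in_seconds').alias('min_time'),
       max_('time_in_seconds').alias('max_time')) \
  .show()
# min_time = 1, max_time = 12 -- one global range, not one per stretch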

Solution

  • Based on the previous answers, I applied similar logic and got my answer (a short follow-up to display just the expected columns is sketched after the code):

    from pyspark.sql import Window
    from pyspark.sql.functions import col, lag, when, sum, min, max

    w = Window.orderBy('time_in_seconds')
    w1 = Window.partitionBy('cum_sum_ne')

    df1 = (df
        # True where the value is below the threshold
        .withColumn('bool_less_than_5', col('value') < 5)
        .withColumn('lag_bool_less_than_5', lag(col('bool_less_than_5')).over(w))
        # 1 wherever the flag changes, i.e. a new stretch starts
        .withColumn('ne_bool_less_than_5',
                    when(col('bool_less_than_5') == col('lag_bool_less_than_5'), 0).otherwise(1))
        # running sum of the change markers gives an id per contiguous stretch
        .withColumn('cum_sum_ne', sum(col('ne_bool_less_than_5')).over(w))
        # min/max time within each stretch
        .withColumn('min_time', min(col('time_in_seconds')).over(w1))
        .withColumn('max_time', max(col('time_in_seconds')).over(w1))
        # keep them only for rows that are actually below 5, otherwise null
        .withColumn('min_time', when(col('bool_less_than_5'), col('min_time')))
        .withColumn('max_time', when(col('bool_less_than_5'), col('max_time'))))
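
    The helper columns can then be dropped to leave just the expected output; a small follow-up sketch, assuming the column names above:

    df1.select('time_in_seconds', 'value', 'min_time', 'max_time') \
       .orderBy('time_in_seconds') \
       .show()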