apache-spark apache-spark-sql

How to find longest sequence of consecutive dates?


I have a table of visit times stored as Unix timestamps, like this:

ID, time
1, 1493596800
1, 1493596900
1, 1493432800
2, 1493596800
2, 1493596850
2, 1493432800

I use Spark SQL and I need the longest sequence of consecutive dates for each ID, like this:

ID, longest_seq (days)
1, 2
2, 5
3, 1

I tried to adapt the answer to "Detect consecutive dates ranges using SQL" to my case, but I did not get the result I expected.

 SELECT ID, MIN (d), MAX(d)
    FROM (
      SELECT ID, cast(from_utc_timestamp(cast(time as timestamp), 'CEST') as date) AS d, 
                ROW_NUMBER() OVER(
         PARTITION BY ID ORDER BY cast(from_utc_timestamp(cast(time as timestamp), 'CEST') 
                                                           as date)) rn
      FROM purchase
      where ID is not null
      GROUP BY ID, cast(from_utc_timestamp(cast(time as timestamp), 'CEST') as date) 
    )
    GROUP BY ID, rn
    ORDER BY ID

If someone has a clue about how to fix this query, or what is wrong with it, I would appreciate the help. Thanks.

[EDIT] A more explicit input/output:

ID, time
1, 1
1, 2
1, 3
2, 1
2, 3
2, 4
2, 5
2, 10
2, 11
3, 1
3, 4
3, 9
3, 11

The result would be:

ID, MaxSeq (in days)
1,3
2,3
3,1

All the visits are stored as timestamps, but I need consecutive days, so several visits on the same day count only once.


Solution

  • That's a case for my beloved window aggregate functions!

    I think the following example could help you out (at least to get started).

    The following is the dataset I use. I translated your time values (longs) into plain day numbers, which avoids dealing with timestamps in Spark SQL and should keep the solution easier to follow.
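    For reference, one way the day numbers could be derived from the epoch seconds in the question is sketched below. It assumes UTC days (the question works in a local time zone, which would need a timezone-aware conversion instead), and the names raw, byDay and day are made up for this illustration. In spark-shell the session setup lines are unnecessary, since spark already exists.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, floor}

// Only needed outside spark-shell; getOrCreate() reuses an existing session.
val spark = SparkSession.builder().master("local[*]").appName("day-numbers").getOrCreate()
import spark.implicits._

// A few rows from the question: (ID, epoch seconds).
val raw = Seq(
  (1, 1493596800L),
  (1, 1493596900L),
  (1, 1493432800L)
).toDF("ID", "time")

// Assumption: UTC days. 86400 seconds per day, so this is the number of
// whole days since 1970-01-01 UTC.
val byDay = raw.withColumn("day", floor(col("time") / 86400))
```

    The first two timestamps fall on the same day number and the third falls two days earlier, which is the property the rest of the answer relies on.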

    In the visits dataset below, the time column holds a day number, so values that differ by 1 are consecutive days.

    scala> visits.show
    +---+----+
    | ID|time|
    +---+----+
    |  1|   1|
    |  1|   1|
    |  1|   2|
    |  1|   3|
    |  1|   3|
    |  1|   3|
    |  2|   1|
    |  3|   1|
    |  3|   2|
    |  3|   2|
    +---+----+
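    The dataset above could be built like this. It is a minimal sketch: the session setup is only needed outside spark-shell, where spark and its implicits are already available.

```scala
import org.apache.spark.sql.SparkSession

// Only needed outside spark-shell; getOrCreate() reuses an existing session.
val spark = SparkSession.builder().master("local[*]").appName("visits").getOrCreate()
import spark.implicits._

// One row per visit: (ID, day number).
// Repeated rows mean several visits on the same day.
val visits = Seq(
  (1, 1), (1, 1), (1, 2), (1, 3), (1, 3), (1, 3),
  (2, 1),
  (3, 1), (3, 2), (3, 2)
).toDF("ID", "time")
```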
    

    Let's define a window specification that groups the rows by id and orders them by time.

    import org.apache.spark.sql.expressions.Window
    // One window per id, rows ordered by day number
    val idsSortedByTime = Window.
      partitionBy("id").
      orderBy("time")
    

    With that you rank the rows and count rows with the same rank.

    val answer = visits.
      select($"id", $"time", rank over idsSortedByTime as "rank").
      groupBy("id", "time", "rank").
      agg(count("*") as "count")
    scala> answer.show
    +---+----+----+-----+
    | id|time|rank|count|
    +---+----+----+-----+
    |  1|   1|   1|    2|
    |  1|   2|   3|    1|
    |  1|   3|   4|    3|
    |  3|   1|   1|    1|
    |  3|   2|   2|    2|
    |  2|   1|   1|    1|
    +---+----+----+-----+
    

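    That's very close to a solution: you now have one row per id and day, however many visits happened that day. What remains is to find the runs of consecutive days and take the longest one per id.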
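    One possible way to finish, sketched on the [EDIT] example from the question: keep each (ID, day) once, number the days per ID with row_number, and subtract that number from the day. Inside a run of consecutive days the difference stays constant, so it can serve as a group key. The column names rn, run, len and the value names are made up for this sketch, and it assumes time is already a day number.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, max, row_number}

// Only needed outside spark-shell; getOrCreate() reuses an existing session.
val spark = SparkSession.builder().master("local[*]").appName("longest-run").getOrCreate()
import spark.implicits._

// The [EDIT] example from the question: (ID, day number).
val visits = Seq(
  (1, 1), (1, 2), (1, 3),
  (2, 1), (2, 3), (2, 4), (2, 5), (2, 10), (2, 11),
  (3, 1), (3, 4), (3, 9), (3, 11)
).toDF("ID", "time")

val byIdOrderedByDay = Window.partitionBy("ID").orderBy("time")

val longest = visits
  .distinct()                                      // count each (ID, day) once
  .withColumn("rn", row_number().over(byIdOrderedByDay))
  .withColumn("run", col("time") - col("rn"))      // constant within a run of consecutive days
  .groupBy("ID", "run")
  .agg(count("*").as("len"))                       // length of each run
  .groupBy("ID")
  .agg(max("len").as("MaxSeq"))                    // longest run per ID
  .orderBy("ID")

longest.show()
```

    On this input the result is 3 for ID 1, 3 for ID 2 and 1 for ID 3, which matches the expected output in the question. It also shows what goes wrong in the query from the question: that query groups by rn itself, which is unique per ID and day, so every group contains a single day. Grouping by the day minus rn is what collapses a whole run into one group.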