sql apache-spark pyspark apache-spark-sql

Consecutive User Details in Simple Approach


I have a table containing the fields: ip, datetime, year, month, day, country, region, seen_time. A single IP can have multiple records within the same hour. I need to identify IPs that were continuously active for at least 4 hours.

The desired output should include: ip, datetime, year, month, day, hour, country, region, seen_time, along with an additional flag, is_active_min_4hr, indicating whether the IP was active for a continuous period of 4 or more hours.

I came up with the approach below: steps 1-4 identify the IPs with at least 4 consecutive hours of activity. However, it involves multiple steps. Is there a way to do this in a single query, or in some simpler way?

  1. Select the distinct 'ip' and 'hour' pairs from the table:

    CREATE OR REPLACE TEMP VIEW step_1 AS
    SELECT DISTINCT ip, hour
    FROM input_table

  2. Compute a row number per ip, ordered by hour:

    CREATE OR REPLACE TEMP VIEW step_2 AS
    SELECT ip, hour,
           ROW_NUMBER() OVER (PARTITION BY ip ORDER BY hour) AS rn
    FROM step_1

  3. Compute the difference (hour - rn) AS grp; consecutive hours share the same grp. The hour column is carried along because step 4 needs it:

    CREATE OR REPLACE TEMP VIEW step_3 AS
    SELECT ip, hour, (hour - rn) AS grp
    FROM step_2

  4. Keep the groups that contain at least 4 hours:

    CREATE OR REPLACE TEMP VIEW step_4 AS
    SELECT ip, MIN(hour) AS start_hour, MAX(hour) AS end_hour
    FROM step_3
    GROUP BY ip, grp
    HAVING COUNT(*) >= 4

  5. Join the dataset from step_4 with the original input_table and pull the remaining data
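
The steps above can probably be folded into one statement with CTEs. Below is a minimal sketch, not a definitive implementation. It runs the SQL through Python's built-in sqlite3 only so that it is self-contained (SQLite 3.25+ is assumed for window functions; ROW_NUMBER with PARTITION BY is also available in Spark SQL). The CTE names hours, runs and ranked, the column pos_in_run and the helper flag_rows are made up for illustration. Instead of HAVING COUNT(*) >= 4 it ranks each hour inside its run, because the required output flags only the 4th and later hours of a run. It also assumes that all rows fall on a single day, as in the sample.

```python
import sqlite3

# Sample rows from the question, reduced to (ip, hour, seen_time) for brevity.
ROWS = [
    ("192.168.1.1", 1, 15), ("192.168.1.1", 2, 10), ("192.168.1.1", 3, 20),
    ("192.168.1.1", 4, 25), ("192.168.1.1", 7, 5),
    ("10.0.0.2", 1, 12), ("10.0.0.2", 3, 14), ("10.0.0.2", 4, 8),
    ("172.16.0.5", 10, 6), ("172.16.0.5", 11, 9), ("172.16.0.5", 12, 15),
    ("172.16.0.5", 13, 11), ("172.16.0.5", 14, 10),
]

QUERY = """
WITH hours AS (                -- step 1: one row per ip and hour
    SELECT DISTINCT ip, hour
    FROM input_table
),
runs AS (                      -- steps 2-3: hour - row_number is constant within a run
    SELECT ip, hour,
           hour - ROW_NUMBER() OVER (PARTITION BY ip ORDER BY hour) AS grp
    FROM hours
),
ranked AS (                    -- position of each hour inside its run (1, 2, 3, ...)
    SELECT ip, hour,
           ROW_NUMBER() OVER (PARTITION BY ip, grp ORDER BY hour) AS pos_in_run
    FROM runs
)
SELECT t.ip, t.hour, t.seen_time,   -- step 5: join back to the detail rows
       CASE WHEN r.pos_in_run >= 4 THEN 'TRUE' ELSE 'FALSE' END AS is_active_min_4hr
FROM input_table AS t
JOIN ranked AS r ON r.ip = t.ip AND r.hour = t.hour
ORDER BY t.ip, t.hour
"""


def flag_rows(rows):
    """Load rows into an in-memory table and return them with the flag appended."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE input_table (ip TEXT, hour INTEGER, seen_time INTEGER)")
    conn.executemany("INSERT INTO input_table VALUES (?, ?, ?)", rows)
    result = conn.execute(QUERY).fetchall()
    conn.close()
    return result


if __name__ == "__main__":
    for row in flag_rows(ROWS):
        print(row)
```

For data that spans several days, the date would have to be added to the PARTITION BY clauses and to the join keys.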

Input Data:

%sql
WITH input_table AS (
  SELECT * FROM VALUES
    ('192.168.1.1',  TIMESTAMP('2025-05-26 01:15:00'), 2025, 5, 26, 'US', 'California', 15, 1),
    ('192.168.1.1',  TIMESTAMP('2025-05-26 02:10:00'), 2025, 5, 26, 'US', 'California', 10, 2),
    ('192.168.1.1',  TIMESTAMP('2025-05-26 03:20:00'), 2025, 5, 26, 'US', 'California', 20, 3),
    ('192.168.1.1',  TIMESTAMP('2025-05-26 04:25:00'), 2025, 5, 26, 'US', 'California', 25, 4),
    ('192.168.1.1',  TIMESTAMP('2025-05-26 07:05:00'), 2025, 5, 26, 'US', 'California', 5, 7),
    ('10.0.0.2',     TIMESTAMP('2025-05-26 01:00:00'), 2025, 5, 26, 'US', 'Texas',      12, 1),
    ('10.0.0.2',     TIMESTAMP('2025-05-26 03:00:00'), 2025, 5, 26, 'US', 'Texas',      14, 3),
    ('10.0.0.2',     TIMESTAMP('2025-05-26 04:00:00'), 2025, 5, 26, 'US', 'Texas',      8, 4),
    ('172.16.0.5',   TIMESTAMP('2025-05-26 10:00:00'), 2025, 5, 26, 'UA', 'Kyiv',  6, 10),
    ('172.16.0.5',   TIMESTAMP('2025-05-26 11:00:00'), 2025, 5, 26, 'UA', 'Kyiv',  9, 11),
    ('172.16.0.5',   TIMESTAMP('2025-05-26 12:00:00'), 2025, 5, 26, 'UA', 'Kyiv',  15, 12),
    ('172.16.0.5',   TIMESTAMP('2025-05-26 13:00:00'), 2025, 5, 26, 'UA', 'Kyiv',  11, 13),
    ('172.16.0.5',   TIMESTAMP('2025-05-26 14:00:00'), 2025, 5, 26, 'UA', 'Kyiv',  10, 14)
  AS input_table(ip, datetime, year, month, day, country, region, seen_time, hour)
)
SELECT * FROM input_table;

Required output:

The same data as above with an additional field 'is_active_min_4hr', which is TRUE for the 4th, 12th and 13th records:

ip           year  month  day  country  region      seen_time  hour  is_active_min_4hr
192.168.1.1  2025  5      26   US       California  15         1     FALSE
192.168.1.1  2025  5      26   US       California  10         2     FALSE
192.168.1.1  2025  5      26   US       California  20         3     FALSE
192.168.1.1  2025  5      26   US       California  25         4     TRUE
192.168.1.1  2025  5      26   US       California  5          7     FALSE
10.0.0.2     2025  5      26   US       Texas       12         1     FALSE
10.0.0.2     2025  5      26   US       Texas       14         3     FALSE
10.0.0.2     2025  5      26   US       Texas       8          4     FALSE
172.16.0.5   2025  5      26   UA       Kyiv        6          10    FALSE
172.16.0.5   2025  5      26   UA       Kyiv        9          11    FALSE
172.16.0.5   2025  5      26   UA       Kyiv        15         12    FALSE
172.16.0.5   2025  5      26   UA       Kyiv        11         13    TRUE
172.16.0.5   2025  5      26   UA       Kyiv        10         14    TRUE

Solution

  • I don't have apache-spark-sql, but this (PostgreSQL) should do the job with minimal adjustments. Use CASE expressions with analytic functions to mark the consecutive hours and to flag the rows where there are 4 or more of them.

    WITH     --  S a m p l e    D a t a :
      input_table (ip, datetime, year, month, day, country, region, seen_time, hour) AS 
    ( SELECT '192.168.1.1',  To_Date('2025-05-26 01:15:00', 'yyyy-mm-dd hh24:mi:ss'), 2025, 5, 26, 'US', 'California', 15, 1 Union All
      SELECT '192.168.1.1',  To_Date('2025-05-26 02:10:00', 'yyyy-mm-dd hh24:mi:ss'), 2025, 5, 26, 'US', 'California', 10, 2 Union All
      SELECT '192.168.1.1',  To_Date('2025-05-26 03:20:00', 'yyyy-mm-dd hh24:mi:ss'), 2025, 5, 26, 'US', 'California', 20, 3 Union All
      SELECT '192.168.1.1',  To_Date('2025-05-26 04:25:00', 'yyyy-mm-dd hh24:mi:ss'), 2025, 5, 26, 'US', 'California', 25, 4 Union All
      SELECT '192.168.1.1',  To_Date('2025-05-26 07:05:00', 'yyyy-mm-dd hh24:mi:ss'), 2025, 5, 26, 'US', 'California', 5, 7 Union All
      SELECT '10.0.0.2',     To_Date('2025-05-26 01:00:00', 'yyyy-mm-dd hh24:mi:ss'), 2025, 5, 26, 'US', 'Texas',      12, 1 Union All
      SELECT '10.0.0.2',     To_Date('2025-05-26 03:00:00', 'yyyy-mm-dd hh24:mi:ss'), 2025, 5, 26, 'US', 'Texas',      14, 3 Union All
      SELECT '10.0.0.2',     To_Date('2025-05-26 04:00:00', 'yyyy-mm-dd hh24:mi:ss'), 2025, 5, 26, 'US', 'Texas',      8, 4 Union All
      SELECT '172.16.0.5',   To_Date('2025-05-26 10:00:00', 'yyyy-mm-dd hh24:mi:ss'), 2025, 5, 26, 'UA', 'Kyiv',  6, 10 Union All
      SELECT '172.16.0.5',   To_Date('2025-05-26 11:00:00', 'yyyy-mm-dd hh24:mi:ss'), 2025, 5, 26, 'UA', 'Kyiv',  9, 11 Union All
      SELECT '172.16.0.5',   To_Date('2025-05-26 12:00:00', 'yyyy-mm-dd hh24:mi:ss'), 2025, 5, 26, 'UA', 'Kyiv',  15, 12 Union All
      SELECT '172.16.0.5',   To_Date('2025-05-26 13:00:00', 'yyyy-mm-dd hh24:mi:ss'), 2025, 5, 26, 'UA', 'Kyiv',  11, 13 Union All
      SELECT '172.16.0.5',   To_Date('2025-05-26 14:00:00', 'yyyy-mm-dd hh24:mi:ss'), 2025, 5, 26, 'UA', 'Kyiv',  10, 14
    ), 
    

    ... create a CTE (grid) that flags rows having consecutive hours ...

      grid AS
    ( SELECT   ip, datetime, year, month, day, country, region, seen_time, hour,
             Coalesce( Case When LAG(hour) Over(Partition By ip, datetime Order By datetime, hour) = hour - 1
                               OR Min(hour) Over(Partition By ip, datetime) = hour 
                              Then 'Y' 
                       End, Cast(hour as Varchar(2))
                     ) as consequtive_hour
      FROM input_table
      ORDER BY ip, datetime, hour
    )
    
    --      M a i n    S Q L :
    Select ip, datetime, year, month, day, country, region, seen_time, hour,  
           Case When Count(consequtive_hour) 
                           Over(Partition By ip, datetime, consequtive_hour
                                Order By datetime, hour
                                Rows Between Unbounded Preceding And Current Row) >= 4
                    AND consequtive_hour = 'Y'
                Then 'TRUE'
           Else 'FALSE'
           End as is_active_min_4hr
    From grid 
    Order By ip, datetime, hour
    
    ip           datetime    year  month  day  country  region      seen_time  hour  is_active_min_4hr
    10.0.0.2     2025-05-26  2025  5      26   US       Texas       12         1     FALSE
    10.0.0.2     2025-05-26  2025  5      26   US       Texas       14         3     FALSE
    10.0.0.2     2025-05-26  2025  5      26   US       Texas       8          4     FALSE
    172.16.0.5   2025-05-26  2025  5      26   UA       Kyiv        6          10    FALSE
    172.16.0.5   2025-05-26  2025  5      26   UA       Kyiv        9          11    FALSE
    172.16.0.5   2025-05-26  2025  5      26   UA       Kyiv        15         12    FALSE
    172.16.0.5   2025-05-26  2025  5      26   UA       Kyiv        11         13    TRUE
    172.16.0.5   2025-05-26  2025  5      26   UA       Kyiv        10         14    TRUE
    192.168.1.1  2025-05-26  2025  5      26   US       California  15         1     FALSE
    192.168.1.1  2025-05-26  2025  5      26   US       California  10         2     FALSE
    192.168.1.1  2025-05-26  2025  5      26   US       California  20         3     FALSE
    192.168.1.1  2025-05-26  2025  5      26   US       California  25         4     TRUE
    192.168.1.1  2025-05-26  2025  5      26   US       California  5          7     FALSE

    fiddle

    ADDITION 1 (comment's fiddle) - handling second connection within the same hour
    Change the grid's LAG() condition from " = hour - 1" to " IN(hour, hour - 1)".

      grid AS
    ( SELECT   ip, datetime, year, month, day, country, region, seen_time, hour,
             Coalesce( Case When LAG(hour) Over(Partition By ip, datetime Order By datetime, hour) IN(hour, hour - 1)
                               OR Min(hour) Over(Partition By ip, datetime) = hour 
                              Then 'Y' 
                       End, Cast(hour as Varchar(2))
                     ) as consequtive_hour
      FROM input_table
      ORDER BY ip, datetime, hour
    )
    

    ... which, with the same Main SQL as above and the sample data from the fiddle in your comment, results in:

    ip           datetime    year  month  day  country  region      seen_time  hour  is_active_min_4hr
    192.168.1.1  2025-05-26  2025  5      26   US       California  15         1     FALSE
    192.168.1.1  2025-05-26  2025  5      26   US       California  10         2     FALSE
    192.168.1.1  2025-05-26  2025  5      26   US       California  20         3     FALSE
    192.168.1.1  2025-05-26  2025  5      26   US       Utah        25         4     TRUE
    192.168.1.1  2025-05-26  2025  5      26   US       California  25         4     TRUE
    192.168.1.1  2025-05-26  2025  5      26   US       California  5          7     FALSE

    fiddle
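
As far as I can tell, the running count in the Main SQL is taken over all 'Y' rows of an ip and date, so it does not restart after a gap, and with the IN(hour, hour - 1) change every repeated row in the same hour adds to the count as well. A possible variant that avoids both, sketched below under stated assumptions, uses LAG() to mark where a run starts, a running SUM() to number the runs, and the distance from the first hour of the run as the position. It runs through Python's sqlite3 only to be self-contained (SQLite 3.25+ assumed). The names marked, numbered, is_start, run_id and flag_with_duplicates are illustrative, and the IPs 10.9.9.9 and 10.9.9.8 are hypothetical test rows that are not in the original data. It partitions by ip only, which assumes a single day. Note also that on a table where datetime is a full timestamp rather than a date, partitioning by it directly would put almost every row in its own partition.

```python
import sqlite3

ROWS = [
    # Sample data from the comment's fiddle: two records in hour 4.
    ("192.168.1.1", 1, "California"), ("192.168.1.1", 2, "California"),
    ("192.168.1.1", 3, "California"), ("192.168.1.1", 4, "Utah"),
    ("192.168.1.1", 4, "California"), ("192.168.1.1", 7, "California"),
    # Hypothetical: two separate runs of three hours each, never four in a row.
    ("10.9.9.9", 1, "X"), ("10.9.9.9", 2, "X"), ("10.9.9.9", 3, "X"),
    ("10.9.9.9", 5, "X"), ("10.9.9.9", 6, "X"), ("10.9.9.9", 7, "X"),
    # Hypothetical: four records but only three distinct hours.
    ("10.9.9.8", 1, "X"), ("10.9.9.8", 1, "X"), ("10.9.9.8", 2, "X"), ("10.9.9.8", 3, "X"),
]

QUERY = """
WITH marked AS (
    SELECT ip, hour, region,
           -- 0 when the row repeats or directly follows the previous hour, else 1
           CASE WHEN LAG(hour) OVER (PARTITION BY ip ORDER BY hour) IN (hour, hour - 1)
                THEN 0 ELSE 1 END AS is_start
    FROM input_table
),
numbered AS (
    SELECT ip, hour, region,
           -- default frame includes peer rows, so records in the same hour share a run_id
           SUM(is_start) OVER (PARTITION BY ip ORDER BY hour) AS run_id
    FROM marked
)
SELECT ip, hour, region,
       -- position in the run counts distinct hours, not records
       CASE WHEN hour - MIN(hour) OVER (PARTITION BY ip, run_id) + 1 >= 4
            THEN 'TRUE' ELSE 'FALSE' END AS is_active_min_4hr
FROM numbered
ORDER BY ip, hour, region
"""


def flag_with_duplicates(rows):
    """Return (ip, hour, region, flag) tuples for the given rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE input_table (ip TEXT, hour INTEGER, region TEXT)")
    conn.executemany("INSERT INTO input_table VALUES (?, ?, ?)", rows)
    result = conn.execute(QUERY).fetchall()
    conn.close()
    return result


if __name__ == "__main__":
    for row in flag_with_duplicates(ROWS):
        print(row)
```

With these rows only the two hour-4 records of 192.168.1.1 come out as TRUE; the two hypothetical IPs stay FALSE throughout.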

    ADDITION 2 - if you also want to flag as TRUE every later, non-consecutive row once 4 consecutive hours have been reached
    In this case the grid should stay the same as in the original answer, but the Main SQL should be:

    --      M a i n    S Q L :
    SELECT   ip, datetime, year, month, day, country, region, seen_time, hour,
             Case When  ( rnc >= 4 AND consequtive_hour = 'Y' )
                      OR 
                        ( rnc < 4 AND Max(rnc) Over(Partition By ip, datetime
                                                    Order By datetime, hour
                                                     Rows Between Unbounded Preceding And Current Row) >= 4 )
                Then 'TRUE'
            Else 'FALSE'
            End as is_active_min_4hr
    FROM (Select ip, datetime, year, month, day, country, region, seen_time, hour,  
                 consequtive_hour, 
                 Count(consequtive_hour) 
                           Over(Partition By ip, datetime, consequtive_hour
                                Order By datetime, hour
                                Rows Between Unbounded Preceding And Current Row) as rnc
          From grid 
          Order By ip, datetime, hour
        ) AS counted
     ORDER BY ip, datetime, hour
    
    ip           datetime    year  month  day  country  region      seen_time  hour  is_active_min_4hr
    10.0.0.2     2025-05-26  2025  5      26   US       Texas       12         1     FALSE
    10.0.0.2     2025-05-26  2025  5      26   US       Texas       14         3     FALSE
    10.0.0.2     2025-05-26  2025  5      26   US       Texas       8          4     FALSE
    172.16.0.5   2025-05-26  2025  5      26   UA       Kyiv        6          10    FALSE
    172.16.0.5   2025-05-26  2025  5      26   UA       Kyiv        9          11    FALSE
    172.16.0.5   2025-05-26  2025  5      26   UA       Kyiv        15         12    FALSE
    172.16.0.5   2025-05-26  2025  5      26   UA       Kyiv        11         13    TRUE
    172.16.0.5   2025-05-26  2025  5      26   UA       Kyiv        10         14    TRUE
    192.168.1.1  2025-05-26  2025  5      26   US       California  15         1     FALSE
    192.168.1.1  2025-05-26  2025  5      26   US       California  10         2     FALSE
    192.168.1.1  2025-05-26  2025  5      26   US       California  20         3     FALSE
    192.168.1.1  2025-05-26  2025  5      26   US       California  25         4     TRUE
    192.168.1.1  2025-05-26  2025  5      26   US       California  5          7     TRUE

    fiddle
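
The "sticky" behaviour of ADDITION 2 can also be sketched with a running MAX() over the position inside each run: once any earlier hour of an ip has reached position 4, every later row is flagged. This is only an illustration under assumptions. It uses the ROW_NUMBER difference from the question rather than the grid CTE, runs through Python's sqlite3 to stay self-contained (SQLite 3.25+ assumed), treats the data as a single day, and the names ranked, sticky, best_so_far and flag_sticky are invented here.

```python
import sqlite3

# Sample rows from the question, reduced to (ip, hour).
ROWS = [
    ("192.168.1.1", 1), ("192.168.1.1", 2), ("192.168.1.1", 3),
    ("192.168.1.1", 4), ("192.168.1.1", 7),
    ("10.0.0.2", 1), ("10.0.0.2", 3), ("10.0.0.2", 4),
    ("172.16.0.5", 10), ("172.16.0.5", 11), ("172.16.0.5", 12),
    ("172.16.0.5", 13), ("172.16.0.5", 14),
]

QUERY = """
WITH hours AS (
    SELECT DISTINCT ip, hour FROM input_table
),
runs AS (                      -- hour - row_number is constant within a run
    SELECT ip, hour,
           hour - ROW_NUMBER() OVER (PARTITION BY ip ORDER BY hour) AS grp
    FROM hours
),
ranked AS (                    -- position of each hour inside its run
    SELECT ip, hour,
           ROW_NUMBER() OVER (PARTITION BY ip, grp ORDER BY hour) AS pos_in_run
    FROM runs
),
sticky AS (                    -- longest position reached so far for this ip
    SELECT ip, hour,
           MAX(pos_in_run) OVER (PARTITION BY ip ORDER BY hour) AS best_so_far
    FROM ranked
)
SELECT t.ip, t.hour,
       CASE WHEN s.best_so_far >= 4 THEN 'TRUE' ELSE 'FALSE' END AS is_active_min_4hr
FROM input_table AS t
JOIN sticky AS s ON s.ip = t.ip AND s.hour = t.hour
ORDER BY t.ip, t.hour
"""


def flag_sticky(rows):
    """Return (ip, hour, flag) tuples; the flag stays TRUE after the first 4-hour run."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE input_table (ip TEXT, hour INTEGER)")
    conn.executemany("INSERT INTO input_table VALUES (?, ?)", rows)
    result = conn.execute(QUERY).fetchall()
    conn.close()
    return result


if __name__ == "__main__":
    for row in flag_sticky(ROWS):
        print(row)
```

On the sample data this flags hours 4 and 7 of 192.168.1.1 and hours 13 and 14 of 172.16.0.5, the same rows that are TRUE in the ADDITION 2 result above.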