sql r time-series rle

Run Length Encoding within SQL Query


I have time-series data that I am summarizing with run-length encoding plus some additional summary statistics. The problem is that the data has at least 40 million rows and I only have 16 GB of RAM. At the moment I have to run the same query on batches of the data and then append the results together. The entire process currently takes over a day. I know for loops are terrible, but running my current query on everything at once crashes RStudio...

I am hoping someone can help me rewrite my for loop as a function and then run it all using the parallel package in R. Or can my original query simply be optimized?

The for loop is querying a subset of customers at a time so I'll do my best to create a reproducible example.

library(DBI)
library(dbplyr)
library(dplyr)
library(data.table)
library(lubridate)  # provides ymd_hms(), used below

customers <- data.frame(
  customer.number = c(12345, 23456, 34567, 45678, 56789)
)

n <- 2
nr <- nrow(customers)
X <- split(customers, rep(1:ceiling(nr/n), each=n, length.out=nr))

consumption <- data.frame(
  customer.number = c(12345, 12345, 12345,
                      23456, 23456, 23456, 
                      34567, 34567, 34567, 
                      45678, 45678, 45678, 
                      56789, 56789, 56789),
  consumption = c(1,2,3,
                  0,0,1,
                  1,0,1,
                  2,2,0,
                  0,0,0),
  datetime = c("2022-01-01 00:00:00", "2022-01-01 01:00:00", "2022-01-01 02:00:00",
               "2022-01-01 00:00:00", "2022-01-01 01:00:00", "2022-01-01 02:00:00",
               "2022-01-01 00:00:00", "2022-01-01 01:00:00", "2022-01-01 02:00:00",
               "2022-01-01 00:00:00", "2022-01-01 01:00:00", "2022-01-01 02:00:00",
               "2022-01-01 00:00:00", "2022-01-01 01:00:00", "2022-01-01 02:00:00")
)

beginning <- ymd_hms("2022-01-01 00:00:00")
ending <- ymd_hms("2022-02-01 00:00:00")

for(i in 1:length(X)){
  
  rle <- tbl(connection, "consumption") %>%
    select(customer.number, consumption, datetime) %>%
    mutate(flag = if_else(consumption >= 1, TRUE, FALSE)) %>%
    filter(customer.number %in% !!X[[i]]$customer.number,
           datetime >= !!beginning, 
           datetime < !!ending) %>%
    collect() %>%
    arrange(customer.number, datetime) %>%
    group_by(customer.number, Run = data.table::rleid(customer.number, flag), flag) %>%
    summarize(Start = min(datetime), 
              End = max(datetime), 
              Length = length(Run),
              Min.Consumption = min(consumption),
              Avg.Consumption = mean(consumption),
              Max.Consumption = max(consumption)) %>%
    filter(flag != FALSE)
  
  # First batch starts a fresh result; later batches append to what is on disk
  if(names(X)[i] == "1"){
    results <- rle
  } else {
    results <- readRDS("results.rds") %>%
      rbind(rle)
  }
  
  saveRDS(results, file = "results.rds")
  
  remove(results, rle)
  
  print(names(X)[i])
  
}


Solution

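  • Here is a way to do the entire thing on the database. Note that there is no need for a loop, and the collect() statement is at the end, so only the summarized runs are pulled into R. The column is named customer_number here rather than customer.number.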

    tbl(connection, "consumption") %>%
      mutate(flag = if_else(consumption > 0, 1, 0)) %>%
      filter(datetime >= !!beginning, datetime < !!ending) %>%
      # row number within each flag value, ordered by customer and time
      group_by(flag) %>%
      window_order(customer_number, datetime) %>%
      mutate(num2 = row_number()) %>%
      ungroup() %>%
      # overall row number minus within-flag row number is constant inside a run
      mutate(Run = row_number() - num2) %>%
      select(-num2) %>%
      group_by(customer_number, Run, flag) %>%
      summarize(Start = min(datetime),
                End = max(datetime),
                Length = n(),
                Min.Consumption = min(consumption),
                Avg.Consumption = mean(consumption),
                Max.Consumption = max(consumption), .groups = "drop") %>%
      # keep only the runs where consumption was positive
      filter(flag == 1) %>%
      collect()
    

    Output:

      customer_number     Run  flag Start               End                 Length Min.Consumption Avg.Consumption Max.Consumption
      <chr>           <int64> <dbl> <dttm>              <dttm>               <int>           <int>           <int>           <int>
    1 12345                 0     1 2022-01-01 00:00:00 2022-01-01 02:00:00      3               1               2               3
    2 23456                 2     1 2022-01-01 02:00:00 2022-01-01 02:00:00      1               1               1               1
    3 34567                 2     1 2022-01-01 00:00:00 2022-01-01 00:00:00      1               1               1               1
    4 34567                 3     1 2022-01-01 02:00:00 2022-01-01 02:00:00      1               1               1               1
    5 45678                 3     1 2022-01-01 00:00:00 2022-01-01 01:00:00      2               2               2               2
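
To see why subtracting the two row numbers identifies runs, here is a minimal local sketch of the same idea in base R on the toy data from the question. It only illustrates the logic and is not a replacement for the database query. It assumes the column is renamed to customer_number and stored as character, and the names `overall`, `within_flag` and `result` are made up for this example.

```r
# Toy data from the question, with the column renamed to customer_number
consumption <- data.frame(
  customer_number = rep(c("12345", "23456", "34567", "45678", "56789"), each = 3),
  consumption = c(1, 2, 3,  0, 0, 1,  1, 0, 1,  2, 2, 0,  0, 0, 0),
  datetime = as.POSIXct(rep(c("2022-01-01 00:00:00",
                              "2022-01-01 01:00:00",
                              "2022-01-01 02:00:00"), times = 5), tz = "UTC"),
  stringsAsFactors = FALSE
)

# Same ordering as window_order(customer_number, datetime)
consumption <- consumption[order(consumption$customer_number, consumption$datetime), ]
consumption$flag <- as.integer(consumption$consumption > 0)

# Row number over the whole table, and row number within each flag value
overall <- seq_len(nrow(consumption))
within_flag <- ave(overall, consumption$flag, FUN = seq_along)

# The difference stays constant inside a run and changes whenever the flag flips
consumption$Run <- overall - within_flag

# Summarize each (customer_number, Run) group among the flag == 1 rows
runs <- consumption[consumption$flag == 1, ]
groups <- split(runs, list(runs$customer_number, runs$Run), drop = TRUE)
result <- do.call(rbind, lapply(groups, function(d) {
  data.frame(customer_number = d$customer_number[1],
             Run = d$Run[1],
             Start = min(d$datetime),
             End = max(d$datetime),
             Length = nrow(d),
             Min.Consumption = min(d$consumption),
             Avg.Consumption = mean(d$consumption),
             Max.Consumption = max(d$consumption),
             stringsAsFactors = FALSE)
}))
result <- result[order(result$customer_number, result$Start), ]
rownames(result) <- NULL

print(result)
```

Run on its own is not a unique identifier (customers 23456 and 34567 both get Run = 2 here), which is why the summary groups by customer_number and Run together.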