rpurrrpmapfurrr

furrr / purrr progressr progress bar not at all synched with the progress of the computation


I want to create a function that takes a function and applies it once for every row in a tibble with arguments stored in the correspondingly named columns of the tibble I realize that this sounds a bit odd, but I want the user facing function / functionality be simple.

The processing will take a lot of time in most cases, so I would really prefer to have progress bar functionality, and this is where I found great trouble:

This code works (with no progress bar then):

library(tibble)
library(dplyr)
library(purrr)
library(furrr)
library(tidyr)
library(wrassp)
library(progressr)

xf <- function(x,trim,na.rm,ds="ded"){
  return(x*trim*na.rm)
}

xf2 <- function(x,trim,na.rm,ds="ded"){
  return(list("a"=x,"b"=trim))
}

xf3 <- function(x,trim,na.rm,ds="ded"){
  return(data.frame("a"=x,"b"=trim))
}

mymap <- function(f,...){
  plan(multisession)
  exDF <- tribble(
    ~x, ~trim, ~na.rm, ~notarg, ~listOfFiles, ~toFile,
    0.5, 0, TRUE, 11.2, "~/Desktop/a1.wav", FALSE, 
    0.4, 0.5, TRUE, 12, "~/Desktop/a1.wav", FALSE
  )

  dotArgs <- list(...)
  dotArgsRT <- as_tibble_row(dotArgs)

  dotArgsNames <- names(dotArgs)
  
  allArgsNames <- formalArgs(f)
  
  exDF %>% 
    select(-any_of(!!dotArgsNames)) %>%
    bind_cols(dotArgsRT) %>% 
    select(any_of(allArgsNames)) %>%
    rowwise() %>% 
    mutate(temp = list(future_pmap(.,.f=f,.progress=FALSE))) %>% 
    tibble::rownames_to_column(var = "sl_rowIdx") %>% 
    mutate(out = list(map(temp,as_tibble))) %>%
    select(-temp) %>% 
    unnest(out) %>%
    unnest(out)
}

mymap(xf,c=20,a=20,ds=1) 
mymap(xf2,c=20,a=20,ds=1)
mymap(xf3,c=20,a=20,ds=1)

This code kind of works (sorry for the extended example, but I want to force a progress bar to be presented):

library(tibble)
library(dplyr)
library(purrr)
library(furrr)
library(tidyr)
library(wrassp)
library(progressr)

xf <- function(x,trim,na.rm,ds="ded"){
  return(x*trim*na.rm)
}

mymap <- function(f,...){
  plan(multicore)
  exDF <- tribble(
    ~x, ~trim, ~na.rm, ~notarg, ~listOfFiles, ~toFile,
    0.5, 0, TRUE, 11.2, "~/Desktop/a1.wav", FALSE, 
    0.4, 0.5, TRUE, 12, "~/Desktop/a1.wav", FALSE
  )
  
  exDF <- exDF %>% 
    bind_rows(exDF) %>% 
    bind_rows(exDF) %>% 
    bind_rows(exDF) %>% 
    bind_rows(exDF) 
  exDF <- exDF   %>% 
    bind_rows(exDF) %>% 
    bind_rows(exDF) %>% 
    bind_rows(exDF) %>% 
    bind_rows(exDF) %>% 
    bind_rows(exDF) %>% 
    bind_rows(exDF) %>% 
    bind_rows(exDF) %>% 
    bind_rows(exDF) %>% 
    bind_rows(exDF) %>% 
    bind_rows(exDF) %>% 
    bind_rows(exDF)
  
  dotArgs <- list(...)
  dotArgsRT <- as_tibble_row(dotArgs)
  
  dotArgsNames <- names(dotArgs)
  
  allArgsNames <- formalArgs(f)

  p <- progressr::progressor(steps = nrow(exDF))

  pWrap <- function(fun=f,...){
    
    iDA <- list(...)
    p(message="processing")
    #Sys.sleep(0.1)
    do.call(fun,iDA)
    
  }

 out <- exDF %>% 
    select(-any_of(!!dotArgsNames)) %>%
    bind_cols(dotArgsRT) %>% 
    select(any_of(allArgsNames)) %>%
    rowwise() %>% 
    mutate(temp = list(future_pmap(.,.f=pWrap))) %>% 
    tibble::rownames_to_column(var = "sl_rowIdx") %>% 
    mutate(out = list(map(temp,as_tibble))) %>%
    select(-temp) %>% 
    unnest(out) %>%
    unnest(out)

    return(out)


}

mymap(xf,c=20,a=20,ds=1)

But the progress bar is not displayed if I call the function like that, but only if I call it this way:

with_progress(mymap(xf,c=20,a=20,ds=1))

And, the progress bar appears very quickly, disappears and then the function processes the data for a time and then returns the data.

So the progress bar is not really informative of the overall progression of the function to the user.

I guess it has to do with the dplyr calls being evaluated at the point where a return value is expected ?

But, how do I then force the progress bar to be in sync with that process?

I have tried using just pmap rather than future_pmap to solve the potential issue of the value not being resolved yet, but it seems to not be the issue.

I appreciate all the help I can get on this.


Solution

  • library(tidyverse)
    library(furrr)
    library(progressr)
    
    xf <- function(x,trim,na.rm,ds="ded"){
      Sys.sleep(sample.int(20,1)/10)
      return(x*trim*na.rm)
    }
    
    mymap <- function(f,...){
      handlers(global = TRUE)
      
      plan(multisession)
      exDF <- tribble(
        ~x, ~trim, ~na.rm, ~notarg, ~listOfFiles, ~toFile,
        0.5, 0, TRUE, 11.2, "~/Desktop/a1.wav", FALSE, 
        0.4, 0.5, TRUE, 12, "~/Desktop/a1.wav", FALSE,
        0.5, 0, TRUE, 11.2, "~/Desktop/a1.wav", FALSE,
        0.4, 0.5, TRUE, 12, "~/Desktop/a1.wav", FALSE,
        0.5, 0, TRUE, 11.2, "~/Desktop/a1.wav", FALSE,
        0.4, 0.5, TRUE, 12, "~/Desktop/a1.wav", FALSE,
        0.5, 0, TRUE, 11.2, "~/Desktop/a1.wav", FALSE,
        0.4, 0.5, TRUE, 12, "~/Desktop/a1.wav", FALSE,
        0.5, 0, TRUE, 11.2, "~/Desktop/a1.wav", FALSE,
        0.4, 0.5, TRUE, 12, "~/Desktop/a1.wav", FALSE,
        0.5, 0, TRUE, 11.2, "~/Desktop/a1.wav", FALSE,
        0.4, 0.5, TRUE, 12, "~/Desktop/a1.wav", FALSE,
        0.5, 0, TRUE, 11.2, "~/Desktop/a1.wav", FALSE,
        0.4, 0.5, TRUE, 12, "~/Desktop/a1.wav", FALSE,
        0.5, 0, TRUE, 11.2, "~/Desktop/a1.wav", FALSE,
        0.4, 0.5, TRUE, 12, "~/Desktop/a1.wav", FALSE
      )
      
      dotArgs <- list(...)
      dotArgsRT <- as_tibble_row(dotArgs)
      
      dotArgsNames <- names(dotArgs)
      
      allArgsNames <- formalArgs(f)
      
       
      setupdf <-   exDF %>% 
        select(-any_of(!!dotArgsNames)) %>%
        bind_cols(dotArgsRT) %>% 
        select(any_of(c(allArgsNames,"id"))) %>%
        rowwise() 
      
      num_to_do <- nrow(setupdf)
      cat("\nWill be doing ",num_to_do,"\n")
      p <- progressor(num_to_do)
    
      
      f2 <- function(...){
        result <- f(...)
        p()
        result
      }
        part1 <- setupdf%>% 
        mutate(temp = list(pmap(cur_data(),.f=f2))) 
      
        print("calculationa are complete ")
        
        part1 %>%
        tibble::rownames_to_column(var = "sl_rowIdx") %>% 
        mutate(out = list(map(temp,as_tibble))) %>%
        select(-temp) %>% 
        unnest(out) %>%
        unnest(out)
    }
    
    
    
    mymap(xf,c=20,a=20,ds=1)