r cbind duckdb

Add new data by columns into duckdb out of memory


I have incoming data that I want to store on disk in a database of some kind. The data looks something like this:

incoming_data <- function(ncol = 5){
  # simulate one batch: random integers reshaped into `ncol` columns with random column names
  dat <- sample(1:10, 100, replace = TRUE) |> matrix(ncol = ncol) |> as.data.frame()
  random_names <- sapply(1:ncol(dat), \(x) paste0(sample(letters, 1), sample(1:100, 1)))
  colnames(dat) <- random_names
  dat
}


incoming_data()

This incoming_data is just an example. In reality, a single incoming data set will have around 5k rows and about 50k columns, and the entire final file will be about 200-400 gigabytes.

My question is: how do I add new data as columns to the database without loading the whole file into RAM?

# adjust the path for your machine
path <- "D:\\R_scripts\\new\\duckdb\\data\\DB.duckdb"
library(duckdb)
con <- dbConnect(duckdb(), dbdir = path, read_only = FALSE)
# write one piece of data to the DB
dbWriteTable(con, "my_dat", incoming_data())


#### how do I do something like this (pseudocode)? ####
my_dat <- cbind("my_dat", incoming_data())

Solution

  • Assuming that the number of rows stays the same across incoming batches of data, you can use DuckDB's positional join to achieve what you want:

    library(duckdb)
    library(DBI)
    library(purrr)
    
    incoming_data <- function(ncol = 5){
      dat <- sample(1:10, 100, replace = TRUE) |> matrix(ncol = ncol) |> as.data.frame()
      random_names <- sapply(1:ncol(dat), \(x) paste0(sample(letters, 1), sample(1:100, 1)))
      colnames(dat) <- random_names
      dat
    }
    
    # Generate 5 example batches of data (each call produces a new random batch)
    data_to_join <- replicate(5, incoming_data(), simplify = FALSE)
    
    # let's create some files with data
    tmp_dir <- tempdir()
    data_dir <- paste0(tmp_dir, "/data")
    dir.create(data_dir)
    
    walk2(
      data_to_join,
      seq_along(data_to_join),
      \(x, i) {
        file_out <- paste0(data_dir, "/", i, ".csv")
        write.csv(x, file_out, row.names = FALSE, quote = FALSE)
      }
    )
    
    csv_files <- list.files(data_dir, full.names = TRUE)
    
    con <- dbConnect(duckdb(), read_only = FALSE)
    
    # write first columns to duckdb instance
    duckdb_read_csv(con, "my_dat", csv_files[1])
    
    # Iteratively add new columns by joining the existing table positionally with each new file
    walk(csv_files[-1],
         \(file) {
           create_query <- sprintf(
             "CREATE OR REPLACE TABLE my_dat AS SELECT * FROM my_dat POSITIONAL JOIN read_csv_auto('%s');",
             file
           )
           dbExecute(con, create_query)
         }
    )
    
    dbReadTable(con, "my_dat")
    
    
    # Disconnect from connection
    dbDisconnect(con, shutdown = TRUE)
    
    

    For each new incoming batch of data you can run the CREATE OR REPLACE statement from above to bind the new columns to the existing data; a small helper wrapping that step is sketched below.
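
    A minimal sketch of such a helper, assuming con is an open connection as above (the function name append_columns_from_csv is illustrative, not part of duckdb):

    # Hypothetical helper: positionally join the columns of a new CSV file
    # onto an existing table, rewriting the table in place.
    append_columns_from_csv <- function(con, table, file) {
      query <- sprintf(
        "CREATE OR REPLACE TABLE %s AS SELECT * FROM %s POSITIONAL JOIN read_csv_auto('%s');",
        table, table, file
      )
      dbExecute(con, query)
    }
    
    # e.g. append_columns_from_csv(con, "my_dat", "new_batch.csv")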

    You can also adapt it to update the table with R objects:

    # Generate 5 example batches of data (each call produces a new random batch)
    data_to_join <- replicate(5, incoming_data(), simplify = FALSE)
    
    con <- dbConnect(duckdb(), read_only = FALSE)
    
    # write first iteration
    dbWriteTable(con, "my_dat", data_to_join[[1]])
    
    # Iteratively add new columns by joining the existing table positionally with each new batch
    walk(
      data_to_join[-1],
      \(x) {
        dbWriteTable(con, "tmp_tbl", x, overwrite = TRUE, temporary = TRUE)
        dbExecute(
          con,
          "CREATE OR REPLACE TABLE my_dat AS SELECT * FROM my_dat POSITIONAL JOIN tmp_tbl;"
        )
        dbRemoveTable(con, "tmp_tbl")
      }
    )
    
    dbReadTable(con, "my_dat")
    # Disconnect from connection
    dbDisconnect(con, shutdown = TRUE)
    

    Regarding your question on how to do this without loading the files into memory: in my experience, loading the files directly into duckdb (as with read_csv_auto above) rather than reading them into R first is the best practice here, and will in principle avoid the problem. The sketch below shows the same idea against the file-backed database from your question.
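
    A sketch under those assumptions (the CSV file names are placeholders; the dbdir path is the one from your question):

    library(duckdb)
    library(DBI)
    
    # connect to a file-backed database so the table itself lives on disk
    con <- dbConnect(duckdb(), dbdir = "D:\\R_scripts\\new\\duckdb\\data\\DB.duckdb", read_only = FALSE)
    
    # first batch: create the table straight from the file, without reading it into R
    dbExecute(con, "CREATE OR REPLACE TABLE my_dat AS SELECT * FROM read_csv_auto('batch_1.csv');")
    
    # each later batch: positional join straight from the file
    dbExecute(con, "CREATE OR REPLACE TABLE my_dat AS SELECT * FROM my_dat POSITIONAL JOIN read_csv_auto('batch_2.csv');")
    
    dbDisconnect(con, shutdown = TRUE)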

    You might need to open and shut down a connection per loaded file to avoid crashing the R session, but that might have been a quirk of my local setup and might not be a problem for you; a loop along those lines is sketched below.
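
    A sketch of that per-file reconnect, reusing csv_files and the on-disk path from the question:

    for (file in csv_files[-1]) {
      con <- dbConnect(duckdb(), dbdir = path, read_only = FALSE)
      dbExecute(con, sprintf(
        "CREATE OR REPLACE TABLE my_dat AS SELECT * FROM my_dat POSITIONAL JOIN read_csv_auto('%s');",
        file
      ))
      dbDisconnect(con, shutdown = TRUE)
    }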

    I hope this finally helps :)