I'm using a `targets` workflow pipeline. Part of this pipeline monitors a directory of CSV files for updates. There are more than 10,000 CSV files in this directory, and new files are added weekly. I want to identify the newly added files and append them to an existing set of `*.rds` files. The easy thing would be to re-run the process that creates the 5 subsets of `*.rds` files each week, but that takes time. The efficient thing would be to identify the newly added files and simply `bind_rows()` them with the proper `.rds` file.
I can do this easily enough with ordinary R code using `dir()` and `setdiff()`, where I store a snapshot of the CSV file paths from the previous day. But I'm struggling to accomplish this within the `targets` framework.
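For comparison, the plain-R version of the snapshot idea described above might look like the minimal sketch below. It is not `targets` code: the helper names and file paths (a snapshot `.rds` and a data `.rds`) are hypothetical, and base `rbind()` stands in for `dplyr::bind_rows()` so the sketch runs without extra packages.

```r
# Minimal sketch of the snapshot approach, outside of targets.
# snapshot_file and rds_file are hypothetical placeholder paths.

# Read the stored vector of already-processed CSV paths (empty on the first run).
read_snapshot <- function(snapshot_file) {
  if (file.exists(snapshot_file)) readRDS(snapshot_file) else character(0)
}

# CSV paths currently in csv_dir that are not in the snapshot yet.
find_new_csvs <- function(csv_dir, snapshot_file) {
  current <- list.files(csv_dir, pattern = "\\.csv$", full.names = TRUE)
  setdiff(current, read_snapshot(snapshot_file))
}

# Read the new CSVs and append their rows to the data frame stored in rds_file.
# rbind() needs matching column names; dplyr::bind_rows() would fill gaps with NA.
append_new_csvs <- function(new_files, rds_file) {
  old <- if (file.exists(rds_file)) readRDS(rds_file) else NULL
  if (length(new_files) == 0) return(invisible(old))
  new_rows <- do.call(rbind, lapply(new_files, read.csv))
  combined <- rbind(old, new_rows)  # a NULL `old` is dropped on the first run
  saveRDS(combined, rds_file)
  invisible(combined)
}

# Record files as processed only after the append has succeeded.
mark_processed <- function(new_files, snapshot_file) {
  saveRDS(union(read_snapshot(snapshot_file), new_files), snapshot_file)
}
```

A weekly run would call `find_new_csvs()`, pass the result to `append_new_csvs()` (once per subset file, after splitting the new rows into your subsets), and only then call `mark_processed()`, so a failed append leaves those files flagged as new for the next run.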
Here is an attempt that doesn't seem to work. I think I want to monitor the intermediate results stored in the `_targets/` directory, but I'm not sure how to go about doing that. Also, the `targets` documentation recommends against calling `tar_load()` inside the target definitions themselves.
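```r
library(targets)

tar_script({
  list(
    tar_target(csv_directory, "/csv/"),
    tar_target(csv_snapshot, dir(csv_directory)),
    tar_target(
      append_action,
      # csv_snapshot already holds file names, so compare against it directly
      if (length(setdiff(dir(csv_directory), csv_snapshot)) > 0) {
        ...
      }
    )
  )
})
```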
A few components that may help:

- `tar_target(format = "file")`: the package watches input and/or output files for changes and reruns the affected targets if there are any.
- `tar_target(format = "feather")` (requires the `arrow` package), so `targets` automatically compresses your output data and you do not have to micromanage files.
- The `targets` package can slow down with 10,000+ files tracked as individual targets, because there is an overhead cost for each target. So I recommend that you organize your CSV files into batches (say, by week) and dynamically branch over batches to process them. Another batch structure may be more appropriate, depending on the specifics of your use case.
```
csv/
├── week1/
│   ├── data1.csv
│   ├── data2.csv
│   ├── ...
├── week2/
│   ├── data1.csv
│   ├── data2.csv
│   ├── ...
...
```
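If the CSVs currently sit in one flat folder, one possible way to get a weekly layout like the one above is to move each file into a subfolder named after the ISO week of its modification time. This is a hypothetical helper (`batch_csvs_by_week()` is not part of `targets`), the folder names come out as e.g. `2024-W05` rather than `week1`, and it assumes modification times reflect when files arrived; if your file names carry a date, keying on that is safer.

```r
# Hypothetical helper: move CSVs from the top level of csv_dir into per-week
# subfolders (e.g. csv_dir/2024-W05/), grouped by each file's modification time.
# Files already inside week subfolders are not listed, so reruns only move new files.
batch_csvs_by_week <- function(csv_dir) {
  files <- list.files(csv_dir, pattern = "\\.csv$", full.names = TRUE)
  # ISO 8601 week-based year and week number, e.g. "2024-W05"
  week <- format(file.mtime(files), "%G-W%V")
  for (wk in unique(week)) {
    dest <- file.path(csv_dir, wk)
    dir.create(dest, showWarnings = FALSE)
    to_move <- files[week == wk]
    ok <- file.rename(to_move, file.path(dest, basename(to_move)))
    if (!all(ok)) warning("Could not move: ", paste(to_move[!ok], collapse = ", "))
  }
  # Named list: week folder -> file names moved into it
  invisible(split(basename(files), week))
}
```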
Sketch of the pipeline:
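```r
# _targets.R
library(targets)

process_csv_dir <- function(csv_dir) {...} # custom user-defined function

list(
  tar_target(
    csv_dir,
    list.files("csv", full.names = TRUE),
    format = "file",                # track the weekly folders as files
    cue = tar_cue(mode = "always")  # re-list csv/ on every run so new weeks are found
  ),
  tar_target(
    processed_data,
    process_csv_dir(csv_dir),
    pattern = map(csv_dir), # dynamic branching: one branch per weekly folder
    format = "feather"      # requires the arrow package
  )
)
```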
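To see the incremental behavior end to end, here is a small self-contained sketch that builds a throwaway project in a temporary folder, runs the pipeline, adds a third week, and runs it again. Assumptions: `process_csv_dir()` and `add_week()` are hypothetical stand-ins, `format = "feather"` is dropped so the demo does not need `arrow`, and `cue = tar_cue(mode = "always")` forces `csv_dir` to re-list the folder on every run. Without that cue, a target with no upstream dependencies is considered up to date, so new week folders would go unnoticed.

```r
library(targets)

# Build a throwaway project in a temporary folder so real files are untouched.
demo_dir <- tempfile("targets_demo_")
dir.create(demo_dir)
old_wd <- setwd(demo_dir)

# Hypothetical helper: create one weekly batch folder holding a small CSV.
add_week <- function(wk) {
  dir.create(file.path("csv", wk), recursive = TRUE)
  write.csv(data.frame(week = wk, value = 1:2),
            file.path("csv", wk, "data1.csv"), row.names = FALSE)
}
add_week("week1")
add_week("week2")

# Write _targets.R. process_csv_dir() is a hypothetical implementation that
# reads every CSV in one batch folder and row-binds them.
tar_script({
  process_csv_dir <- function(csv_dir) {
    files <- list.files(csv_dir, pattern = "\\.csv$", full.names = TRUE)
    do.call(rbind, lapply(files, read.csv))
  }
  list(
    tar_target(
      csv_dir,
      list.files("csv", full.names = TRUE),
      format = "file",
      cue = tar_cue(mode = "always")
    ),
    tar_target(processed_data, process_csv_dir(csv_dir), pattern = map(csv_dir))
  )
}, ask = FALSE)

# First run: one branch per existing week folder.
tar_make(callr_function = NULL, reporter = "silent")
first_run <- tar_read(processed_data)

# A new week arrives; the next run adds a branch for week3.
add_week("week3")
tar_make(callr_function = NULL, reporter = "silent")
second_run <- tar_read(processed_data)

setwd(old_wd)
```

`tar_read()` on the pattern target returns all branches combined into one data frame. Branches whose folder did not change should be skipped on the second run. If you also need edits inside an existing week folder to invalidate only that week's branch, `tarchetypes::tar_files()` is designed for tracking each path as its own target.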