rrevolution-r

Number rows per group with RevoScaleR


I'm converting a local R script to make use of the RevoScaleR functions in the Revolution-R (aka Microsoft R Client/Server) package. This to be able to scale better with large amounts of data.

The goal is to create a new column that numbers the rows per group. Using data.table this would be achieved using the following code:

library(data.table)
eventlog[,ActivityNumber := seq(from=1, to=.N, by=1), by=Case.ID]

For illustration purposes, the output is something like this:

    Case.ID    ActivityNumber
1       A              1
2       A              2
3       B              1
4       C              1
5       C              2
6       C              3

After some research to do this using the rx-functions I found the package dplyrXdf, which is basically a wrapper to use dplyrfunctions on Xdfstored data, while still benefitting from the optimized functions of RevoScaleR (see http://blog.revolutionanalytics.com/2015/10/using-the-dplyrxdf-package.html)

In my case, this would lead to the following:

result <- eventlog %>%
  group_by(Case.ID) %>%
  mutate(ActivityNumber = seq_len(n()))

However, this leads to the following error:

ERROR: Attempting to add a variable without a name to an analysis.
Caught exception in file: CxAnalysis.cpp, line: 3756. ThreadID: 1248 Rethrowing.
Caught exception in file: CxAnalysis.cpp, line: 5249. ThreadID: 1248 Rethrowing.
Error in doTryCatch(return(expr), name, parentenv, handler) : 
  Error in executing R code: ERROR: Attempting to add a variable without a name to an analysis.

Any ideas how to solve this error? Or other (better?) approaches to get the requested result?


Solution

  • Thanks to @Matt-parker for pointing me to this question.

    Note that n() is not a regular R function, although it looks like one. It needs to be implemented specially for each data source, and maybe also separately for each of mutate, summarise and filter.

    Right now, the only usage of n that is supported for xdf files is within summarise, to count the number of rows. Implementing it for the other verbs is actually nontrivial.

    In particular, there is a problem with Matt's use of seq_along to implement n's functionality. Remember that xdf files are block-structured: each chunk of rows is read in and processed independently of other chunks. This means that the sequence generated is for that chunk of rows only, and not for all the rows in a group. If a group spans more than one chunk, the sequence numbers will restart in the middle.

    The way to get correct sequence numbers is to keep a running count of how many rows you've read in for that group, and update it each time a chunk is processed. You can do this with a transformFunc, which you pass to transmute via the .rxArgs argument:

    ev <- eventlog %>% group_by(Case.ID) %>% transmute(.rxArgs = list(
        transformFunc = function(varList) {
            n <- .n + seq_along(varList[[1]])
            if(!.rxIsTestChunk)  # need this b/c rxDataStep does a test run on the 1st 10 rows
                .n <<- n[length(n)]
            list(n=n)
        },
        transformObjects = list(.n = 0))
    

    This should work with the local, localpar and foreach compute contexts. It may not work (or at least won't give a reproducible result) with any context where you can't guarantee that rxDataStep will process the rows in a deterministic order -- so Mapreduce, Spark, Teradata or similar.