
How to declare a shared DataFrame in Julia for parallel computing


I have a large simulation on a DataFrame df that I am trying to parallelize, and I want to save the results of the simulations in a DataFrame called simulation_results.

The parallel loop itself works fine. The problem is storing the results: if they were in an array, I would declare it as a SharedArray before the loop, but I don't know how to declare simulation_results as a "shared DataFrame" that is available to all processes and can be modified by them.

A code snippet is as follows:

using Distributed
addprocs(length(Sys.cpu_info()))

@everywhere begin
  using <required packages>

  df = CSV.read("/path/data.csv", DataFrame)

  simulation_results = similar(df, 0) #I need to declare this as shared and modifiable by all processors 
  
  nsims = 100000

end


@sync @distributed for sim in 1:nsims
    nsim_result = similar(df, 0)
    <the code which for one simulation stores the results in nsim_result >
    append!(simulation_results, nsim_result)
end

The problem is that, since simulation_results is not declared as shared and modifiable by the worker processes, after the loop runs it is still the empty DataFrame created by @everywhere simulation_results = similar(df, 0).
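The underlying reason is that every process in the cluster has its own memory: a global created with @everywhere exists as a separate copy on the master and on each worker, and an append! executed on a worker only changes the copy in that worker's memory. A minimal sketch of the effect, using a plain vector instead of a DataFrame (the name results and the two-worker setup are illustrative):

```julia
using Distributed
addprocs(2)   # illustrative: two local worker processes

# Creates a separate `results` vector on the master AND on each worker.
@everywhere results = Int[]

# Each iteration runs on some worker and mutates a copy living in that
# worker's memory, never the master's vector.
@sync @distributed for i in 1:10
    push!(results, i)
end

# Nothing was ever pushed into the master's copy.
println(length(results))   # prints 0
```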

Would really appreciate any help on this! Thanks!


Solution

  • The pattern for distributed computing in Julia is much simpler than what you are trying to do.

    Your code should look more or less like this:

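    using Distributed
    addprocs(length(Sys.cpu_info()))
    
    @everywhere using <required packages>   # must include CSV and DataFrames
    
    df = CSV.read("/path/data.csv", DataFrame)
    nsims = 100000
    
    # Each iteration returns its own DataFrame; `append!` merges them and
    # the merged DataFrame is returned to the calling process.
    simulation_results = @distributed (append!) for sim in 1:nsims
        <the code which for one simulation stores the results in nsim_result >
        nsim_result
    end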
    

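    Note that you do not need to load df on every process in the cluster: because the loop body refers to df, @distributed sends a copy of it to each worker, so the workers can read it. You also do not need @sync, because when a reducer function (here append!) is given, @distributed waits for all iterations to finish and returns the combined result.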
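    To see the whole pattern run end to end, here is a minimal self-contained sketch. It replaces the CSV file with a small in-memory DataFrame and the elided simulation code with a hypothetical run_one_sim function; both are illustrative assumptions, not part of the original question. The function is defined with @everywhere because the workers have to call it.

```julia
using Distributed
addprocs(4)                          # illustrative: four local workers

@everywhere using DataFrames         # workers build DataFrames in the loop body

# Hypothetical stand-in for df = CSV.read("/path/data.csv", DataFrame)
df = DataFrame(x = 1:3, y = [0.5, 1.5, 2.5])
nsims = 100

# Hypothetical stand-in for the elided per-simulation code.
# Defined with @everywhere so every worker can call it.
@everywhere function run_one_sim(df::DataFrame, sim::Int)
    out = copy(df)                   # work on a copy, not the input
    out.sim = fill(sim, nrow(out))   # tag rows with the simulation number
    out.value = out.y .* sim         # hypothetical "result" column
    return out
end

# Every iteration returns a fresh DataFrame; `append!` stacks them.
# `df` is sent to the workers automatically because the body uses it.
simulation_results = @distributed (append!) for sim in 1:nsims
    run_one_sim(df, sim)
end

println(nrow(simulation_results))    # prints 300 (3 rows x 100 simulations)
```

    With a reducer, each worker first combines the results of its own chunk of iterations with append! and sends back a single DataFrame; the master then appends those per-worker DataFrames, so only one DataFrame per worker crosses process boundaries.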

    A minimal working example with 4 worker processes:

    using Distributed
    addprocs(4)
    @everywhere using DataFrames
    df = DataFrame(a=1:5, b=rand())   # b: one random number repeated in every row
    

    Running the loop then gives:

    julia> @distributed (append!) for i in 2:5
               DataFrame(bsum=sum(df.b[1:myid()]),c=myid())
           end
    4×2 DataFrame
     Row │ bsum      c
         │ Float64   Int64
    ─────┼─────────────────
       1 │ 0.518127      2
       2 │ 0.777191      3
       3 │ 1.03625       4
       4 │ 1.29532       5
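    For comparison, the SharedArray approach mentioned in the question does work when each simulation produces a fixed amount of numeric output, because a SharedArray maps the same memory into every process on the same machine. A minimal sketch, assuming one Float64 result per simulation (the formula in the loop is a hypothetical stand-in); the filled array could afterwards be used as a DataFrame column:

```julia
using Distributed
addprocs(2)                       # illustrative: two local workers
@everywhere using SharedArrays    # SharedArrays must be loaded on every process

nsims = 1000
# One preallocated slot per simulation; all local processes see the same memory.
results = SharedArray{Float64}(nsims)

# No reducer here, so @sync is needed to wait for the workers.
@sync @distributed for sim in 1:nsims
    results[sim] = sim^2 / 2      # hypothetical per-simulation result
end

# Convert to an ordinary Array, e.g. to build a DataFrame column later.
plain = Array(results)
println(sum(plain))
```

    This only fits fixed-size numeric results and processes on a single host; for variable-length DataFrame output per simulation, the reducer pattern above is the simpler choice.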