multiprocessing julia distributed-computing

Interrupting busy worker process


I am trying to benchmark an algorithm I developed. To this end, I run the algorithm on several instances and measure time, memory, and other quantities. For each instance, I create a new process, partly because I don't want the entire benchmark to die if one instance is too big. The host process (and thus all workers) is run with a ulimit on virtual memory. However, I also want to limit the time for an instance and kill it once that time limit is reached. I don't want to do this with a prlimit on the worker process, because I don't want to include some expensive setup in this limit. I came up with the following mockup:

using Distributed
addprocs(1)
@everywhere function f()
  sleep(10)
end
pid = workers()[1]
t = remotecall(f, pid)
if timedwait(()->isready(t), 2) == :timed_out
  println("Interrupted worker")
  rmprocs(pid)
else
  print("Worker gave ", fetch(t))
end

which kills the worker after two seconds as expected.

However, if I replace f by

@everywhere function f()
  rand(10000, 10000)^2
end

then it no longer works: the worker only terminates once its expensive work is done. This made me wonder whether the benchmarked task in the worker process has to yield control intermittently so that the worker can respond to the main process. That would be of little use for my application.

Am I seeing this right? If yes, what can I do instead; if not, why doesn't the second case work as expected?

P.S.: If relevant, running workers with multiple threads seems not to be an option, because I use a library that is known not to work properly in a multithreaded julia process.


Solution

  • A problem in the given code is that isready blocks waiting for a reply from a remote Future. This problem and a solution are described in the documentation for isready(::Future):

    If the argument Future is owned by a different node, this call will block to wait for the answer. It is recommended to wait for rr in a separate task instead or to use a local Channel as a proxy:

    p = 1
    f = Future(p)
    errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
    isready(f)  # will not block
    

    So using a local Future and an async task to set its value:

    using Distributed
    
    addprocs(1)
    @everywhere function fun()
      rand(10000, 10000)^2
    end
    
    pid = workers()[1]
    fut = Future()
    @async put!(fut, remotecall_fetch(fun, pid))
    if timedwait(()->isready(fut), 2) == :timed_out
      println("Interrupting worker")
      rmprocs(pid)
    else
      print("Worker gave ", fetch(fut))
      rmprocs(pid)
    end
    

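    Here the @async task exists only in the main process. Note that @async creates a task (a coroutine scheduled cooperatively on the same thread), not an additional OS thread, so neither the main process nor the worker has to be started with multiple threads.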
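The pattern above can be wrapped in a small helper if it is needed for many benchmark instances. The following is only a sketch: `run_with_timeout` is a hypothetical name, not part of Distributed, and it uses a local Channel as the proxy (the alternative mentioned in the quoted documentation) instead of a local Future. It assumes a Julia version in which `timedwait` returns `:ok` or `:timed_out`.

```julia
using Distributed

# Hypothetical helper, not part of Distributed: run `f()` on process `pid`
# and wait at most `timeout` seconds for the result.
# Returns (:ok, value), (:error, exception), or (:timed_out, nothing).
function run_with_timeout(f, pid::Integer, timeout::Real)
    # Local channel used as a proxy: isready on it never waits on the remote.
    result = Channel{Any}(1)
    @async begin
        try
            put!(result, (:ok, remotecall_fetch(f, pid)))
        catch err
            # Reached when the remote call fails, for example because `f` throws.
            put!(result, (:error, err))
        end
    end
    if timedwait(() -> isready(result), float(timeout)) == :timed_out
        return (:timed_out, nothing)
    end
    return take!(result)
end
```

A caller could then write `status, value = run_with_timeout(fun, pid, 2)` and call `rmprocs(pid)` afterwards, exactly as in the code above. The helper deliberately does not remove the worker itself, so the decision to kill or reuse the process stays with the caller.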