concurrency clojure atomic

Clojure function that waits on the completion of another function before executing


I have a requirement for a function that when called with particular input args executes a supplied function g, but only after another supplied function f has finished executing with the same input args. There is also a requirement that when the function is called multiple times with the same input args, f is only executed once on the first call, and the other calls wait for this to complete, then execute g directly.

Edit: The solution should work when run in parallel on different threads, and should also use threads efficiently. E.g. blocking should be on a per input basis rather than the whole function.

My first attempt at the function is as follows:

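(import 'java.util.concurrent.CountDownLatch)

(defn dependent-func
  ([f g]
   (let [mem (atom {})]                 ; maps args -> CountDownLatch
     (fn [& args]
       (->> (get (locking mem           ; lock so the swap! fn is never retried
                   (swap! mem
                          (fn [latch-map args]
                            (if (contains? latch-map args)
                              latch-map
                              (let [new-latch (CountDownLatch. 1)
                                    new-latch-map (assoc latch-map args new-latch)]
                                ;; first call for these args: run f on a new thread
                                (->> (Thread. #(do (apply f args)
                                                   (.countDown new-latch)))
                                     (.start))
                                new-latch-map)))
                          args))
                 args)
            (.await))                   ; wait until f has finished for these args
       (apply g args)))))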

This appears to meet my requirements, and awaits on f are on a per input basis, so I'm relatively happy with that. Initially I had hoped to just use swap! to do the mem updating, but unfortunately the documentation for swap! explicitly states that the function passed to it may be called multiple times (I have seen this in testing). As a result I ended up having to lock on mem when updating, which is really ugly.

I am sure there must be a cleaner way of doing this that leverages Clojure's concurrency mechanisms better than I have, but so far I've been unable to find it.

Any advice would be greatly appreciated.

Thanks,

Matt.


Solution

  • Clojure's combination of future, promise, and deliver is well suited to starting a process and having several threads wait for it to finish.

    I'll also split the waiting part into its own function to make the code easier to follow, and so that I can use the built-in memoize function.

    This question is a very good example of when to use promise and deliver rather than simply a future.
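
    To illustrate the point about promise and deliver, here is a minimal sketch (the names answer, waiters and results are hypothetical and used only for this example) in which several threads block on one promise and a separate thread delivers the value once:

    ```clojure
    ;; Hypothetical names, for illustration only.
    (def answer (promise))

    ;; Three threads block on the same promise.
    (def waiters
      (doall (for [i (range 3)]
               (future [i @answer]))))

    ;; Another thread does the "work" and delivers the value exactly once.
    (future
      (Thread/sleep 100)
      (deliver answer :done))

    ;; Every waiter wakes up with the same delivered value.
    (def results (mapv deref waiters))
    ;; results => [[0 :done] [1 :done] [2 :done]]
    ```

    The waiters do not need to know which thread produces the value, which is what lets the promise be handed out before the work has finished.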

    Because we are going to use memoize where it's not safe to run the function twice, we need to make sure that two calls can't enter memoize at the same time: memoize on its own does not stop two concurrent first calls from both running the function. So we lock only for the moment we enter memoize, not for the duration of the work that the memoized function starts.
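
    The race can be made visible with a small sketch. The names calls, both-inside and racy are hypothetical; the latch simply holds each call inside the memoized function until a second call has also entered (or one second has passed), which can only happen if memoize let both of them through:

    ```clojure
    (import '(java.util.concurrent CountDownLatch TimeUnit))

    ;; Hypothetical names, for illustration only.
    (def calls (atom 0))
    (def both-inside (CountDownLatch. 2))

    (def racy
      (memoize (fn [x]
                 (swap! calls inc)
                 (.countDown both-inside)
                 ;; wait (at most 1 s) until another call is also inside this function
                 (.await both-inside 1 TimeUnit/SECONDS)
                 x)))

    ;; Two concurrent first calls with the same argument.
    (def result
      (let [a (future (racy 4))
            b (future (racy 4))]
        [@a @b @calls]))
    ;; On a normal run result is [4 4 2]: the underlying function ran twice.
    ```

    Wrapping only the call into the memoized function in a lock, as done below, closes this window without holding the lock while the slow work runs.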

    hello.core> (def lock (Object.))
    #'hello.core/lock
    

    This function always returns the same promise for a given f and set of arguments. To make memoize safe, we wrap it in a function that does the locking (you could also use an agent for this).

    hello.core> (def wait-for-function-helper             
                  (memoize (fn [f args]
                             (let [answer (promise)]
                               (println "waiting for function " f " with args" args)
                               (future (deliver answer (apply f args)))
                               answer))))
    
    #'hello.core/wait-for-function-helper
    hello.core> (defn wait-for-function [& args]
                  (locking lock
                    (apply wait-for-function-helper args)))
    #'hello.core/wait-for-function
    

    And now we write the actual dependent-func function, which uses the safely memoized, promise-returning wait-for-function function.

    hello.core> (defn dependent-func [f g & args]
                  @(wait-for-function f args)
                  (apply g args))
    #'hello.core/dependent-func
    

    And define a slow operation to see it in action:

    hello.core> (defn slow-f-1 [x]
                  (println "starting slow-f-1")
                  (Thread/sleep 10000)
                  (println "finishing slow-f-1")
                  (dec x))
    #'hello.core/slow-f-1
    

    To test it, we want to start two calls to the same function at almost exactly the same time.

    hello.core> (do (future
                      (println "first" (dependent-func slow-f-1 inc 4)))
                    (future
                      (println "second" (dependent-func slow-f-1 inc 4))))
    
    waiting for function  
    #object[clojure.core$future_call$reify__6736 0x40534083 {:status :pending, :val nil}] with args (4)
    #object[hello.core$slow_f_1 0x4f9b3396 hello.core$slow_f_1@4f9b3396]
    starting slow-f-1
    finishing slow-f-1
    second
    first
    5
    5
    

    and if we call it again we see that slow-f-1 only ever ran once:

    hello.core> (do (future
                      (println "first" (dependent-func slow-f-1 inc 4)))
                    (future
                      (println "second" (dependent-func slow-f-1 inc 4))))
    
    #object[clojure.core$future_call$reify__6736 0x3935ea29 {:status :pending, :val nil}]
    first 5
    second 5
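
    As an alternative that avoids the explicit lock, here is a minimal sketch that keeps the original (f g) signature from the question and stores a delay in the atom. It is not the code shown above: dependent-func-delay and slow-inc are hypothetical names, and the approach assumes it is acceptable for f to run on the thread of the first caller rather than on a separate thread. Creating a delay runs nothing, so the function passed to swap! stays free of side effects and a retry is harmless; only the delay that ends up in the map is ever forced.

    ```clojure
    ;; Sketch only: dependent-func-delay is a hypothetical name.
    (defn dependent-func-delay
      [f g]
      (let [mem (atom {})]                          ; args -> delay wrapping (apply f args)
        (fn [& args]
          (let [candidate (delay (apply f args))    ; creating a delay runs nothing
                ;; The swap! function only builds a map, so a retry is harmless.
                winner (get (swap! mem #(if (contains? % args)
                                          %
                                          (assoc % args candidate)))
                            args)]
            @winner                                 ; first caller runs f, the rest block here
            (apply g args)))))

    ;; Usage: f sleeps, g is inc.
    (def slow-inc (dependent-func-delay (fn [x] (Thread/sleep 100)) inc))
    (slow-inc 4) ;; => 5
    ```

    Blocking is still per input, because callers with different arguments force different delays.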