java concurrency countdownlatch

Synchronise concurrent requests to share results of a slow operation


I have a Java UI service that has an API method that invokes an operation that's relatively slow (say ~30secs). The operation is parameterless, but it operates on external data that does change (relatively slowly) over time. It's not critical for the method to return the most up-to-date results - if they're 30secs old that's acceptable.

Ultimately I need to optimise the implementation of the slow operation, but as a short-term fix, I'd like to make the operation mutually exclusive, such that if a second incoming request (on a separate thread) attempts to invoke the operation while another is already in progress, then the second one blocks until the first one completes. The second thread then uses the results of the first invocation of the operation - i.e. it doesn't attempt to run the operation again.

E.g.:

class MyService {
    String serviceApiMethod() {
       // If a second thread attempts to call this method while another is in progress
       // then block here until the first returns and then use those results
       // (allowing it to return immediately without a second call to callSlowOperation).
       return callSlowOperation();
    }
}

What's the preferred general approach for this in Java (8)? I'm guessing I could use a CountDownLatch, but it's not clear how best to share the result across the threads. Is there an existing concurrency primitive that facilitates this?

EDIT: I need to clear any references to the result once all threads have consumed it (i.e. returned it to the caller), as it's a relatively large object that needs to be GC'ed as soon as possible.


Solution

  • Simple idea

    Version 1:

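    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadLocalRandom;

    class Foo {
        // Both fields are guarded by the monitor on "this".
        private int callers;
        private CompletableFuture<String> future;

        public String foo() throws Exception {
            CompletableFuture<String> current;
            synchronized (this) {
                // The first caller starts the slow operation; later callers share its future.
                if (callers++ == 0) {
                    future = CompletableFuture.supplyAsync(() -> {
                        try {
                            // Stand-in for the slow operation.
                            Thread.sleep(1000 * (ThreadLocalRandom.current().nextInt(3) + 1));
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        return "ok" + ThreadLocalRandom.current().nextInt();
                    });
                }
                // Copy to a local so a concurrent reset cannot cause a NullPointerException.
                current = future;
            }

            try {
                return current.get();
            } finally {
                synchronized (this) {
                    // The last caller to leave drops the reference so the result can be collected.
                    if (--callers == 0) {
                        future = null;
                    }
                }
            }
        }
    }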
    

    Version 2: together with @AleksandrSemyannikov

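    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.atomic.AtomicInteger;

    public class MyService {
        // Number of threads currently inside serviceApiMethod().
        private final AtomicInteger counter = new AtomicInteger();
        private volatile String result;

        public String serviceApiMethod() {
            counter.incrementAndGet();
            try {
                synchronized (this) {
                    // Only the first thread in runs the slow operation;
                    // the others block on the lock and then reuse its result.
                    if (result == null) {
                        result = callSlowOperation();
                    }
                }
                // result is only reset when counter is 0, and this thread still holds a count.
                return result;
            } finally {
                if (counter.decrementAndGet() == 0) {
                    synchronized (this) {
                        // Re-check under the lock in case another thread arrived in the meantime.
                        if (counter.get() == 0) {
                            result = null;
                        }
                    }
                }
            }
        }

        // Stand-in for the slow operation.
        private String callSlowOperation() {
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            return Thread.currentThread().getName();
        }
    }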
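
A possible Version 3, as a rough sketch only: the same idea without a caller counter. The first caller runs the operation on its own thread and publishes the outcome through a CompletableFuture; callers that arrive while it is running wait on that future. The field is cleared as soon as the operation finishes, so afterwards only the callers' local variables reference the result and it becomes collectable once they return. The class name SharedCall and the Supplier constructor parameter are hypothetical names chosen for illustration, not something from the question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper: shares one in-flight invocation among concurrent callers.
class SharedCall<T> {
    private final Supplier<T> slowOperation;
    private CompletableFuture<T> inFlight; // guarded by "this"

    SharedCall(Supplier<T> slowOperation) {
        this.slowOperation = slowOperation;
    }

    T get() {
        CompletableFuture<T> current;
        boolean owner = false;
        synchronized (this) {
            if (inFlight == null) {
                // No invocation in progress: this caller becomes the one that runs it.
                inFlight = new CompletableFuture<>();
                owner = true;
            }
            current = inFlight;
        }
        if (owner) {
            try {
                current.complete(slowOperation.get());
            } catch (Throwable t) {
                // Waiting callers see the same failure, wrapped in a CompletionException.
                current.completeExceptionally(t);
            } finally {
                synchronized (this) {
                    // Drop the shared reference; the next caller starts a fresh invocation.
                    inFlight = null;
                }
            }
        }
        // Returns immediately for the owner; blocks for callers that joined while it was running.
        return current.join();
    }
}
```

In the service this could be used as a field such as `new SharedCall<>(this::callSlowOperation)`, with `serviceApiMethod()` returning `shared.get()`. Unlike Versions 1 and 2, a caller that arrives after the operation has finished always triggers a new invocation, even if earlier callers have not yet returned.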