javamultithreadingconcurrencyexecutorservicesynchronized

Multi-threaded Java application is behaving unexpectedly. Can someone point out the problem?


I have a simple class which does not behave as expected. I'd like to point out that this class is the whole application. I'll show the code, the output, and then ask my question.

import java.awt.Image;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import ronix.metadata.readers.MediaMetadataReader;

public class TestExecutor implements Runnable {
    private ExecutorService executor;
    private static final int NUM_THREADS_OF_EXECUTOR = 8;
    private Map<Thread,Integer> thread2loadingId;
    private final Object lock = new Object(); //used for synchronization
    private boolean reqToFinish;
    private Integer loadingId = -1;
    private List<Integer> loadRequestIds = new ArrayList<Integer>();
    private Map<Integer, String> reqId2File = new HashMap<Integer, String>();
    private Map<Integer, Image> reqId2Image = new HashMap<Integer, Image>();
    
    public TestExecutor() {
        thread2loadingId = new ConcurrentHashMap<>();
        executor = Executors.newFixedThreadPool(NUM_THREADS_OF_EXECUTOR);
    }
    public static void main(String[] args) {
        TestExecutor test = new TestExecutor();
        test.init();
        new Thread(test).start();
    }
    private void init() {
        loadRequestIds = IntStream.rangeClosed(0, 25).boxed().collect(Collectors.toList());
        loadRequestIds.stream().forEach(x -> reqId2File.put(x,"V"+x));
    }
    private String obtainWait4nextReq() {
        boolean requestObtained = false;
        String imageFile = null;
        while(!requestObtained) {
            synchronized(lock) {
                if (!loadRequestIds.isEmpty()) {
                    loadingId = loadRequestIds.remove(0);
                    imageFile = reqId2File.remove(loadingId);
                    requestObtained = true;
                }
            }
            if (!requestObtained)
                waitFor(1000);
        }
        return imageFile;
    }
    
    @Override
    public void run() {
        do { //until requested to stop
            String imageFileName = obtainWait4nextReq();
            System.out.println("        run assigns " + imageFileName + " - " + loadingId);
                executor.execute(() -> { processLoadRequest(imageFileName, loadingId); });

        } while(!reqToFinish);
    }
    
    //this is for the executor threads
    private void processLoadRequest(String imageFileName, Integer localLoadingId) {
        Thread currentThread = Thread.currentThread();
        System.out.println("Thread " + currentThread.getId() + " is assigned loading image " + imageFileName + " - " + localLoadingId);
        thread2loadingId.put(currentThread, localLoadingId);        
        processLoadRequest(imageFileName);
        thread2loadingId.remove(currentThread);
        System.out.println("Thread " + currentThread.getId() + " finished loading image " + imageFileName + " - " + localLoadingId);
    }
    private void processLoadRequest(String imageFileName) {
        Integer localLoadingId; 
        localLoadingId = thread2loadingId.get(Thread.currentThread());
        System.out.println("beginning to load/read loadingId= "+imageFileName + " - " + localLoadingId);            
                    //video files may also be requested to cache or load
                    Image image = null;
                    boolean loadFailed = false;
                    if (!MediaMetadataReader.isVideoFile(imageFileName)) {
                        try {                   
                            image = loadAndResizeImage(imageFileName); //loadImage(imageFile);
                        } catch (IOException e) {
                            e.printStackTrace();
                            System.out.println("Failed to load image file " + imageFileName+ " - " + localLoadingId);
                            loadFailed = true;
                        }
                    }
                    synchronized(lock) {
                        if (!loadFailed) {
        System.out.println("adding to Cache - " + localLoadingId);              
                            reqId2Image.put(localLoadingId, image); //for video image==null
                        } 
                    }
    }
    private Image loadAndResizeImage(String imageFileName) throws IOException {
        try {
            Thread.sleep((long) Math.random() * 1000 + 1000); //sleep 1-2 secs
        } catch (InterruptedException e) {
            throw new IOException();
        }
        return null;
    }
    private void waitFor(int milis) {
        try {
            Thread.sleep(milis);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

The output produced:

        run assigns V0 - 0
        run assigns V1 - 1
        run assigns V2 - 2
Thread 13 is assigned loading image V0 - 2
Thread 14 is assigned loading image V1 - 2
beginning to load/read loadingId= V1 - 2
        run assigns V3 - 3
beginning to load/read loadingId= V0 - 2
Thread 15 is assigned loading image V2 - 3
        run assigns V4 - 4
beginning to load/read loadingId= V2 - 3
Thread 16 is assigned loading image V3 - 4
beginning to load/read loadingId= V3 - 4
        run assigns V5 - 5
Thread 17 is assigned loading image V4 - 5
beginning to load/read loadingId= V4 - 5
        run assigns V6 - 6
Thread 18 is assigned loading image V5 - 6
        run assigns V7 - 7
beginning to load/read loadingId= V5 - 6
Thread 19 is assigned loading image V6 - 7
beginning to load/read loadingId= V6 - 7
        run assigns V8 - 8
Thread 20 is assigned loading image V7 - 8
beginning to load/read loadingId= V7 - 8
        run assigns V9 - 9
        run assigns V10 - 10
        run assigns V11 - 11
        run assigns V12 - 12
        run assigns V13 - 13
        run assigns V14 - 14
        run assigns V15 - 15
        run assigns V16 - 16
        run assigns V17 - 17
        run assigns V18 - 18
        run assigns V19 - 19
        run assigns V20 - 20
        run assigns V21 - 21
        run assigns V22 - 22
        run assigns V23 - 23
        run assigns V24 - 24
        run assigns V25 - 25
adding to Cache - 2
adding to Cache - 5
Thread 14 finished loading image V1 - 2
adding to Cache - 6
Thread 17 finished loading image V4 - 5
adding to Cache - 2
Thread 14 is assigned loading image V8 - 25
Thread 18 finished loading image V5 - 6
beginning to load/read loadingId= V8 - 25
adding to Cache - 8
Thread 13 finished loading image V0 - 2
Thread 13 is assigned loading image V11 - 25
beginning to load/read loadingId= V11 - 25
Thread 17 is assigned loading image V9 - 25
Thread 20 finished loading image V7 - 8
Thread 20 is assigned loading image V12 - 25
beginning to load/read loadingId= V12 - 25
adding to Cache - 3
Thread 15 finished loading image V2 - 3
Thread 15 is assigned loading image V13 - 25
beginning to load/read loadingId= V13 - 25
Thread 18 is assigned loading image V10 - 25
adding to Cache - 4
beginning to load/read loadingId= V9 - 25
adding to Cache - 7
Thread 19 finished loading image V6 - 7
Thread 16 finished loading image V3 - 4
beginning to load/read loadingId= V10 - 25
Thread 16 is assigned loading image V15 - 25
beginning to load/read loadingId= V15 - 25
Thread 19 is assigned loading image V14 - 25
beginning to load/read loadingId= V14 - 25
adding to Cache - 25
adding to Cache - 25
Thread 20 finished loading image V12 - 25
Thread 14 finished loading image V8 - 25
adding to Cache - 25
Thread 14 is assigned loading image V17 - 25
Thread 20 is assigned loading image V16 - 25
beginning to load/read loadingId= V17 - 25
Thread 15 finished loading image V13 - 25
adding to Cache - 25
Thread 15 is assigned loading image V18 - 25
beginning to load/read loadingId= V16 - 25
beginning to load/read loadingId= V18 - 25
adding to Cache - 25
Thread 19 finished loading image V14 - 25
adding to Cache - 25
Thread 18 finished loading image V10 - 25
adding to Cache - 25
Thread 17 finished loading image V9 - 25
Thread 19 is assigned loading image V19 - 25
Thread 17 is assigned loading image V21 - 25
adding to Cache - 25
Thread 13 finished loading image V11 - 25
Thread 18 is assigned loading image V20 - 25
beginning to load/read loadingId= V20 - 25
Thread 13 is assigned loading image V22 - 25
beginning to load/read loadingId= V22 - 25
Thread 16 finished loading image V15 - 25
beginning to load/read loadingId= V21 - 25
beginning to load/read loadingId= V19 - 25
Thread 16 is assigned loading image V23 - 25
beginning to load/read loadingId= V23 - 25
adding to Cache - 25
Thread 14 finished loading image V17 - 25
Thread 14 is assigned loading image V24 - 25
beginning to load/read loadingId= V24 - 25
adding to Cache - 25
adding to Cache - 25
Thread 15 finished loading image V18 - 25
Thread 20 finished loading image V16 - 25
Thread 15 is assigned loading image V25 - 25
beginning to load/read loadingId= V25 - 25
adding to Cache - 25
adding to Cache - 25
Thread 17 finished loading image V21 - 25
adding to Cache - 25
Thread 18 finished loading image V20 - 25
adding to Cache - 25
Thread 13 finished loading image V22 - 25
Thread 19 finished loading image V19 - 25
adding to Cache - 25
Thread 16 finished loading image V23 - 25
adding to Cache - 25
Thread 14 finished loading image V24 - 25
adding to Cache - 25
Thread 15 finished loading image V25 - 25

There is the main thread which executes the run() method and only the run method calls obtainWait4nextReq. it obtains a (id,fileName) pair which in this case is always (n,"Vn") where 0<=n<=25. it hands the pair to the executor. there are log messages that document what happens. so the first question is why are the pairs not matched up? for example we can see that the first executor thread logs Thread 13 is assigned loading image V0 - 2 which is not a valid pair. it should have probably been "V0 - 0". so how do the pairs get changed? Please, before explaining a solution please explain what is the problem with this code.


Solution

  • You’re telling the executor to execute a function. Inside that function you read the value from loadingId. Before this happens the main thread is doing whatever it wants and changing that variable. So you’ll get whatever is the current value at that point.

    If you need to keep the value what it was at the moment you call execute then you need to store it at that point in another variable and use that inside the call. That one cannot change its value.