javamultithreadingconcurrencyatomicatomicinteger

Java AtomicInteger not equal to a million after a million iterations (minimal example included)


I am writing a program which computes the squares of integers between 0 and 1 million(not inclusive) in an array. I am using up to 8 threads(inclusive).
For indexing the array i am using an atomic integer.
I expect the for loop body in the run method to be executed a million times regardless of number of threads.
To count how many times it is executed I am using another atomic integer.

   static AtomicInteger cnt = new AtomicInteger(0);
    static AtomicInteger ii = new AtomicInteger(0);
    static long[] arr = new long[1_000_000];

    public static class Worker extends Thread {

        public static void main(String[] args) throws IOException, InterruptedException {
            int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
            for (int n = 1; n <= maxThreads; n++) {
                int numThreads = n;
                Thread[] threads = new Thread[numThreads];
                ii = new AtomicInteger(0);
                for (int i = 0; i < threads.length; i++) {
                    threads[i] = new Worker();
                    threads[i].start();
                }
                for (Thread t : threads) {
                    t.join();
                }
                System.out.printf("%d threads, cnt is %d\n" , threads.length, cnt.get());
                cnt.set(0);
            }
        }

        @Override
        public void run() {
            for (int i = ii.get(); i < arr.length; i = ii.getAndIncrement()) {
                arr[i] = (long)i*i;
                cnt.getAndIncrement();
            }
        }
    }

Expected result of execution is:

1 threads, cnt is 1000000
2 threads, cnt is 1000000
3 threads, cnt is 1000000
4 threads, cnt is 1000000
5 threads, cnt is 1000000
6 threads, cnt is 1000000
7 threads, cnt is 1000000
8 threads, cnt is 1000000

However upong running i get following:

1 threads, cnt is 1000001
2 threads, cnt is 1000002
3 threads, cnt is 1000003
4 threads, cnt is 1000002
5 threads, cnt is 1000003
6 threads, cnt is 1000002
7 threads, cnt is 1000002
8 threads, cnt is 1000005

Can you help me debug?


Solution

  • There are some minor issues like the ii = new AtomicInteger(0) assignment between the runs or the declaration to throw IOException that could never occur. While the assignment to ii has no impact here, as it happens at a point where no threads are accessing ii, it can be distracting, as it diverges from established code pattern for multi-threaded code. You should reset ii the same way you reset cnt between the runs.

    The actual issue is the loop’s starting point:

    for (int i = ii.get(); i < arr.length; i = ii.getAndIncrement()) {
    

    You are reading using get without incrementing the value, so multiple threads may read the same value, causing a data race at the subsequent arr[i] = (long)i*i; (which stays unnoticed here, but of course, should be avoided) and performing more iterations than necessary, as you noticed due to the cnt update. You should use getAndIncrement() for the initial index like you do for the subsequent iterations, to ensure that each thread accesses a different array index.

    static final AtomicInteger cnt = new AtomicInteger(0);
    static final AtomicInteger ii = new AtomicInteger(0);
    static final long[] arr = new long[1_000_000];
    
    public static class Worker extends Thread {
    
        public static void main(String[] args) throws InterruptedException {
            int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
    
            for (int numThreads = 1; numThreads <= maxThreads; numThreads++) {
                Thread[] threads = new Thread[numThreads];
                for (int i = 0; i < threads.length; i++) {
                    threads[i] = new Worker();
                    threads[i].start();
                }
                for (Thread t : threads) {
                    t.join();
                }
                System.out.printf("Used %d threads, cnt is %d\n" , numThreads, cnt.get());
                cnt.set(0);
                ii.set(0);
            }
        }
    
        @Override
        public void run() {
            for(int i = ii.getAndIncrement(); i < arr.length; i = ii.getAndIncrement()) {
                arr[i] = (long)i*i;
                cnt.getAndIncrement();
            }
        }
    }
    
    Used 1 threads, cnt is 1000000
    Used 2 threads, cnt is 1000000
    Used 3 threads, cnt is 1000000
    Used 4 threads, cnt is 1000000
    Used 5 threads, cnt is 1000000
    Used 6 threads, cnt is 1000000
    Used 7 threads, cnt is 1000000
    Used 8 threads, cnt is 1000000
    Used 9 threads, cnt is 1000000
    Used 10 threads, cnt is 1000000
    Used 11 threads, cnt is 1000000
    Used 12 threads, cnt is 1000000
    Used 13 threads, cnt is 1000000
    Used 14 threads, cnt is 1000000
    Used 15 threads, cnt is 1000000
    Used 16 threads, cnt is 1000000
    

    Note that sophisticated parallel processing frameworks don’t use such atomic index updates but rather split the range into equal sized subranges according to the intended target parallelism before starting the threads, so each thread can loop over its pre-assigned range using an ordinary local index variable.

    You get that for free when using Arrays.parallelSetAll(arr, i -> (long)i*i); or the Stream API.