Tags: java, multithreading, synchronization, java-threads

How to make threads execute in the order they are created in Java


I am using a niche database. For backups it currently supports exporting only dump files, not SQL files. The requirement, however, is to export SQL files, so I have to query the data myself and write it into a SQL file.

Because the code is commercial, I have abstracted it below; please forgive any inconvenience this causes.

import lombok.AllArgsConstructor;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Exports a database table to a SQL file by querying primary-key ranges concurrently
 * on a thread pool.
 *
 * <p>Each range is written in whatever order its task happens to finish, so the ranges in
 * the output file are NOT guaranteed to appear in ascending id order.
 */
public class ThreadQueryWrite {

    /**
     * Task intended to query one primary-key range of a table and append the result to a
     * writer shared by all tasks.
     */
    class MyRunnable implements Runnable {

        /**
         * database tableName
         */
        final String tableName;
        /**
         * The starting primary key of the query range (inclusive, per the BETWEEN query)
         */
        final long startId;
        /**
         * The ending primary key of the query range (inclusive, per the BETWEEN query)
         */
        final long endId;

        /**
         * File writing character stream shared by all tasks; writes are guarded by
         * synchronizing on this writer.
         */
        final Writer writer;

        MyRunnable(String tableName, long startId, long endId, Writer writer) {
            this.tableName = tableName;
            this.startId = startId;
            this.endId = endId;
            this.writer = writer;
        }

        @Override
        public void run() {
            doQuery();
        }

        private void doQuery() {
            // resultSet = select * from tableName where id between startId and endId;
            // doWriteFile(resultSet);
        }

        /**
         * Builds the text for all rows of {@code resultSet} and writes it to the shared
         * writer in one synchronized call, so the output of different tasks does not
         * interleave.
         *
         * @throws RuntimeException wrapping any {@link SQLException} or {@link IOException}
         */
        private void doWriteFile(ResultSet resultSet) {
            StringBuilder sb = new StringBuilder("(");
            try {
                while (resultSet.next()) {
                    // sb.append()
                }
                synchronized (writer) {
                    writer.write(sb.toString());
                }
            } catch (SQLException | IOException e) {
                throw new RuntimeException(e);
            }
        }
    }


    public static void main(String[] args) throws IOException {
        // Assuming that the number of rows of database data
        // has been obtained is 10000000 rows
        int count = 10000000;
        String tableName = "test_table";
        String filePath = "c:\\test\\test.sql";
        ThreadQueryWrite queryWrite = new ThreadQueryWrite();
        queryWrite.doExportSql(count, tableName, filePath);
    }

    /**
     * Exports ids {@code 1..count} of {@code tableName} into {@code filePath}, one task per
     * range of {@code maxQueryCount} ids.
     *
     * <p>Returns only after every task has finished. The writer is then closed, which
     * flushes any buffered output to the file.
     *
     * @throws IOException if the file cannot be opened or written, if any query task fails
     *     (its failure is the cause), or if the calling thread is interrupted while waiting
     */
    private void doExportSql(int count, String tableName, String filePath) throws IOException {
        int maxQueryCount = 3000;
        int poolSize = Runtime.getRuntime().availableProcessors();

        try (Writer writer = new OutputStreamWriter(Files.newOutputStream(Paths.get(filePath)),
                StandardCharsets.UTF_8)) {
            // NOTE: with an unbounded LinkedBlockingQueue the pool never grows beyond its
            // core size, so the maximum size (poolSize * 2) is effectively unused.
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(poolSize,
                    poolSize * 2, 5, TimeUnit.MINUTES,
                    new LinkedBlockingQueue<>());
            try {
                List<Future<?>> futures = new ArrayList<>();
                // `<=` so the last id is still covered when count % maxQueryCount == 1;
                // `long` so `i += maxQueryCount` cannot overflow for very large counts.
                for (long i = 1; i <= count; i += maxQueryCount) {
                    MyRunnable myRunnable = new MyRunnable(tableName, i, i + maxQueryCount - 1, writer);
                    futures.add(threadPoolExecutor.submit(myRunnable));
                }
                // get() waits for each task and surfaces exceptions submit() would swallow.
                for (Future<?> future : futures) {
                    future.get();
                }
            } finally {
                // On success every task is already done. On failure this interrupts the
                // remaining tasks. Either way, wait so no task touches the writer once the
                // enclosing try-with-resources closes it.
                threadPoolExecutor.shutdownNow();
                threadPoolExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            }
        } catch (ExecutionException e) {
            throw new IOException("Export of table " + tableName + " failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            InterruptedIOException interrupted =
                    new InterruptedIOException("Interrupted while exporting table " + tableName);
            interrupted.initCause(e);
            throw interrupted;
        }
    }

}

Because some tables contain a large amount of data, I use multiple threads to save as much time as possible. Each thread is assigned its own range of primary keys and queries that range asynchronously.

I want the primary key ids to be written to the file contiguously and in order, from 1 to 10000000:

insert into table(id) values
    (1),
    ......,
    (3000);
insert into table(id) values
    (3001),
    ......,
    (6000);
......

However, because I query the table in segments by primary key id, there is no guarantee which thread finishes its query first. As a result, the range 3001–6000 may appear at the beginning of the file and the range 1–3000 at the end, as follows:

insert into table(id) values
    (3001),
    ......,
    (6000);
insert into table(id) values
    (1),
    ......,
    (3000);
......

I have tried the approach from the question "Ordering threads to run in the order they were created/started".

But this can create a lot of memory pressure. All of the query results are loaded into memory first, and only then are the threads' results processed one by one to write the SQL file.

I want the primary key ids in the SQL file to be in ascending order, matching the auto-increment order in the database.

I have not found a solution yet; any reply would be greatly appreciated.


Solution

  • We can run the intermediate operations (the queries) in parallel and still enforce encounter order at the terminal operation by using a parallel stream with `forEachOrdered`, as mentioned here and here. I did not find any way to replace the ForkJoinPool's work queue with a priority queue that would use `startId` to decide priority.

    import java.io.*;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    
    /**
     * Exports a table to a SQL file. The range queries run in parallel, but their results
     * are written in ascending {@code startId} order via {@code Stream.forEachOrdered}.
     */
    public class ThreadQueryWrite {

        /**
         * Describes one primary-key range of a table to be queried.
         */
        class MyRunnable implements Runnable {

            /**
             * database tableName
             */
            final String tableName;
            /**
             * The starting primary key of the query range (inclusive, per the BETWEEN query)
             */
            final long startId;

            /**
             * The ending primary key of the query range (inclusive, per the BETWEEN query)
             */
            final long endId;

            /**
             * File writing character stream
             */
            final Writer writer;

            // Explicit constructor: this listing does not import lombok, so
            // @AllArgsConstructor would not compile here.
            MyRunnable(String tableName, long startId, long endId, Writer writer) {
                this.tableName = tableName;
                this.startId = startId;
                this.endId = endId;
                this.writer = writer;
            }

            public long getStartId() {
                return startId;
            }

            @Override
            public void run() {
                doQuery();
            }

            private void doQuery() {
                // resultSet = select * from tableName where id between startId and endId;
                // doWriteFile(resultSet);
            }

            /**
             * Runs the range query and returns its rendered text without writing it.
             * In this demo the "result" is just {@code startId}, so the output order is
             * easy to see.
             */
            public StringBuilder queryAndGetResult() {
                System.out.println(startId+" Querying for startId ");
                // Placeholder for the real query:
                // resultSet = select * from tableName where id between startId and endId;
                // while (resultSet.next()) { sb.append(...); }
                StringBuilder sb = new StringBuilder("" + startId);
                System.out.println(startId+" Query Result ");
                return sb;
            }

            private void doWriteFile(ResultSet resultSet) {
                StringBuilder sb = new StringBuilder("(");
                try {
                    while (resultSet.next()) {
                        // sb.append()
                    }
                    synchronized (writer) {
                        writer.write(sb.toString());
                    }
                } catch (SQLException | IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }


        public static void main(String[] args) throws IOException {
            // Assuming that the number of rows of database data
            // has been obtained is 10000000 rows
            int count = 100;
            String tableName = "test_table";
            String filePath = "c:\\test\\test.sql";
            ThreadQueryWrite queryWrite = new ThreadQueryWrite();
            queryWrite.doExportSql(count, tableName, filePath);
        }

        /**
         * Queries ids {@code 1..count} in ranges of {@code maxQueryCount}, in parallel, and
         * writes each range's result to {@code filePath} in ascending {@code startId} order.
         *
         * @throws IOException if the file cannot be opened or closed
         */
        private void doExportSql(int count, String tableName, String filePath) throws IOException {
            int maxQueryCount = 3;//for increasing readibility of output

            // try-with-resources flushes and closes the writer. Without it, the
            // OutputStreamWriter's buffered characters may never reach the file.
            try (Writer writer = new OutputStreamWriter(Files.newOutputStream(Paths.get(filePath)),
                    StandardCharsets.UTF_8)) {
                List<MyRunnable> myRunnables = new ArrayList<>();
                // `<=` so the last id is covered even when count % maxQueryCount == 1
                // (with count = 100 and maxQueryCount = 3, `<` would skip id 100).
                // `long` so the increment cannot overflow for very large counts.
                for (long i = 1; i <= count; i += maxQueryCount) {
                    myRunnables.add(new MyRunnable(tableName, i, i + maxQueryCount - 1, writer));
                }

                // The queries run in parallel (on the common ForkJoinPool by default).
                // forEachOrdered then writes the results in list order, i.e. ascending startId.
                // NOTE(review): results that finish ahead of an earlier range may be held in
                // memory until that range completes. Blocking JDBC calls also occupy
                // common-pool threads. Measure both for large tables.
                myRunnables.stream()
                        .parallel()
                        .map(MyRunnable::queryAndGetResult)
                        .forEachOrdered(resultSetString -> writeToFile(writer, resultSetString));
            }
        }

        /** Writes one rendered range to {@code writer}, rethrowing I/O errors unchecked. */
        private static void writeToFile(Writer writer, StringBuilder resultSetString) {
            try {
                System.out.println(resultSetString+" File out");
                writer.write(resultSetString.toString());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

    }