Tags: java, postgresql, jdbc, streaming, jdbc-postgres

PostgreSQL CopyManager copyIn appears stuck doing nothing when using it with COPY FROM STDIN


I am trying to use the PostgreSQL CopyManager copyIn functionality with COPY FROM STDIN, as suggested in the docs, for very fast copying from an InputStream into a database table. I am thinking of using this to continuously stream rows that are to be written to a table as and when I receive/process each one. However, the quick-and-dirty sample code below seems to be stuck on copyIn and does not write to the table.

Does anyone know what I am missing here, or whether my understanding is wrong?

import java.sql.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.postgresql.core.BaseConnection;
import org.postgresql.copy.CopyManager;

/**
 * First attempt from the question: one task writes nine text-format rows into a pipe, and
 * another runs {@code COPY dcm.testtbl FROM STDIN} over the other end via CopyManager.copyIn().
 * Kept as posted; the NOTE(review) comments mark why it hangs.
 */
public class PGConnectTest {

    public static void main(String[] args) {

        try {
                // All of these resources are closed, in reverse order, only when this try block
                // exits, i.e. after the 10 s sleep below.
                try (Connection connection = DriverManager.getConnection("jdbc:postgresql://XX.XX.XX.XX:9432/somedb", "someadmin", "somepassword");
                    BaseConnection pgcon = (BaseConnection)connection;
                    PipedInputStream is = new PipedInputStream();
                    BufferedReader br = new BufferedReader(new InputStreamReader(is));
                    PipedOutputStream os = new PipedOutputStream(is);
                    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os));) {
                        // NOTE(review): a single-thread executor runs the two submitted tasks one
                        // after the other, so copyIn() cannot start until the writer task has
                        // returned. If the writer ever filled the pipe's buffer, it would block
                        // with no reader: a deadlock. The executor is also never shut down, and its
                        // non-daemon worker thread keeps the JVM running after main() finishes,
                        // which is why the program has to be killed with Ctrl+C.
                        ExecutorService executorService = Executors.newSingleThreadExecutor();
                        // NOTE(review): raw Callable; Callable<Boolean> would avoid the unchecked warning.
                        Callable callable = () -> {
                            Thread.sleep(3000);
                            // Text-format COPY row: id, a tab, then the jsonb value, newline-terminated.
                            String frmtStr = "%s\t{\"id\":%s, \"somefield\":\"%s\"}\n";
                            String row = null;
                            for(int i=1; i<10; i++) {
                                row = String.format(frmtStr, i, i, ("row"+i));
                                System.out.print(row);
                                bw.write(row);
                            }
                            // NOTE(review): this sends an extra empty line, which COPY reads as one
                            // more (invalid) data row for this two-column table.
                            bw.write("\n");
                            // Pushes the buffered rows into the pipe but does not close it, so the
                            // reading side has still not reached end of input.
                            bw.flush();
                            System.out.println("WRITTEN!");
                            return true;
                        };
                        executorService.submit(callable);
                        System.out.println(connection);
                        CopyManager copyManager = new CopyManager(pgcon);
                        String copySql = "COPY dcm.testtbl FROM STDIN";
                        // NOTE(review): copyIn() only returns at end of input, which for a
                        // PipedInputStream means the PipedOutputStream has been closed. That only
                        // happens when the enclosing try exits, and the reader and the connection
                        // are closed in the same sweep, so the copy is unlikely to complete. The
                        // returned Future is discarded, so any exception from copyIn() goes unseen.
                        executorService.submit(() -> copyManager.copyIn(copySql, br));
                        // A fixed delay, not a wait for either task to finish.
                        Thread.sleep(10000);
                        System.out.println("QUITTING");
                } catch (Exception e) {
                    // NOTE(review): catching only to rethrow has no effect.
                    throw e;
                }
        } catch(Exception ex) {
            System.out.println(ex);
        }

    }

}

The schema of the table testtbl is below:

-- Target table for the COPY examples. They load it as dcm.testtbl, so create it in that
-- schema explicitly rather than in whatever schema is first on search_path.
create table dcm.testtbl (
id  integer primary key,
jsnclm  jsonb
);

The console output is below (the program does NOT return and has to be killed with Ctrl+C):

C:\Users\ml410408\Documents\Useful Lookups\POSTGRESQL>java -cp ".;postgresql-42.2.18.jar" PGConnectTest
org.postgresql.jdbc.PgConnection@41975e01
1       {"id":1, "somefield":"row1"}
2       {"id":2, "somefield":"row2"}
3       {"id":3, "somefield":"row3"}
4       {"id":4, "somefield":"row4"}
5       {"id":5, "somefield":"row5"}
6       {"id":6, "somefield":"row6"}
7       {"id":7, "somefield":"row7"}
8       {"id":8, "somefield":"row8"}
9       {"id":9, "somefield":"row9"}
WRITTEN!
QUITTING

UPDATE:

Once I changed the format of the COPY SQL command from the default TEXT to CSV and passed in CSV records, it is no longer stuck, but it does nothing (meaning no records end up in the table), even though, unlike before, it now returns.

import java.sql.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.postgresql.core.BaseConnection;
import org.postgresql.copy.CopyManager;

/**
 * Updated attempt from the question: same pipe-plus-copyIn() structure as before, but
 * producing CSV rows and issuing a CSV COPY. Kept as posted; the NOTE(review) comments mark
 * why it returns without loading any rows.
 */
public class PGConnectTest {

    public static void main(String[] args) {

        try {
                // All of these resources are closed, in reverse order, only when this try block exits.
                try (Connection connection = DriverManager.getConnection("jdbc:postgresql://XX.XX.XX.XX:9432/somedb", "someadmin", "somepassword");
                    BaseConnection pgcon = (BaseConnection)connection;
                    PipedInputStream is = new PipedInputStream();
                    BufferedReader br = new BufferedReader(new InputStreamReader(is));
                    PipedOutputStream os = new PipedOutputStream(is);
                    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os));) {
                        // NOTE(review): still a single-thread executor, so copyIn() only starts
                        // after the writer task has returned.
                        ExecutorService executorService = Executors.newSingleThreadExecutor();
                        // NOTE(review): raw Callable; Callable<Boolean> would avoid the unchecked warning.
                        Callable callable = () -> {
                            Thread.sleep(3000);
                            // NOTE(review): CSV quotes fields with double quotes by default, not
                            // single quotes. The single quotes become part of the value, and the
                            // double quotes inside the JSON are treated as CSV quoting, so the
                            // jsonb value is unlikely to arrive intact. A CSV-quoted JSON field
                            // looks like "{""id"":1, ...}".
                            String frmtStr = "%s,'{\"id\":%s,\"somefield\":\"%s\"}'\n";
                            String row = null;
                            for(int i=1; i<10; i++) {
                                row = String.format(frmtStr, i, i, ("row"+i));
                                System.out.print(row);
                                bw.write(row);
                            }
                            // NOTE(review): an extra empty line, sent to COPY as one more data row.
                            bw.write("\n");
                            // NOTE(review): this writes '\.' including the single quotes. The
                            // end-of-data marker is a bare \. line, and copyIn() does not need one
                            // anyway, because end of the stream ends the copy. As written, this is
                            // just another row.
                            bw.write("'\\.'\n");
                            System.out.println("'\\.'\n");
                            bw.flush();
                            os.flush();
                            System.out.println("WRITTEN!");
                            return true;
                        };
                        executorService.submit(callable);
                        System.out.println(connection);
                        CopyManager copyManager = new CopyManager(pgcon);
                        // NOTE(review): as far as I can tell this is not valid COPY syntax. Options
                        // go in parentheses, WITH (FORMAT csv, DELIMITER ','), or use the legacy
                        // WITH CSV form. If the statement is rejected, copyIn() fails at once.
                        String copySql = "COPY dcm.testtbl FROM STDIN FORMAT CSV DELIMITER ','";
                        // NOTE(review): the Future is discarded, so a copyIn() failure is never
                        // reported, which is consistent with "returns but loads nothing".
                        executorService.submit(() -> copyManager.copyIn(copySql, br));
                        Thread.sleep(5000);
                        // NOTE(review): this reads from the same reader that copyIn() consumes, so
                        // every line read here is taken away from the COPY. If copyIn() has
                        // already failed, this is where the written rows end up.
                        System.out.println(br.ready());
                        while (br.ready()) {
                            System.out.println("LINE : " + br.readLine());
                        }
                        // Unlike the first version, the executor is shut down, so its worker
                        // thread can exit once the queued tasks finish and the JVM can stop.
                        executorService.shutdown();
                        System.out.println("QUITTING");
                } catch (Exception e) {
                    // NOTE(review): catching only to rethrow has no effect.
                    throw e;
                }
                System.out.println("QUITTING FINALLY");
        } catch(Exception ex) {
            System.out.println(ex);
        }

    }

}

Thanks


Solution

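  • There seem to be a couple of different issues in there. First, copyIn() only returns once it reaches the end of its input, and for a PipedInputStream that end only comes when the connected PipedOutputStream is closed, which the original code only does as part of shutting everything down. Second, the extra bw.write("\n") after the loop sends an empty line that COPY reads as one more (invalid) row, so it should be dropped. Closing the PipedOutputStream as soon as the writer is done lets copyIn() complete.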

    Even then, it looks like this is still subject to some race conditions due to the timing of the resource cleanup. The copyIn() call will block until it reaches the end of its InputStream, and in the case of a PipedInputStream, the "end" is the point where the PipedOutputStream is closed. But after the stream is closed and the copyIn() call is unblocked, the input stream and the database connection are closed in quick succession, potentially before the copy has a chance to finalise. At best, it seems to successfully commit to the table, but then error out with a "Database connection failed when canceling copy operation".

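    To make sure that these resources aren't released while they're still in use:

      • wait for the writer task to finish (write.get()) before the PipedOutputStream is closed, and
      • wait for the copyIn() task to finish (copy.get()) before the PipedInputStream and the connection are closed.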

    Waiting for the tasks to complete has the added benefit of propagating any exceptions to the main thread.

    There's also a potential deadlock due to the newSingleThreadExecutor(): if the writer thread fills the pipe's buffer, it will block until the reader starts consuming the data, which will never happen if they're being executed sequentially. Using a newFixedThreadPool(2) should fix this.

    With all that in mind:

      /**
       * Streams generated rows into {@code dcm.testtbl} through {@code COPY ... FROM STDIN}.
       *
       * <p>A writer task produces text-format COPY rows into a {@link PipedOutputStream}, while a
       * copy task hands the connected {@link PipedInputStream} to
       * {@link CopyManager#copyIn(String, java.io.Reader)}. They need separate threads: once the
       * pipe's buffer is full, the writer blocks until the reader drains it.
       *
       * <p>Each task closes its own end of the pipe when it finishes or fails, so a failure on one
       * side is seen by the other instead of leaving it blocked. The copy runs in a transaction
       * that is committed only when both tasks have succeeded.
       */
      public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        try (Connection connection = DriverManager.getConnection(
                "jdbc:postgresql://XX.XX.XX.XX:9432/somedb", "someadmin", "somepassword");
            PipedInputStream is = new PipedInputStream();
            // Explicit UTF-8 on both ends of the pipe: the platform default charset (e.g.
            // windows-1252) would silently turn characters it cannot encode into '?'.
            BufferedReader br = new BufferedReader(
                new InputStreamReader(is, java.nio.charset.StandardCharsets.UTF_8));
            // Connected here, before either task starts, so the reader never sees an unconnected
            // pipe. Closing this stream is what tells copyIn() that the data has ended.
            PipedOutputStream os = new PipedOutputStream(is)) {
          System.out.println(connection);
          BaseConnection pgcon = connection.unwrap(BaseConnection.class);
          // Commit only once both tasks succeed. Otherwise a writer that failed midway would
          // still close the pipe, and copyIn() would load the partial data.
          connection.setAutoCommit(false);
          CopyManager copyManager = new CopyManager(pgcon);
          String copySql = "COPY dcm.testtbl FROM STDIN";

          // Writer: owns the write end and closes it (via bw) whether it succeeds or fails.
          Future<Boolean> write = executorService.submit(() -> {
            try (BufferedWriter bw = new BufferedWriter(
                new OutputStreamWriter(os, java.nio.charset.StandardCharsets.UTF_8))) {
              String frmtStr = "%s\t{\"id\":%s, \"somefield\":\"%s\"}\n";
              for (int i = 1; i < 1000; i++) {
                String row = String.format(frmtStr, i, i, ("row" + i));
                System.out.print(row);
                bw.write(row);
              }
            }
            System.out.println("WRITTEN!");
            return true;
          });

          // Copier: owns the read end. Closing it when copyIn() fails makes the writer's next
          // write fail with "Pipe closed". Otherwise the writer would block forever on the full
          // pipe, because the pool thread that was reading stays alive.
          Future<Long> copy = executorService.submit(() -> {
            try {
              return copyManager.copyIn(copySql, br);
            } finally {
              br.close();
            }
          });

          try {
            // Wait for the copier first, so that if both tasks fail, its error (the root cause)
            // is reported rather than the writer's follow-on "Pipe closed".
            long copied = copy.get();
            write.get();
            connection.commit();
            System.out.println("COPIED " + copied + " ROWS");
          } catch (Exception e) {
            try {
              connection.rollback();
            } catch (SQLException rollbackFailure) {
              e.addSuppressed(rollbackFailure);
            }
            throw e;
          }
        } catch (Exception ex) {
          System.out.println(ex);
        } finally {
          executorService.shutdown();
        }
      }