I am trying to use PostgreSQL's CopyManager copyIn functionality with COPY FROM STDIN,
as suggested in the docs, for very fast copying from an InputStream into a database table. I am thinking of using this to continuously stream rows to a table as and when I receive/process them. However, the quick-and-dirty sample code below seems to be stuck on copyIn
and does not write to the table.
Does anyone know what I am missing here, or whether my understanding is wrong?
import java.sql.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.postgresql.core.BaseConnection;
import org.postgresql.copy.CopyManager;

public class PGConnectTest {
    public static void main(String[] args) {
        try {
            try (Connection connection = DriverManager.getConnection("jdbc:postgresql://XX.XX.XX.XX:9432/somedb", "someadmin", "somepassword");
                 BaseConnection pgcon = (BaseConnection) connection;
                 PipedInputStream is = new PipedInputStream();
                 BufferedReader br = new BufferedReader(new InputStreamReader(is));
                 PipedOutputStream os = new PipedOutputStream(is);
                 BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os))) {
                ExecutorService executorService = Executors.newSingleThreadExecutor();
                Callable callable = () -> {
                    Thread.sleep(3000);
                    String frmtStr = "%s\t{\"id\":%s, \"somefield\":\"%s\"}\n";
                    String row = null;
                    for (int i = 1; i < 10; i++) {
                        row = String.format(frmtStr, i, i, ("row" + i));
                        System.out.print(row);
                        bw.write(row);
                    }
                    bw.write("\n");
                    bw.flush();
                    System.out.println("WRITTEN!");
                    return true;
                };
                executorService.submit(callable);
                System.out.println(connection);
                CopyManager copyManager = new CopyManager(pgcon);
                String copySql = "COPY dcm.testtbl FROM STDIN";
                executorService.submit(() -> copyManager.copyIn(copySql, br));
                Thread.sleep(10000);
                System.out.println("QUITTING");
            } catch (Exception e) {
                throw e;
            }
        } catch (Exception ex) {
            System.out.println(ex);
        }
    }
}
The schema of the table testtbl is below:

create table testtbl (
    id integer primary key,
    jsnclm jsonb
)
The console output is (it does NOT return and requires using CTRL+C to kill it),
C:\Users\ml410408\Documents\Useful Lookups\POSTGRESQL>java -cp ".;postgresql-42.2.18.jar" PGConnectTest
org.postgresql.jdbc.PgConnection@41975e01
1 {"id":1, "somefield":"row1"}
2 {"id":2, "somefield":"row2"}
3 {"id":3, "somefield":"row3"}
4 {"id":4, "somefield":"row4"}
5 {"id":5, "somefield":"row5"}
6 {"id":6, "somefield":"row6"}
7 {"id":7, "somefield":"row7"}
8 {"id":8, "somefield":"row8"}
9 {"id":9, "somefield":"row9"}
WRITTEN!
QUITTING
UPDATE:
Once I changed the format of the COPY SQL command from the default TEXT to CSV and passed in CSV records, it is no longer stuck, but it still does nothing (meaning no records in the table) even though it now returns, unlike before.
import java.sql.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.postgresql.core.BaseConnection;
import org.postgresql.copy.CopyManager;

public class PGConnectTest {
    public static void main(String[] args) {
        try {
            try (Connection connection = DriverManager.getConnection("jdbc:postgresql://XX.XX.XX.XX:9432/somedb", "someadmin", "somepassword");
                 BaseConnection pgcon = (BaseConnection) connection;
                 PipedInputStream is = new PipedInputStream();
                 BufferedReader br = new BufferedReader(new InputStreamReader(is));
                 PipedOutputStream os = new PipedOutputStream(is);
                 BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os))) {
                ExecutorService executorService = Executors.newSingleThreadExecutor();
                Callable callable = () -> {
                    Thread.sleep(3000);
                    String frmtStr = "%s,'{\"id\":%s,\"somefield\":\"%s\"}'\n";
                    String row = null;
                    for (int i = 1; i < 10; i++) {
                        row = String.format(frmtStr, i, i, ("row" + i));
                        System.out.print(row);
                        bw.write(row);
                    }
                    bw.write("\n");
                    bw.write("'\\.'\n");
                    System.out.println("'\\.'\n");
                    bw.flush();
                    os.flush();
                    System.out.println("WRITTEN!");
                    return true;
                };
                executorService.submit(callable);
                System.out.println(connection);
                CopyManager copyManager = new CopyManager(pgcon);
                String copySql = "COPY dcm.testtbl FROM STDIN FORMAT CSV DELIMITER ','";
                executorService.submit(() -> copyManager.copyIn(copySql, br));
                Thread.sleep(5000);
                System.out.println(br.ready());
                while (br.ready()) {
                    System.out.println("LINE : " + br.readLine());
                }
                executorService.shutdown();
                System.out.println("QUITTING");
            } catch (Exception e) {
                throw e;
            }
            System.out.println("QUITTING FINALLY");
        } catch (Exception ex) {
            System.out.println(ex);
        }
    }
}
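(A note on the CSV attempt, as a hedged aside: in CSV, quoting is done with double quotes, with embedded double quotes doubled; single quotes have no special meaning, so a field like '{"id":1,...}' is split at the embedded comma and rejected. Also, I believe the modern COPY option syntax requires parentheses, e.g. COPY dcm.testtbl FROM STDIN WITH (FORMAT csv, DELIMITER ','). A minimal sketch of building a correctly quoted CSV line for this table — quoteCsv and row are hypothetical helpers, not part of the original code:)

```java
public class CsvCopyRow {
    // CSV quoting for COPY ... WITH (FORMAT csv): wrap fields containing the
    // delimiter, a double quote, or a newline in double quotes, and double
    // any embedded double quotes.
    static String quoteCsv(String field) {
        if (field.contains(",") || field.contains("\"") || field.contains("\n")) {
            return "\"" + field.replace("\"", "\"\"") + "\"";
        }
        return field;
    }

    // Builds one CSV line for testtbl (id integer, jsnclm jsonb).
    static String row(int id, String someField) {
        String json = String.format("{\"id\":%d,\"somefield\":\"%s\"}", id, someField);
        return id + "," + quoteCsv(json) + "\n";
    }

    public static void main(String[] args) {
        System.out.print(row(1, "row1")); // 1,"{""id"":1,""somefield"":""row1""}"
    }
}
```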
Thanks
There seem to be a couple of different issues in there.

- The ExecutorService is keeping it alive; calling shutdown() after submitting the tasks causes it to terminate as expected.
- copyIn() is throwing an exception: the trailing newline in the stream (bw.write("\n")) triggers an ERROR: invalid input syntax for integer: "" as it fails to find the id column.

Even then, it looks like this is still subject to some race conditions due to the timing of the resource cleanup. The copyIn() call will block until it reaches the end of its InputStream, and in the case of a PipedInputStream, the "end" is the point where the PipedOutputStream is closed. But after the stream is closed and the copyIn() call is unblocked, the input stream and the database connection are closed in quick succession, potentially before the copy has a chance to finalise. At best, it seems to successfully commit to the table, but then error out with a "Database connection failed when canceling copy operation".
To make sure that these resources aren't released while they're still in use:

- wait for the write task to complete before closing the OutputStream, and
- wait for the copy task to complete before closing the InputStream / Connection.
Waiting for the tasks to complete has the added benefit of propagating any exceptions to the main thread.
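As a standalone sketch of that propagation (no database involved, names are illustrative): an exception thrown inside a submitted task does not kill anything silently; it is captured by the Future and rethrown, wrapped in an ExecutionException, when get() is called on the main thread.

```java
import java.util.concurrent.*;

public class FuturePropagation {
    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newSingleThreadExecutor();
        // The task's exception is captured by the Future rather than being lost.
        Callable<Void> failing = () -> { throw new IllegalStateException("boom"); };
        Future<Void> task = es.submit(failing);
        try {
            task.get(); // rethrows the failure wrapped in an ExecutionException
        } catch (ExecutionException e) {
            System.out.println("caught: " + e.getCause().getMessage()); // caught: boom
        } finally {
            es.shutdown();
        }
    }
}
```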
There's also a potential deadlock due to the newSingleThreadExecutor(): if the writer thread fills the pipe's buffer, it will block until the reader starts consuming the data, which will never happen if the two tasks are executed sequentially on the same thread. Using a newFixedThreadPool(2) should fix this.
With all that in mind:
public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    try {
        try (Connection connection = DriverManager.getConnection("jdbc:postgresql://XX.XX.XX.XX:9432/somedb", "someadmin", "somepassword");
             BaseConnection pgcon = (BaseConnection) connection;
             PipedInputStream is = new PipedInputStream();
             BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
            Future write;
            Future copy;
            try (PipedOutputStream os = new PipedOutputStream(is);
                 BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os))) {
                write = executorService.submit(() -> {
                    String frmtStr = "%s\t{\"id\":%s, \"somefield\":\"%s\"}\n";
                    String row = null;
                    for (int i = 1; i < 1000; i++) {
                        row = String.format(frmtStr, i, i, ("row" + i));
                        System.out.print(row);
                        bw.write(row);
                    }
                    bw.flush();
                    System.out.println("WRITTEN!");
                    return true;
                });
                System.out.println(connection);
                CopyManager copyManager = new CopyManager(pgcon);
                String copySql = "COPY dcm.testtbl FROM STDIN";
                copy = executorService.submit(() -> copyManager.copyIn(copySql, br));
                System.out.println("QUITTING");
                write.get();
            }
            copy.get();
        }
    } catch (Exception ex) {
        System.out.println(ex);
    } finally {
        executorService.shutdown();
    }
}