I am implementing a Transfer Server program which takes messages from clients (via console input) and then forwards it to some sort of mailbox.
To allow concurrent reception of several messages by different clients, I first created a class that implements the Runnable
interface. Each of this class instances will handle the communication with exactly one client:
public class ClientConnection implements Runnable {
//...
//...
@Override
public void run() {
try {
// prepare the input reader and output writer
BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);
Message message = new Message();
String request = "";
// read client requests
while ((request = reader.readLine()) != null) {
System.out.println("Client sent the following request: " + request);
String response;
if (request.trim().equals("quit")) {
writer.println("ok bye");
return;
}
response = message.parseRequest(request);
if (message.isCompleted()) {
messagesQueue.put(message);
message = new Message();
}
writer.println(response);
}
} catch (SocketException e) {
System.out.println("ClientConnection: SocketException while handling socket: " + e.getMessage());
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (InterruptedException e) {
System.out.println("Client Connection was interrupted!");
e.printStackTrace();
} finally {
if (clientSocket != null && !clientSocket.isClosed()) {
try {
clientSocket.close();
} catch (IOException ignored) {}
}
}
}
}
I do have a parent thread which is responsible for starting and managing all the ClientConnection runnables:
@Override
public void run() {
clientConnectionExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
while (true) {
Socket clientSocket;
try {
// wait for a Client to connect
clientSocket = serverSocket.accept();
ClientConnection clientConnection = new ClientConnection(clientSocket, messagesQueue);
clientConnectionExecutor.execute(clientConnection);
} catch (IOException e) {
// when this exception occurs, it means that we want to shut down everything
clientConnectionExecutor.shutdownNow(); // force terminate all ClientConnections
return;
}
}
}
Now according to this Stackoverflow Question, I would have expected that as soon as shutdownNow();
is being called, an InterruptedException
would be thrown within my ClientConnection.run()
method, and there, it should print Client Connection was interrupted!
. But this does not happen, so the catch clause seems never to be reached, the input reading loop just goes on.
I read in another Stackoverflow question that this might be related to some other codeline within the block seems to be consuming the InterruptedException
, but there wasn't any particular information on what codeline could do that. So I am thankful for any hints.
Edit: It turns out that as soon as I manually exit the loop by typing "quit" on the client, the loop will quit and then, Client Connection was interrupted!
will be printed. So somehow the exception seems to be ignored as long as the loop is running, and only handled afterwards.
From Oracle docs for shutdownNow
:
There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. For example, typical implementations will cancel via Thread.interrupt(), so any task that fails to respond to interrupts may never terminate.
If you take a look into ThreadPoolExecutor
sources, you will find out that shutdownNow
interrupts threads with this code:
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
Your ClientConnection
doesn't check the flag Thread.interrupted
. Due to information in the post, I can't figure out which method throws InterruptedException
. Probably, some other method, for example, readLine
of reader or writer, blocks the thread, because they use socket's InputStream
and OutputStream
and because it's obvious that socket's streams block the thread if data is not immediatly available.
For example, I wrote this code to test it:
class Example {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try(ServerSocket serverSocket = new ServerSocket()) {
serverSocket.bind(new InetSocketAddress(8080));
Socket socket = serverSocket.accept();
int dataByte = socket.getInputStream().read();
System.out.println(dataByte);
} catch (IOException e) {
e.printStackTrace();
}
});
thread.start();
thread.interrupt();
}
}
On OpenJdk-16.0.2 there is no actual interruption.
I see two possible solutions for your problem:
Check Thread.interrupted
inside the while
loop if you are sure that Socket
doesn't block your thread.
If your are not sure, use SocketChannel
in non-blocking mode instead of Socket
for checking Thread.interrupted
manually.
For the second way I tranformed my example into this:
class Example {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try(ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
serverSocket.configureBlocking(false);
serverSocket.bind(new InetSocketAddress(8080));
SocketChannel socket = null;
while (socket == null) {
socket = serverSocket.accept();
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
socket.read(byteBuffer);
byte[] bytes = new byte[byteBuffer.limit()];
byteBuffer.flip();
byteBuffer.get(bytes);
System.out.println(new String(bytes, StandardCharsets.UTF_8));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
System.out.println("Interrupted successfully");
}
});
thread.start();
thread.interrupt();
}
}
It works.
Good luck with Java :)