I want to serialize a 'Message' object. I can successfully transfer it as a byte array through a SocketChannel. After that, I change the object's properties (so that it may have a larger size), and then there is a problem sending the object back to the client. Once I try to obtain the object on the client side, I get an exception; it occurs when I deserialize the Message object in the getResponse() method:
org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 00000000
But, somehow, this applies only for the first client (After the exception is thrown, connection with the first client is over) and when I start a new client (not closing server) I can successfully transfer the object back and forth, furthermore, it works for any new clients.
This is my minimal debuggable version:
import org.apache.commons.lang3.SerializationUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
/**
 * Interactive console client: reads a request string and a number from standard input,
 * wraps them in a {@code Message}, sends the serialized bytes to the server at
 * localhost:5454 and prints the deserialized reply.
 */
public class Client {
// Channel to the server; opened and switched to non-blocking mode in start().
private SocketChannel server;
/**
 * Connects to the server and runs the prompt/send loop until the user enters "exit",
 * then closes the channel via stop().
 *
 * @throws IOException if closing the channel in stop() fails
 */
public void start() throws IOException {
try {
server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
// NOTE(review): in non-blocking mode the read in getResponse() may return 0 bytes
// when the server has not answered yet.
server.configureBlocking(false);
} catch (IOException e) {
System.err.println("Server isn't responding");
System.exit(0);
}
// Two separate Scanner instances are created on the same System.in stream:
// one for the request line, one for the number.
Scanner scRequest = new Scanner(System.in);
Scanner scState = new Scanner(System.in);
System.out.println("Enter request:");
String request = scRequest.nextLine();
while (!request.equals("exit")) {
try {
// In my actual project the Person class is quite different (but it is still a POJO).
// I included it here to make sure I can get it back after sending it to the server.
System.out.println("Enter a number:");
Person person = new Person(scState.nextInt());
sendRequest(request, person);
System.out.println("\nEnter request:");
request = scRequest.nextLine();
} catch (Exception e) {
// The loop continues with the previous value of 'request' after a failure here.
e.printStackTrace();
}
}
stop();
}
/**
 * Builds a {@code Message} from the arguments, serializes it, writes it to the server
 * with a single write call and then calls getResponse().
 * Any exception is reported by its message only and terminates the JVM with status 0.
 *
 * @param sMessage the command text to put into the message
 * @param person   the object attached to the message
 */
public void sendRequest(String sMessage, Person person) {
Message message = new Message(sMessage, person);
ByteBuffer requestBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
try {
// The return value of write() (number of bytes written) is not checked.
server.write(requestBuffer);
requestBuffer.clear();
getResponse();
} catch (Exception e) {
System.out.println(e.getMessage());
System.err.println("Connection lost");
System.exit(0);
}
}
/**
 * Performs one read from the server channel into a freshly allocated 64 MiB buffer,
 * deserializes the buffer content as a {@code Message} and prints it.
 *
 * @throws Exception if the read returns -1 (end of stream), or if reading or
 *                   deserializing fails
 */
public void getResponse() throws Exception {
ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024 * 64);
int read = server.read(responseBuffer);
// clear() runs after the read: it sets position to 0 and limit to capacity, so the
// copy below covers the whole 64 MiB buffer, not only the 'read' bytes received.
responseBuffer.clear();
if(read == -1) {
throw new Exception();
}
// NOTE(review): if the non-blocking read returned 0, the buffer still holds only the
// zeros it was allocated with, which would match the reported
// "invalid stream header: 00000000" - confirm by logging 'read'.
byte[] bytes = new byte[responseBuffer.limit()];
responseBuffer.get(bytes);
Message message = SerializationUtils.deserialize(bytes);
System.out.println(message);
}
/**
 * Closes the channel to the server.
 *
 * @throws IOException if closing the channel fails
 */
public void stop() throws IOException {
server.close();
}
/** Program entry point: creates a client and starts its interactive loop. */
public static void main(String[] args) throws IOException {
Client client = new Client();
client.start();
}
}
import org.apache.commons.lang3.SerializationUtils;
import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * Selector-based, non-blocking server on localhost:5454. For every readable client
 * channel it reads one chunk of bytes, deserializes it as a {@code Message}, sets a
 * result on it and writes the serialized message back.
 */
public class Server {
/**
 * Opens the selector and the listening channel and then processes selected keys forever.
 *
 * @throws IOException if opening, binding, registering, selecting or accepting fails
 */
public void start() throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress("localhost", 5454));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server started");
while (true) {
// Blocks until at least one registered channel is ready.
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
register(selector, serverSocket);
}
if (key.isReadable()) {
try {
getRequest(key);
} catch (Exception e) {
// Only the message is printed; the client channel is not closed here.
System.err.println(e.getMessage());
}
}
// Remove the handled key from the selected-key set.
iter.remove();
}
}
}
/**
 * Reads once from the client channel behind {@code key}, deserializes the buffer
 * content as a {@code Message} and passes it to sendResponse().
 *
 * @param key selection key whose channel is a readable {@code SocketChannel}
 * @throws Exception if the read returns -1 (the key is cancelled first), or if reading,
 *                   deserializing or responding fails
 */
private void getRequest(SelectionKey key) throws Exception {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
int read = client.read(requestBuffer);
// clear() runs after the read: position becomes 0 and limit becomes capacity, so the
// array below is always 1 MiB long - the received bytes followed by zeros.
requestBuffer.clear();
if(read == -1) {
// The key is cancelled, but the channel itself is not closed.
key.cancel();
throw new Exception("Client disconnected at: " +
((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
}
byte[] bytes = new byte[requestBuffer.limit()];
requestBuffer.get(bytes);
Message message = SerializationUtils.deserialize(bytes);
sendResponse(client, message);
}
/**
 * Sets the result on {@code message}, serializes it and writes it to {@code client},
 * repeating the write until the buffer has no bytes remaining.
 *
 * @param client  channel to write the response to
 * @param message the received message; its result field is overwritten
 * @throws IOException if writing to the channel fails
 */
private void sendResponse(SocketChannel client, Message message) throws IOException {
message.setResult("Some result");
ByteBuffer responseBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
while (responseBuffer.hasRemaining()) {
client.write(responseBuffer);
}
responseBuffer.clear();
}
/**
 * Accepts a pending connection, switches it to non-blocking mode and registers it
 * with the selector for read events.
 *
 * @param selector     selector to register the new client channel with
 * @param serverSocket listening channel to accept from
 * @throws IOException if accepting, configuring or registering fails
 */
private void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
SocketChannel client = serverSocket.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
System.out.println("New client at: " + client.socket().getRemoteSocketAddress());
}
/** Program entry point: creates a server and starts its event loop. */
public static void main(String[] args) throws Exception {
new Server().start();
}
}
I try to send this object as a bytes array:
import java.io.Serializable;
import java.util.Formatter;
/**
 * Request/response envelope that client and server exchange as a Java-serialized
 * byte array. It carries a command, an attached {@code Person} and a result text.
 */
public class Message implements Serializable {
    /**
     * Explicit stream version. Without it the JVM derives an ID from the compiled class,
     * so a client and server built separately could reject each other's messages with
     * an {@code InvalidClassException}.
     */
    private static final long serialVersionUID = 1L;

    private String command;
    private Person person;
    private String result;

    /**
     * Creates a message without a result.
     *
     * @param command the command text
     * @param person  the object attached to the command
     */
    public Message(String command, Person person) {
        this.command = command;
        this.person = person;
    }

    /** @return the command text */
    public String getCommand() {
        return command;
    }

    /** @param executedCommand the new command text */
    public void setCommand(String executedCommand) {
        this.command = executedCommand;
    }

    /** @return the attached object */
    public Person getPerson() {
        return person;
    }

    /** @param person the new attached object */
    public void setPerson(Person person) {
        this.person = person;
    }

    /** @return the result text, or {@code null} if none has been set */
    public String getResult() {
        return result;
    }

    /** @param result the result text to store */
    public void setResult(String result) {
        this.result = result;
    }

    /**
     * Renders command, attached object and result on three lines.
     * Uses {@code String.format}, which produces the same text as the former
     * {@code new Formatter().format(...)} call without leaving a Formatter unclosed.
     */
    @Override
    public String toString() {
        return String.format("Command: %s\nAttached object: %s\nResult: %s",
                command, person, result);
    }
}
I include instance of this class inside Message obj:
/**
 * Minimal serializable payload attached to a {@code Message}; holds a single
 * immutable integer state.
 */
public class Person implements Serializable {
    /**
     * Explicit stream version, so that client and server agree on the serialized form
     * even when the class is compiled separately on each side.
     */
    private static final long serialVersionUID = 1L;

    private final int state;

    /**
     * @param state the value this person carries
     */
    public Person(int state) {
        this.state = state;
    }

    /** @return a text of the form {@code "Person state: <state>"} */
    @Override
    public String toString() {
        return "Person state: " + state;
    }
}
I have no idea what is going wrong, hope for your help.
UPD: I used 'org.apache.commons:commons-lang3:3.5' dependency to serialize an object into bytes array
I have never used Java NIO channels before, so I am not an expert. But I found out several things:
General:
- Use e.printStackTrace() instead of just System.out.println(e.getMessage()).

Client:
- SocketChannel server in the client should be configured as blocking, otherwise it might read 0 bytes because there is no server response yet, which causes your problem.
- Call ByteBuffer.clear() before reading something, not afterwards.
- Call responseBuffer.position(0) before calling get(byte[]), otherwise it will read undefined bytes after the ones just read.

Server:
- Call ByteBuffer.clear() before reading something, not afterwards.
- Call requestBuffer.position(0) before calling get(byte[]), otherwise it will read undefined bytes after the ones just read.
- When an exception occurs during a getRequest(key) call, you should close the corresponding channel, otherwise after a client disconnects the server will indefinitely try to read from it, spamming your console log with error messages. My modification handles that case and also prints a nice log message telling which client (remote socket address) was closed.

Caveat: There is nothing in your code dealing with the situation that a request or response written into the channel on the one side is bigger than the maximum ByteBuffer size on the other side. Similarly, in theory a (de)serialised byte[] could also end up being bigger than the byte buffer.
Here are my diffs:
Index: src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java
===================================================================
--- a/src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java (revision Staged)
+++ b/src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java (date 1612321383172)
@@ -15,7 +15,7 @@
public void start() throws IOException {
try {
server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
- server.configureBlocking(false);
+ server.configureBlocking(true);
}
catch (IOException e) {
System.err.println("Server isn't responding");
@@ -56,22 +56,24 @@
getResponse();
}
catch (Exception e) {
- System.out.println(e.getMessage());
+ e.printStackTrace();
+// System.out.println(e.getMessage());
System.err.println("Connection lost");
System.exit(0);
}
}
public void getResponse() throws Exception {
- ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024 * 64);
+ ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024);
+ responseBuffer.clear();
int read = server.read(responseBuffer);
- responseBuffer.clear();
if (read == -1) {
- throw new Exception();
+ throw new Exception("EOF, cannot read server response");
}
- byte[] bytes = new byte[responseBuffer.limit()];
+ byte[] bytes = new byte[read];
+ responseBuffer.position(0);
responseBuffer.get(bytes);
Message message = SerializationUtils.deserialize(bytes);
Index: src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java
===================================================================
--- a/src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java (revision Staged)
+++ b/src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java (date 1612323386278)
@@ -35,7 +35,11 @@
getRequest(key);
}
catch (Exception e) {
- System.err.println(e.getMessage());
+ e.printStackTrace();
+// System.err.println(e.getMessage());
+ SocketChannel client = (SocketChannel) key.channel();
+ System.err.println("Closing client connection at: " + client.socket().getRemoteSocketAddress());
+ client.close();
}
}
iter.remove();
@@ -45,15 +49,16 @@
private void getRequest(SelectionKey key) throws Exception {
SocketChannel client = (SocketChannel) key.channel();
- ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024 * 64);
+ ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
+ requestBuffer.clear();
int read = client.read(requestBuffer);
- requestBuffer.clear();
if (read == -1) {
key.cancel();
throw new Exception("Client disconnected at: " +
((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
}
- byte[] bytes = new byte[requestBuffer.limit()];
+ byte[] bytes = new byte[read];
+ requestBuffer.position(0);
requestBuffer.get(bytes);
Message message = SerializationUtils.deserialize(bytes);
sendResponse(client, message);
Just for completeness' sake, here are the full classes after I changed them:
import org.apache.commons.lang3.SerializationUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
/**
 * Interactive console client for the demo server on {@code localhost:5454}.
 *
 * <p>For every request it reads a command line and an integer from standard input,
 * sends them as a serialized {@code Message} and prints the server's reply.
 *
 * <p>Limitation: the protocol has no message framing. A reply is assumed to arrive in
 * a single read and to fit into {@link #RESPONSE_BUFFER_SIZE} bytes. Supporting larger
 * or fragmented replies needs a length prefix on both client and server.
 */
public class Client {
    private static final String HOST = "localhost";
    private static final int PORT = 5454;
    private static final int RESPONSE_BUFFER_SIZE = 1024 * 1024;
    private static final String EXIT_COMMAND = "exit";

    private SocketChannel server;

    /**
     * Connects to the server and runs the prompt/send loop until the user enters
     * {@code exit} or standard input ends, then closes the channel.
     * If the connection cannot be opened the JVM exits with status 1.
     *
     * @throws IOException if closing the channel fails
     */
    public void start() throws IOException {
        try {
            server = SocketChannel.open(new InetSocketAddress(HOST, PORT));
            // Blocking mode: read() waits for the reply instead of returning 0 bytes.
            server.configureBlocking(true);
        } catch (IOException e) {
            System.err.println("Server isn't responding");
            System.exit(1);
        }
        try {
            // One Scanner only: several Scanners on the same stream each buffer input
            // and can swallow data meant for the other.
            Scanner input = new Scanner(System.in);
            System.out.println("Enter request:");
            String request = readLine(input);
            while (request != null && !request.equals(EXIT_COMMAND)) {
                Integer state = readState(input);
                if (state == null) {
                    break; // standard input ended
                }
                sendRequest(request, new Person(state));
                System.out.println("\nEnter request:");
                request = readLine(input);
            }
        } finally {
            stop();
        }
    }

    /**
     * Sends one request and prints the reply. On any failure the stack trace is
     * printed and the JVM exits with status 1.
     *
     * @param sMessage the command text to put into the message
     * @param person   the object attached to the message
     */
    public void sendRequest(String sMessage, Person person) {
        Message message = new Message(sMessage, person);
        ByteBuffer requestBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
        try {
            // write() is not required to drain the buffer in one call.
            while (requestBuffer.hasRemaining()) {
                server.write(requestBuffer);
            }
            getResponse();
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("Connection lost");
            System.exit(1);
        }
    }

    /**
     * Reads one reply from the server, deserializes it as a {@code Message} and prints it.
     *
     * @throws Exception if the server closed the connection (an {@link IOException}),
     *                   or if reading or deserializing fails
     */
    public void getResponse() throws Exception {
        ByteBuffer responseBuffer = ByteBuffer.allocate(RESPONSE_BUFFER_SIZE);
        int read = server.read(responseBuffer);
        if (read == -1) {
            throw new IOException("EOF, cannot read server response");
        }
        // flip() limits the buffer to exactly the bytes just received.
        responseBuffer.flip();
        byte[] bytes = new byte[responseBuffer.remaining()];
        responseBuffer.get(bytes);
        Message message = SerializationUtils.deserialize(bytes);
        System.out.println(message);
    }

    /**
     * Closes the channel to the server, if one was opened.
     *
     * @throws IOException if closing the channel fails
     */
    public void stop() throws IOException {
        if (server != null) {
            server.close();
        }
    }

    /** Program entry point: creates a client and starts its interactive loop. */
    public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.start();
    }

    /** Returns the next input line, or {@code null} when the input has ended. */
    private static String readLine(Scanner input) {
        return input.hasNextLine() ? input.nextLine() : null;
    }

    /**
     * Prompts for an integer until a valid one is entered.
     * Reading whole lines means an invalid entry is consumed and cannot be re-read forever.
     *
     * @return the parsed number, or {@code null} when the input has ended
     */
    private static Integer readState(Scanner input) {
        while (true) {
            System.out.println("Enter a number:");
            String line = readLine(input);
            if (line == null) {
                return null;
            }
            try {
                return Integer.valueOf(line.trim());
            } catch (NumberFormatException e) {
                System.err.println("Not a valid integer: " + line);
            }
        }
    }
}
import org.apache.commons.lang3.SerializationUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * Single-threaded, selector-based demo server on {@code localhost:5454}.
 *
 * <p>For every readable client it reads one chunk of bytes, deserializes it as a
 * {@code Message}, sets a result and writes the serialized message back.
 *
 * <p>Limitation: the protocol has no message framing. A request is assumed to arrive
 * in a single read and to fit into {@link #REQUEST_BUFFER_SIZE} bytes. Supporting
 * larger, fragmented or coalesced requests needs a length prefix on both sides and a
 * per-connection accumulation buffer.
 *
 * <p>Not thread-safe: the read buffer is shared and used by the selector thread only.
 */
public class Server {
    private static final String HOST = "localhost";
    private static final int PORT = 5454;
    private static final int REQUEST_BUFFER_SIZE = 1024 * 1024;

    /** Reused for every read so that a new 1 MiB buffer is not allocated per request. */
    private final ByteBuffer requestBuffer = ByteBuffer.allocate(REQUEST_BUFFER_SIZE);

    /**
     * Opens the listening channel and processes selector events until an I/O error
     * occurs. Selector and listening channel are closed when the loop is left.
     *
     * @throws IOException if opening, binding, selecting or accepting fails
     */
    public void start() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
            serverSocket.bind(new InetSocketAddress(HOST, PORT));
            serverSocket.configureBlocking(false);
            serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("Server started");
            while (true) {
                selector.select();
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> iter = selectedKeys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (!key.isValid()) {
                        continue; // cancelled while still in the selected set
                    }
                    if (key.isAcceptable()) {
                        register(selector, serverSocket);
                    } else if (key.isReadable()) {
                        handleReadable(key);
                    }
                }
            }
        }
    }

    /** Processes one readable key; any failure closes that client but keeps the server running. */
    private void handleReadable(SelectionKey key) {
        try {
            getRequest(key);
        } catch (Exception e) {
            e.printStackTrace();
            closeClient(key);
        }
    }

    /** Logs and closes the client channel behind {@code key}; a failing close is only reported. */
    private void closeClient(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        System.err.println("Closing client connection at: " + client.socket().getRemoteSocketAddress());
        try {
            client.close();
        } catch (IOException closeFailure) {
            // Must not propagate: one broken connection should not stop the server.
            System.err.println("Could not close client channel: " + closeFailure);
        }
    }

    /**
     * Reads once from the client channel, deserializes the received bytes as a
     * {@code Message} and sends the response. Returns without action when the
     * non-blocking read delivered no bytes.
     *
     * @param key selection key whose channel is a readable {@code SocketChannel}
     * @throws Exception if the client disconnected (an {@link IOException}), or if
     *                   reading, deserializing or responding fails
     */
    private void getRequest(SelectionKey key) throws Exception {
        SocketChannel client = (SocketChannel) key.channel();
        requestBuffer.clear();
        int read = client.read(requestBuffer);
        if (read == -1) {
            key.cancel();
            throw new IOException("Client disconnected at: "
                    + client.socket().getRemoteSocketAddress());
        }
        if (read == 0) {
            return; // nothing available yet; wait for the next readiness event
        }
        // flip() limits the buffer to exactly the bytes just received.
        requestBuffer.flip();
        byte[] bytes = new byte[requestBuffer.remaining()];
        requestBuffer.get(bytes);
        // SECURITY(review): this is native Java deserialization of bytes received from
        // the network. It is only acceptable for trusted peers; for untrusted clients
        // use an ObjectInputFilter or a data format such as JSON.
        Message message = SerializationUtils.deserialize(bytes);
        sendResponse(client, message);
    }

    /**
     * Sets the result on {@code message}, serializes it and writes it to {@code client}.
     *
     * @param client  channel to write the response to
     * @param message the received message; its result field is overwritten
     * @throws IOException if writing to the channel fails
     */
    private void sendResponse(SocketChannel client, Message message) throws IOException {
        message.setResult("Some result");
        ByteBuffer responseBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
        // NOTE(review): on a non-blocking channel this loop spins while the socket send
        // buffer is full. That is fine for small demo messages; for large responses,
        // register OP_WRITE and continue writing from the selector loop.
        while (responseBuffer.hasRemaining()) {
            client.write(responseBuffer);
        }
    }

    /**
     * Accepts a pending connection, if any, and registers it for read events.
     *
     * @param selector     selector to register the new client channel with
     * @param serverSocket listening channel to accept from
     * @throws IOException if accepting, configuring or registering fails
     */
    private void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
        SocketChannel client = serverSocket.accept();
        if (client == null) {
            return; // non-blocking accept: no connection is pending any more
        }
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
        System.out.println("New client at: " + client.socket().getRemoteSocketAddress());
    }

    /** Program entry point: creates a server and starts its event loop. */
    public static void main(String[] args) throws Exception {
        new Server().start();
    }
}