javaserializationniobytebuffersocketchannel

SocketChannel. invalid stream header: 00000000


I want to serialize 'Message' object, I can successfully transfer it as bytes array through socketChannel. After that, I change the object's properties (so that it may have larger size), and then there's a problem in sending object back to the client. Once I try to obtain the object on the client side, I get an exception, it occurs when I deserealize Message obj in getResponse() method:

org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 00000000

But, somehow, this applies only for the first client (After the exception is thrown, connection with the first client is over) and when I start a new client (not closing server) I can successfully transfer the object back and forth, furthermore, it works for any new clients.

This is my minimal debuggable version:

import org.apache.commons.lang3.SerializationUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class Client {

    private SocketChannel server;

    public void start() throws IOException {
        try {
            server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
            server.configureBlocking(false);
        } catch (IOException e) {
            System.err.println("Server isn't responding");
            System.exit(0);
        }

        Scanner scRequest = new Scanner(System.in);
        Scanner scState = new Scanner(System.in);


        System.out.println("Enter request:");
        String request = scRequest.nextLine();

        while (!request.equals("exit")) {
            try {
                // In my actual project class Person is a way different (But it's still a POJO)
                // I included it here to make sure I can get it back after sending to the server
                System.out.println("Enter a number:");
                Person person = new Person(scState.nextInt());
                sendRequest(request, person);

                System.out.println("\nEnter request:");
                request = scRequest.nextLine();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        stop();
    }

    public void sendRequest(String sMessage, Person person) {
        Message message = new Message(sMessage, person);
        ByteBuffer requestBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
        try {
            server.write(requestBuffer);
            requestBuffer.clear();
            getResponse();
        } catch (Exception e) {
            System.out.println(e.getMessage());
            System.err.println("Connection lost");
            System.exit(0);
        }
    }

    public void getResponse() throws Exception {
        ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024 * 64);

        int read = server.read(responseBuffer);
        responseBuffer.clear();
        if(read == -1) {
            throw new Exception();
        }

        byte[] bytes = new byte[responseBuffer.limit()];
        responseBuffer.get(bytes);

        Message message = SerializationUtils.deserialize(bytes);
        System.out.println(message);
    }

    public void stop() throws IOException {
        server.close();
    }

    public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.start();
    }
}
import org.apache.commons.lang3.SerializationUtils;

import java.io.*;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Server {

    public void start() throws IOException {

        Selector selector = Selector.open();
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost", 5454));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("Server started");

        while (true) {
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                if (key.isAcceptable()) {
                    register(selector, serverSocket);
                }
                if (key.isReadable()) {
                    try {
                        getRequest(key);
                    } catch (Exception e) {
                        System.err.println(e.getMessage());
                    }
                }
                iter.remove();
            }
        }
    }

    private void getRequest(SelectionKey key) throws Exception {
        SocketChannel client = (SocketChannel) key.channel();

        ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
        int read = client.read(requestBuffer);
        requestBuffer.clear();

        if(read == -1) {
            key.cancel();
            throw new Exception("Client disconnected at: " +
                    ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
        }

        byte[] bytes = new byte[requestBuffer.limit()];
        requestBuffer.get(bytes);

        Message message = SerializationUtils.deserialize(bytes);
        sendResponse(client, message);
    }

    private void sendResponse(SocketChannel client, Message message) throws IOException {

        message.setResult("Some result");

        ByteBuffer responseBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
        while (responseBuffer.hasRemaining()) {
            client.write(responseBuffer);
        }
        responseBuffer.clear();
    }

    private void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
        SocketChannel client = serverSocket.accept();
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
        System.out.println("New client at: " + client.socket().getRemoteSocketAddress());
    }

    public static void main(String[] args) throws Exception {
        new Server().start();
    }
}

I try to send this object as a bytes array:

import java.io.Serializable;
import java.util.Formatter;

public class Message implements Serializable {

    private String command;
    private Person person;
    private String result;

    public Message(String command, Person person) {
        this.command = command;
        this.person = person;
    }

    public String getCommand() {
        return command;
    }
    public void setCommand(String executedCommand) {
        this.command = executedCommand;
    }
    public Person getPerson() {
        return person;
    }
    public void setPerson(Person person) {
        this.person = person;
    }
    public String getResult() {
        return result;
    }
    public void setResult(String result) {
        this.result = result;
    }

    @Override
    public String toString() {
        return new Formatter()
                .format("Command: %s\nAttached object: %s\nResult: %s",
                        command, person, result)
                .toString();
    }
}

I include instance of this class inside Message obj:

public class Person implements Serializable {
    private final int state;

    public Person(int state) {
        this.state = state;
    }

    @Override
    public String toString() {
        return "Person state: " + state;
    }
}

I have no idea what is going wrong, hope for your help.

UPD: I used 'org.apache.commons:commons-lang3:3.5' dependency to serialize an object into bytes array


Solution

  • I have never used Java NIO channels before, so I am not an expert. But I found out several things:

    General:

    Client:

    Server:

    Caveat: There is nothing in your code dealing with the situation that a request or response written into the channel on the one side is bigger than the maximum ByteBuffer size on the other side. Similarly, in theory a (de)serialised byte[] could also end up being bigger than the byte buffer.

    Here are my diffs:

    Index: src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java
    ===================================================================
    --- a/src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java (revision Staged)
    +++ b/src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java (date 1612321383172)
    @@ -15,7 +15,7 @@
       public void start() throws IOException {
         try {
           server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
    -      server.configureBlocking(false);
    +      server.configureBlocking(true);
         }
         catch (IOException e) {
           System.err.println("Server isn't responding");
    @@ -56,22 +56,24 @@
           getResponse();
         }
         catch (Exception e) {
    -      System.out.println(e.getMessage());
    +      e.printStackTrace();
    +//      System.out.println(e.getMessage());
           System.err.println("Connection lost");
           System.exit(0);
         }
       }
     
       public void getResponse() throws Exception {
    -    ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024 * 64);
    +    ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024);
    +    responseBuffer.clear();
     
         int read = server.read(responseBuffer);
    -    responseBuffer.clear();
         if (read == -1) {
    -      throw new Exception();
    +      throw new Exception("EOF, cannot read server response");
         }
     
    -    byte[] bytes = new byte[responseBuffer.limit()];
    +    byte[] bytes = new byte[read];
    +    responseBuffer.position(0);
         responseBuffer.get(bytes);
     
         Message message = SerializationUtils.deserialize(bytes);
    
    Index: src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java
    ===================================================================
    --- a/src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java (revision Staged)
    +++ b/src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java (date 1612323386278)
    @@ -35,7 +35,11 @@
                 getRequest(key);
               }
               catch (Exception e) {
    -            System.err.println(e.getMessage());
    +            e.printStackTrace();
    +//            System.err.println(e.getMessage());
    +            SocketChannel client = (SocketChannel) key.channel();
    +            System.err.println("Closing client connection at: " + client.socket().getRemoteSocketAddress());
    +            client.close();
               }
             }
             iter.remove();
    @@ -45,15 +49,16 @@
     
       private void getRequest(SelectionKey key) throws Exception {
         SocketChannel client = (SocketChannel) key.channel();
    -    ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024 * 64);
    +    ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
    +    requestBuffer.clear();
         int read = client.read(requestBuffer);
    -    requestBuffer.clear();
         if (read == -1) {
           key.cancel();
           throw new Exception("Client disconnected at: " +
             ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
         }
    -    byte[] bytes = new byte[requestBuffer.limit()];
    +    byte[] bytes = new byte[read];
    +    requestBuffer.position(0);
         requestBuffer.get(bytes);
         Message message = SerializationUtils.deserialize(bytes);
         sendResponse(client, message);
    

    Just for completeness' sake, here are the full classes after I changed them:

    import org.apache.commons.lang3.SerializationUtils;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    import java.util.Scanner;
    
    public class Client {
    
      private SocketChannel server;
    
      public void start() throws IOException {
        try {
          server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
          server.configureBlocking(true);
        }
        catch (IOException e) {
          System.err.println("Server isn't responding");
          System.exit(0);
        }
    
        Scanner scRequest = new Scanner(System.in);
        Scanner scState = new Scanner(System.in);
    
        System.out.println("Enter request:");
        String request = scRequest.nextLine();
    
        while (!request.equals("exit")) {
          try {
            // In my actual project class Person is a way different (But it's still a POJO)
            // I included it here to make sure I can get it back after sending to the server
            System.out.println("Enter a number:");
            Person person = new Person(scState.nextInt());
            sendRequest(request, person);
    
            System.out.println("\nEnter request:");
            request = scRequest.nextLine();
          }
          catch (Exception e) {
            e.printStackTrace();
          }
        }
    
        stop();
      }
    
      public void sendRequest(String sMessage, Person person) {
        Message message = new Message(sMessage, person);
        ByteBuffer requestBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
        try {
          server.write(requestBuffer);
          requestBuffer.clear();
          getResponse();
        }
        catch (Exception e) {
          e.printStackTrace();
    //      System.out.println(e.getMessage());
          System.err.println("Connection lost");
          System.exit(0);
        }
      }
    
      public void getResponse() throws Exception {
        ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024);
        responseBuffer.clear();
    
        int read = server.read(responseBuffer);
        if (read == -1) {
          throw new Exception("EOF, cannot read server response");
        }
    
        byte[] bytes = new byte[read];
        responseBuffer.position(0);
        responseBuffer.get(bytes);
    
        Message message = SerializationUtils.deserialize(bytes);
        System.out.println(message);
      }
    
      public void stop() throws IOException {
        server.close();
      }
    
      public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.start();
      }
    }
    
    import org.apache.commons.lang3.SerializationUtils;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class Server {
      public void start() throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost", 5454));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("Server started");
    
        while (true) {
          selector.select();
          Set<SelectionKey> selectedKeys = selector.selectedKeys();
          Iterator<SelectionKey> iter = selectedKeys.iterator();
          while (iter.hasNext()) {
            SelectionKey key = iter.next();
            if (key.isAcceptable()) {
              register(selector, serverSocket);
            }
            if (key.isReadable()) {
              try {
                getRequest(key);
              }
              catch (Exception e) {
                e.printStackTrace();
    //            System.err.println(e.getMessage());
                SocketChannel client = (SocketChannel) key.channel();
                System.err.println("Closing client connection at: " + client.socket().getRemoteSocketAddress());
                client.close();
              }
            }
            iter.remove();
          }
        }
      }
    
      private void getRequest(SelectionKey key) throws Exception {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
        requestBuffer.clear();
        int read = client.read(requestBuffer);
        if (read == -1) {
          key.cancel();
          throw new Exception("Client disconnected at: " +
            ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
        }
        byte[] bytes = new byte[read];
        requestBuffer.position(0);
        requestBuffer.get(bytes);
        Message message = SerializationUtils.deserialize(bytes);
        sendResponse(client, message);
      }
    
      private void sendResponse(SocketChannel client, Message message) throws IOException {
        message.setResult("Some result");
        ByteBuffer responseBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
        while (responseBuffer.hasRemaining()) {
          client.write(responseBuffer);
        }
        responseBuffer.clear();
      }
    
      private void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
        SocketChannel client = serverSocket.accept();
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
        System.out.println("New client at: " + client.socket().getRemoteSocketAddress());
      }
    
      public static void main(String[] args) throws Exception {
        new Server().start();
      }
    }