c++ sockets epoll

TCP Server: recv() with dynamically increasing buffer size


I am trying to read all the data a client sends to a TCP server without knowing in advance how large the data might be and without capping the size of the receive buffer. The TCP server is based on epoll() and the sockets are non-blocking. The server is multithreaded: the main event loop checks whether an epoll event is a new connection, i.e., an event on the listening socket's file descriptor, and accepts it; otherwise it adds a task to a queue, which a thread pool handles once a thread is available for the I/O with the clients. This all works; what I have not been able to achieve is dynamically growing the receive buffer.

The issue with this implementation is that whenever the length of the received input is a multiple of the maximum buffer size given to recv(), the loop always waits for the next message to be sent.

The receiver function is as follows:

static void do_recv(int fd) {
  char rbuf[MAX_RECV_BUFFER_SIZE] = {0};
  int n = -1;
  while (1) {
    n = recv(fd, rbuf, sizeof(rbuf), MSG_DONTWAIT);
    if (n > 0) {
      // when a connection is accepted, each client gets a NULL buffer to receive its data
      extend_rbuf(fd, rbuf, n, sizeof(rbuf)); 
      std::cout << "bytes read: " << n << " current read: " << rbuf << "\n";
      std::cout << "size: " << clients.get_client(fd)->rbuf.size
                << " len: " << clients.get_client(fd)->rbuf.len << "\n";
      if (n == sizeof(rbuf)) {
        continue;
      }
      std::cout << "read ended here, read from client\n"
                << clients.get_client(fd)->rbuf.buf << '\n';
      break;
    }
    if (n == 0) {
      return close_fd(fd);
    }
    if (errno == EINTR) {
      continue;
    } else if (errno == EAGAIN) {
      modify_fd_event(fd, EPOLLIN);
      break;
    } else {
      return close_fd(fd);
    }
  }

  if (n <= 0) {
    return;
  }
}

The function extend_rbuf() is defined as follows:

int extend_rbuf(int fd, const char *data, int rlen, int rsize) {
  Clients::client_buffer *client_rbuf = &(clients.get_client(fd)->rbuf);
  if (client_rbuf->buf == NULL) {
    // allocate exactly the capacity that is recorded in size
    client_rbuf->buf = (char *)malloc(sizeof(char) * rsize);
    if (!client_rbuf->buf) {
      perror("malloc rbuf: out of memory");
      return -1;
    }
    client_rbuf->size = rsize;
  }
  if (client_rbuf->size - client_rbuf->len <= rlen) {
    // not enough room for the data plus a terminating '\0': grow the buffer
    int new_size = client_rbuf->len + rlen + 1;
    char *new_rbuf = (char *)realloc(client_rbuf->buf, new_size * sizeof(char));
    if (!new_rbuf) {
      // on failure the old buffer is still valid, so keep it
      perror("realloc new_rbuf: out of memory");
      return -1;
    }
    client_rbuf->buf = new_rbuf;
    client_rbuf->size = new_size;
  }
  // copy at the current end of the data first, then advance len
  memcpy(client_rbuf->buf + client_rbuf->len, data, rlen);
  client_rbuf->len += rlen;
  client_rbuf->buf[client_rbuf->len] = '\0'; // keep buf printable as a C string
  return 0;
}

The condition causing the issue is:

if (n == sizeof(rbuf)) {
  continue;
}

To test this approach I set MAX_RECV_BUFFER_SIZE to 2. When I send a message whose length l satisfies l mod 2 = 1, the "read ended here" line is reached. For example, if I send the input 123 with telnet (which I think appends \r\n), the program prints this:

bytes read: 2 current read: 12
size: 3 len: 2
bytes read: 2 current read: 3
size: 5 len: 4
bytes read: 1 current read: 

size: 6 len: 5
read ended here, read from client
123

However, when the input has length 4, for example 1234, this is the output:

bytes read: 2 current read: 12
size: 3 len: 2
bytes read: 2 current read: 34
size: 5 len: 4
bytes read: 2 current read: 

size: 7 len: 6

and I only get the message 1234 if I then send another message of odd length; for example, after sending o I get:

bytes read: 2 current read: o
size: 9 len: 8
bytes read: 1 current read: 

size: 10 len: 9
read ended here, read from client
1234 # 1234 shows up here now
o

Is there a way to do this differently?


Solution

  • I think I found a solution, and it is working. Many thanks for the comments and the hints. The idea is to expect a delimiter, in this case \r\n, and to modify the extend_rbuf() function:

    static void do_recv(int fd) {
      char rbuf[MAX_RECV_BUFFER_SIZE];
      while (1) {
        ssize_t n = recv(fd, rbuf, sizeof(rbuf), MSG_DONTWAIT);
        if (n > 0) {
          // rbuf is not null-terminated, so pass the byte count explicitly
          extend_rbuf(fd, rbuf, n);
          std::string &msg = clients.get_client(fd)->rbuf;
          // keep reading until the accumulated data ends with "\r\n"; checking
          // the accumulated string also covers a delimiter split across reads
          if (msg.size() < 2 || msg.compare(msg.size() - 2, 2, "\r\n") != 0) {
            continue;
          }
          std::cout << "read ended here, read from client: " << msg << '\n';
          // ... some send logic ...
          msg.clear();
          break;
        }
        if (n == 0) {
          return close_fd(fd);
        }
        if (errno == EINTR) {
          continue;
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
          // no more data for now; wait for the next EPOLLIN event
          modify_fd_event(fd, EPOLLIN);
          break;
        } else {
          return close_fd(fd);
        }
      }
    }
    

    and the extend_rbuf() is

    void extend_rbuf(int fd, const char *data, size_t len) {
      std::string *client_rbuf = &clients.get_client(fd)->rbuf;
      // append exactly len bytes instead of relying on a terminating '\0'
      client_rbuf->append(data, len);
    }
    

    This is working with any input length, even if the initial buffer is of size 2. If I run into any issues or find a better solution, I will update this answer.
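
    The delimiter idea can also be applied to the accumulated std::string rather than to the last chunk, which makes it independent of where recv() happens to split the stream and allows several messages to arrive in one read. The following is only a sketch under assumptions: extract_messages is a hypothetical helper that is not part of the code above, and it assumes \r\n never occurs inside a message.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: removes every complete "\r\n"-terminated message from
// buffer and returns them without the delimiter. A trailing partial message
// (including a lone '\r') stays in buffer until more data is appended.
std::vector<std::string> extract_messages(std::string &buffer) {
  static const std::string delim = "\r\n";
  std::vector<std::string> messages;
  std::size_t start = 0;
  for (;;) {
    std::size_t pos = buffer.find(delim, start);
    if (pos == std::string::npos) {
      break; // no further complete message
    }
    messages.push_back(buffer.substr(start, pos - start));
    start = pos + delim.size();
  }
  buffer.erase(0, start); // drop only the bytes that were consumed
  return messages;
}
```

    After each successful recv() the caller would append the new bytes to the client's string, call extract_messages() on it, and process whatever it returns; an empty result simply means the message is not complete yet.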