Dart: handle incoming HTTP requests in parallel


I am trying to write an HTTP server in Dart that can handle multiple requests in parallel. I have been unsuccessful at achieving the "parallel" part thus far.

Here is what I tried at first:

import 'dart:io';

main() {
  HttpServer.bind(InternetAddress.ANY_IP_V4, 8080).then((HttpServer server) {
    server.listen((HttpRequest request) {
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.start();
      while (stopwatch.elapsedMilliseconds < 1000) { /* do nothing */ }
      request.response.statusCode = HttpStatus.OK;
      request.response.write(stopwatch.elapsedMilliseconds.toString());
      request.response.close().catchError(print);
    });
  });
}

On each request it does busy work for one second, then completes. I made it handle requests this way so that its timing would be predictable, and so I could easily see the effect of a request in Windows task manager (a CPU core jumping to 100% usage).

I can tell this is not handling requests in parallel because:

  1. If I load up several browser tabs to http://example:8080/ and then refresh them all, the tabs load one after another in sequence, approximately 1 second between each.

  2. If I use the load-testing tool wrk with these settings... wrk -d 10 -c 8 -t 8 http://example:8080/ ...it completes 5 to 8 requests in the 10 seconds I gave it. If the server were using all 8 of my cores, I'd expect a number closer to 80 requests.

  3. When I open the Windows task manager during the wrk test, I observe that only one of my cores is near 100% usage, and the rest are pretty much idle.

So, then I tried using isolates, hoping to manually spawn a new isolate/thread for each request:

import 'dart:io';
import 'dart:isolate';

main() {
  HttpServer.bind(InternetAddress.ANY_IP_V4, 8080).then((HttpServer server) {
    server.listen((HttpRequest request) {
      spawnFunction(handleRequest).send(request);
    });
  });
}

handleRequest() {
  port.receive((HttpRequest request, SendPort sender) {
    Stopwatch stopwatch = new Stopwatch();
    stopwatch.start();
    while (stopwatch.elapsedMilliseconds < 1000) { /* do nothing */ }
    request.response.statusCode = HttpStatus.OK;
    request.response.write(stopwatch.elapsedMilliseconds.toString());
    request.response.close().catchError(print);
  });
}

This does not work at all. It doesn't like that I'm trying to send an HttpRequest as the message to the isolate. Here is the error:

#0      _SendPortImpl._sendInternal (dart:isolate-patch/isolate_patch.dart:122:3)
#1      _SendPortImpl._sendNow (dart:isolate-patch/isolate_patch.dart:95:18)
#2      _SendPortImpl.send (dart:isolate-patch/isolate_patch.dart:91:18)
#3      main.<anonymous closure>.<anonymous closure> (file:///C:/Development/dartbenchmark/simple2.dart:7:40)
#4      _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#5      _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#6      _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#7      _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#8      _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#9      StreamController.add (dart:async/stream_controller.dart:10:35)
#10     _HttpServer._handleRequest (http_impl.dart:1261:20)
#11     _HttpConnection._HttpConnection.<anonymous closure> (http_impl.dart:1188:33)
#12     _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#13     _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#14     _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#15     _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#16     _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#17     StreamController.add (dart:async/stream_controller.dart:10:35)
#18     _HttpParser._doParse (http_parser.dart:415:26)
#19     _HttpParser._parse (http_parser.dart:161:15)
#20     _HttpParser._onData._onData (http_parser.dart:509:11)
#21     _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#22     _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#23     _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#24     _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#25     _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#26     StreamController.add (dart:async/stream_controller.dart:10:35)
#27     _Socket._onData._onData (dart:io-patch/socket_patch.dart:726:42)
#28     _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#29     _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#30     _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#31     _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#32     _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#33     StreamController.add (dart:async/stream_controller.dart:10:35)
#34     _RawSocket._RawSocket.<anonymous closure> (dart:io-patch/socket_patch.dart:452:52)
#35     _NativeSocket.multiplex (dart:io-patch/socket_patch.dart:253:18)
#36     _NativeSocket.connectToEventHandler.<anonymous closure> (dart:io-patch/socket_patch.dart:338:54)
#37     _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)

Unhandled exception:
Illegal argument(s): Illegal argument in isolate message : (object is a closure)
#0      _throwDelayed.<anonymous closure> (dart:async/stream_impl.dart:22:5)
#1      _asyncRunCallback._asyncRunCallback (dart:async/event_loop.dart:15:17)
#2      _asyncRunCallback._asyncRunCallback (dart:async/event_loop.dart:25:9)
#3      Timer.run.<anonymous closure> (dart:async/timer.dart:17:21)
#4      Timer.run.<anonymous closure> (dart:async/timer.dart:25:13)
#5      Timer.Timer.<anonymous closure> (dart:async-patch/timer_patch.dart:9:15)
#6      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:99:28)
#7      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:107:7)
#8      _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:115:23)
#9      _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)

Versions used:

Is it possible to handle these requests in parallel, with all my machine's available cores, using Dart?


Solution

  • I wrote a library called dart-isoserver to do this a while back. It has badly bit-rotted since then, but you can still see the approach.

    https://code.google.com/p/dart-isoserver/

    What I did was proxy HttpRequest and HttpResponse via isolate ports, since you cannot send them directly. It worked, though there were a few caveats:

    1. I/O on the request and response went through the main isolate, so that part was not parallel. Other work done in the worker isolate didn't block the main isolate, though. What really should happen is that a socket connection should be transferable between isolates.
    2. Exceptions in the isolate would bring down the whole server. spawnFunction() now has an uncaught exception handler parameter, so this is somewhat fixable, but spawnUri() doesn't. dart-isoserver used spawnUri() to implement hot-loading, so that would have to be removed.
    3. Isolates are a little slow to start up, and you probably don't want one per connection for the thousands-of-concurrent-connections use cases that nginx and node.js target. An isolate pool with work queues would probably perform better, though that would eliminate the nice feature that you can use blocking I/O in a worker.
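
    To illustrate the proxying idea together with the pool suggested in point 3, here is a minimal sketch written against a recent Dart SDK (Isolate.spawn and ReceivePort, not the older spawnFunction API used in the question). It is not the dart-isoserver code. The names handle, worker, and startWorker are made up for this example, and the round-robin dispatch is just one simple assumption about how a pool could hand out work. Only plain data (the request path and a reply port) crosses the isolate boundary, so all socket I/O still happens in the main isolate, as in caveat 1.

```dart
import 'dart:io';
import 'dart:isolate';

/// Hypothetical CPU-bound handler: takes and returns plain data only,
/// so its input and output can be sent between isolates.
String handle(String path) {
  final stopwatch = Stopwatch()..start();
  while (stopwatch.elapsedMilliseconds < 1000) {
    // Busy work, as in the question.
  }
  return 'handled $path in ${stopwatch.elapsedMilliseconds} ms';
}

/// Worker entry point: sends its inbox port back to the spawner, then
/// answers every [path, replyPort] message it receives.
void worker(SendPort handshake) {
  final inbox = ReceivePort();
  handshake.send(inbox.sendPort);
  inbox.listen((message) {
    final path = message[0] as String;
    final reply = message[1] as SendPort;
    reply.send(handle(path));
  });
}

/// Spawns one worker isolate and returns the port used to send it work.
Future<SendPort> startWorker() async {
  final handshake = ReceivePort();
  await Isolate.spawn(worker, handshake.sendPort);
  return await handshake.first as SendPort;
}

Future<void> main() async {
  // Start the pool once, up front, instead of one isolate per request.
  final workers = <SendPort>[];
  for (var i = 0; i < Platform.numberOfProcessors; i++) {
    workers.add(await startWorker());
  }

  var next = 0;
  final server = await HttpServer.bind(InternetAddress.anyIPv4, 8080);
  server.listen((request) async {
    // Proxy only the data the worker needs, plus a port for its answer.
    final reply = ReceivePort();
    workers[next].send([request.uri.path, reply.sendPort]);
    next = (next + 1) % workers.length;

    final body = await reply.first as String;
    request.response
      ..statusCode = HttpStatus.ok
      ..write(body);
    await request.response.close();
  });
}
```

    Each worker processes its queued messages one at a time, so with this dispatch a slow request can delay others queued behind it on the same worker even when another worker is idle.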

    A note about your first code example: it definitely won't run in parallel, as you noticed, because each Dart isolate is single-threaded. No Dart code in the same isolate ever runs concurrently.
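
    Since code within one isolate never runs concurrently, another way to use several cores is to run one server per isolate. The sketch below assumes a recent Dart SDK in which HttpServer.bind accepts shared: true, which was not available when the question was asked. With that flag, several isolates can bind the same address and port, and incoming connections are distributed among them. The names serve and busyWork are illustrative only.

```dart
import 'dart:io';
import 'dart:isolate';

/// Spins for roughly [ms] milliseconds and returns the elapsed time,
/// mirroring the busy loop from the question.
int busyWork(int ms) {
  final stopwatch = Stopwatch()..start();
  while (stopwatch.elapsedMilliseconds < ms) {
    // Burn CPU on purpose.
  }
  return stopwatch.elapsedMilliseconds;
}

/// Runs one HTTP server in the current isolate.
Future<void> serve(int port) async {
  // shared: true allows other isolates to bind the same address and port.
  final server =
      await HttpServer.bind(InternetAddress.anyIPv4, port, shared: true);
  await for (final request in server) {
    final elapsed = busyWork(1000);
    request.response
      ..statusCode = HttpStatus.ok
      ..write('$elapsed');
    await request.response.close();
  }
}

Future<void> main() async {
  const port = 8080;
  // One isolate per core; the main isolate counts as one of them.
  for (var i = 1; i < Platform.numberOfProcessors; i++) {
    await Isolate.spawn(serve, port);
  }
  await serve(port);
}
```

    Here each isolate does its own socket I/O, so nothing has to be proxied through the main isolate. Within a single isolate, requests are still handled one at a time.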