node.jsocamljs-of-ocamlocaml-lwt

What are the differences between Lwt.async and Lwt_main.run on OCaml/Node.JS?


I am experimenting with js_of_ocaml and node.js. As you know, node.js makes extensive use of callbacks to implement asynchronous requests without introducing explicit threads.

In OCaml we have a very nice threading library, Lwt, coming with a very useful syntax extension. I wrote a prototype with a binding to some node library (a AWS S3 client) and added a lwt-ish layer to hide the callback.

open Lwt.Infix
open Printf
open Js

let require_module s =
    Js.Unsafe.fun_call
      (Js.Unsafe.js_expr "require")
      [|Js.Unsafe.inject (Js.string s)|]

let _js_aws = require_module "aws-sdk"

let array_to_list a =
  let ax = ref [] in
  begin
    for i = 0 to a##.length - 1 do
      Optdef.iter (array_get a i) (fun x -> ax := x :: !ax)
    done;
    !ax
  end


class type error = object
end

class type bucket = object
  method _Name : js_string t readonly_prop
  method _CreationDate : date t readonly_prop
end

class type listBucketsData = object
  method _Buckets : (bucket t) js_array t readonly_prop
end

class type s3 = object
  method listBuckets :
    (error -> listBucketsData t -> unit) callback -> unit meth
end

let createClient : unit -> s3 t = fun () ->
  let constr_s3 = _js_aws##.S3 in
  new%js constr_s3 ()


module S3 : sig
  type t
  val create : unit -> t
  val list_buckets : t -> (string * string) list Lwt.t
end = struct
  type t = s3 Js.t

  let create () =
    createClient ()

  let list_buckets client =
    let cell_of_bucket_data data =
      ((to_string data##._Name),
       (to_string data##._CreationDate##toString))
    in
    let mvar = Lwt_mvar.create_empty () in
    let callback error buckets =
      let p () =
        if true then
          Lwt_mvar.put mvar
            (`Ok(List.map cell_of_bucket_data @@ array_to_list buckets##._Buckets))
        else
          Lwt_mvar.put mvar (`Error("Ups"))
      in
      Lwt.async p
    in
    begin
      client##listBuckets (wrap_callback callback);
      Lwt.bind
        (Lwt_mvar.take mvar)
        (function
          | `Ok(whatever) -> Lwt.return whatever
          | `Error(mesg) -> Lwt.fail_with mesg)
    end
end

let () =
  let s3 = S3.create() in
  let dump lst =
    Lwt_list.iter_s
      (fun (name, creation_date) ->
         printf "%32s\t%s\n" name creation_date;
         Lwt.return_unit)
      lst
  in
  let t () =
    S3.list_buckets s3
    >>= dump
  in
  begin
    Lwt.async t
  end

Since there is no binding to Lwt_main for node.js, I had to run my code with Lwt.async. What are the differences between running the code with Lwt.async rather than with Lwt_main.run – the latter not existing in node.js? Is it guaranteed that the program will wait until the asynchronous threads are completed before exiting, or is this rather a lucky but random behaviour of my code?


Solution

  • The Lwt_main.run function recursively polls the thread whose execution it supervises. At each iteration, if this thread is still running, the scheduler uses one engine (from Lwt_engine) to execute threads waiting for I/Os, either by selecting or synchronising on events.

    The natural way to translate this in Node.JS is to use the process.nextTick method, which relies on Node.JS own scheduler. Implementing the Lwt_main.run function in this case can be as simple as:

    let next_tick (callback : unit -> unit) =
      Js.Unsafe.(fun_call
                   (js_expr "process.nextTick")
                   [| inject (Js.wrap_callback callback) |])
    
    let rec run t =
      Lwt.wakeup_paused ();
      match Lwt.poll t with
        | Some x -> x
        | None -> next_tick (fun () -> run t)
    

    This function only run threads of type unit Lwt.t but this is the main case for a program. It is possible to compute arbitrary values using a Lwt_mvar.t to communicate.

    It is also possible to extend this example to support all sort of hooks, as in the original Lwt_main.run implementation.