ocamlocaml-lwt

Ocaml Lwt.wait()


I have a question about lwt's wait function and how I would use it in my own custom function which would return a 'a Lwt.t thread. First let's show you an example.

open Lwt_io
open Lwt.Infix

let i, o = Lwt_io.pipe()

let get_int () =
  let t, w = Lwt.wait() in
  (*what do I do here to provide the sleeping thread*)
  (*with a possible integer reply*)
  Lwt_io.read_int(i) >>= fun i -> Lwt.wakeup w i;
  t

let ans = get_int()

In the above function I call wait to produce a sleeping thread plus its wakener but I'm unsure how to provide the sleeping thread with a possible integer reply and still be able to return a sleeping thread from the get_int function. I provided a line (Lwt_io.read_int(i) >>= fun i -> Lwt.wakeup w i;) which appears to work but I'm unsure if this the proper way to accomplish this. Any pointers or links or comments?

Note: I'm asking because adding Lwt_io.read_int(i) to the function is redundant. I could just eliminate the get_int function and just call Lwt_io.read_int(i) but I'm curious how you would do this without the redundancy.


Solution

  • First of all, let's switch to a new Lwt terminology. According to the new terminology, that was accepted in the Lwt library, the Lwt.wait function returns a promise and a resolver.

    This is a pretty low-level interface, that is usually used to implement more high-level interfaces. And indeed, in your case, the get_int function can be implemented as just Lwt_io.read_int.

    So, for the sake of an experiment let's implement something that makes slightly more sense:

    let wait () =
       let promise, resolver = Lwt.wait () in
       Lwt.async (fun () ->
          Lwt_unix.sleep (Random.float 1.0) >|= 
          Lwt.wakeup resolver);
       promise
    

    Our wait function returns a promise, that will be fulfilled after a random delay. You may see, that there is a call to Lwt.async that will take a thunk and execute it in the event handler asynchonously, thus a function wait returns immediately. Of course, this example doesn't make much more sense, as a function with the same semantics can be implemented as:

    let wait () = Lwt_unix.sleep 1.0
    

    But this just shows, that the wait function is needed only to implement Lwt primitives.

    One of the justified use of this interface is when you need to decouple a service provider and a service consumer. Then you can use Lwt.wait (or even better Lwt.add_task_*, e.g.,

    module Broker = sig 
      type 'a t
      val request : 'a t -> 'a Lwt.t
      val provide : 'a t -> 'a -> unit
    end = struct 
      type 'a t = {mutable requested : 'a Lwt.u option}
    
      let request broker = 
         let promise, request = Lwt.wait () in
         broker.requested <- Some request;
         promise
    
      let provide broker data = match broker.requested with
        | None -> ()
        | Some request -> Lwt.wakeup request data
    end
    

    Of course, we just reimplemented mailbox (that is yet another proof, that we don't usually need to go that low, as everything is already done for us), but in general, when you have multiple requests, and you want to implement your own scheduling of them, then you can use this low-level interface.