
OCaml writing a timeout function using Async


I'm trying to write a function that tries to evaluate a function, but stops after a specific timeout.

I tried to use Deferred.any, which returns a deferred that becomes determined as soon as any one of the underlying deferreds is determined.

type 'a output = OK of 'a | Exn of exn

let fun_test msg f eq (inp,ans) =
   let outp = wait_for (Deferred.any
     [ return (try OK (f inp) with e -> Exn e)
     ; (after (Core.Std.sec 0.0) >>| (fun () -> Exn TIMEOUT))])
   in {msg = msg;inp = inp;outp = outp;ans = ans;pass = eq outp ans}

I was not sure how to extract a value from the deferred monad, so I wrote a function 'wait_for' which just spins until the underlying value is determined.

let rec wait_for x =
   match Deferred.peek x with
      | None -> wait_for x
      | Some v -> v;;  (* 'done' is a reserved keyword, so it cannot be a variable name *)

This did not work. After reading through the Async chapter of Real World OCaml, I realized I needed to start the scheduler. However, I'm not sure where I would call Scheduler.go in my code. Its type, go : ?raise_unhandled_exn:bool -> unit -> Core.Std.never_returns, means the call never returns, so I do not see how it fits into code where I actually want the asynchronous computation to hand back a result. The documentation for go says "Async programs do not exit until shutdown is called."

I was beginning to suspect I had taken entirely the wrong approach to the problem, until I found a very similar solution to the same problem on this Cornell website:

let timeout (thunk:unit -> 'a Deferred.t) (n:float) : ('a option) Deferred.t
  = Deferred.any 
    [ after (sec n) >>| (fun () -> None) ; 
      thunk () >>| (fun x -> Some x) ]

Anyway, I'm not sure my use of wait_for is correct. Is there a canonical way to extract a value from the deferred monad? Also, how do I start the scheduler?
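For reference, here is a minimal sketch of how the two questions above could fit together. It assumes the Core.Std / Async.Std module names used in this post (newer releases spell them Core and Async). The helper name run_sync is hypothetical; it simply wraps Thread_safe.block_on_async_exn, which is one way I know of to run a deferred computation from ordinary synchronous code and block until its value is determined. Note that this only times out computations that are themselves deferred and yield to the scheduler. In the fun_test code above, return (try OK (f inp) ...) evaluates f inp eagerly, before Deferred.any is even called, so a looping f would never give the timer a chance to fire.

```ocaml
(* Assumes the Core.Std / Async.Std module names used in the question;
   newer releases spell these [open Core] and [open Async]. *)
open Core.Std
open Async.Std

(* Race a deferred computation against a timer. *)
let timeout (thunk : unit -> 'a Deferred.t) (n : float) : 'a option Deferred.t =
  Deferred.any
    [ (after (sec n) >>| fun () -> None)
    ; (thunk () >>| fun x -> Some x) ]

(* Hypothetical helper: run a deferred computation from ordinary
   synchronous code and block until its value is determined. *)
let run_sync (f : unit -> 'a Deferred.t) : 'a =
  Thread_safe.block_on_async_exn f

let () =
  (* A computation that takes 5 seconds, raced against a 0.1 second timer. *)
  let slow () = after (sec 5.0) >>| fun () -> 42 in
  match run_sync (fun () -> timeout slow 0.1) with
  | Some v -> printf "finished: %d\n" v
  | None -> printf "timed out\n"
```

If the whole program is asynchronous, the alternative is to do the work inside the deferred chain, call shutdown when done, and put never_returns (Scheduler.go ()) as the last line of the program.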

Update: I tried writing a timeout function using only Core.Std.Thread and Core.Std.Mutex.

  let rec wait_for lck ptr =
    Core.Std.Thread.delay 0.25;
    Core.Std.Mutex.lock lck;
    (match !ptr with
     | None -> Core.Std.Mutex.unlock lck; wait_for lck ptr
     | Some x -> Core.Std.Mutex.unlock lck; x);;

  let timeout t f =
    let lck = Core.Std.Mutex.create () in
    let ptr = ref None in
    let _ = Core.Std.Thread.create
      (fun () -> Core.Std.Thread.delay t;
                 Core.Std.Mutex.lock lck;
                 (match !ptr with
                  | None -> ptr := Some (Exn TIMEOUT)
                  | Some _ -> ());
                 Core.Std.Mutex.unlock lck;) () in
    let _ = Core.Std.Thread.create
      (fun () -> let x = f () in
                 Core.Std.Mutex.lock lck;
                 (match !ptr with
                  | None -> ptr := Some x
                  | Some _ -> ());
                 Core.Std.Mutex.unlock lck;) () in
    wait_for lck ptr

I think this is pretty close to working. It works on computations like let rec loop x = print_string ".\n"; loop x, but it does not work on computations like let rec loop x = loop x. I believe the problem is that if the computation f () loops forever without doing anything else, its thread is never preempted, so none of the other threads gets a chance to notice that the timeout has expired. If the thread does IO, such as printing a string, then it does get preempted. Also, I don't know how to kill a thread; I couldn't find such a function in the documentation for Core.Std.Thread.
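To illustrate the preemption point, here is a minimal sketch of the same idea using the standard library's Thread and Mutex modules instead of Core.Std. The names outcome, publish and spin are hypothetical. The worker loop calls Thread.yield explicitly, which gives the timer thread and the waiting thread a chance to run even though the loop does no IO. This is only a cooperative workaround: the worker is never killed, the caller just stops waiting for it.

```ocaml
(* Build with the threads library, e.g.
   ocamlfind ocamlopt -package threads.posix -linkpkg timeout_thread.ml *)

type 'a outcome = Done of 'a | Timed_out

let timeout (t : float) (f : unit -> 'a) : 'a outcome =
  let lck = Mutex.create () in
  let slot = ref None in
  (* First writer wins; later writes are ignored. *)
  let publish v =
    Mutex.lock lck;
    (match !slot with None -> slot := Some v | Some _ -> ());
    Mutex.unlock lck
  in
  (* Timer thread: records a timeout after [t] seconds. *)
  let _timer = Thread.create (fun () -> Thread.delay t; publish Timed_out) () in
  (* Worker thread: records the result if it finishes first. *)
  let _worker = Thread.create (fun () -> publish (Done (f ()))) () in
  (* Poll the shared slot until one of the two threads has written to it. *)
  let rec wait () =
    Thread.delay 0.01;
    Mutex.lock lck;
    let v = !slot in
    Mutex.unlock lck;
    match v with None -> wait () | Some r -> r
  in
  wait ()

(* A loop that does no IO, but yields to other threads explicitly. *)
let rec spin () = Thread.yield (); spin ()

let () =
  match timeout 0.2 spin with
  | Done () -> print_endline "finished"
  | Timed_out -> print_endline "timed out"
```

The spinning thread keeps running in the background until the program exits, so this does not help when the computation cannot be modified to yield.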


Solution

  • The solution I came up with is:

    let kill pid sign = 
      try Unix.kill pid sign with
      | Unix.Unix_error (e,f,p) -> debug_print ((Unix.error_message e)^"|"^f^"|"^p)
      | e -> raise e;;
    
    
    let timeout f arg time default = 
      let pipe_r,pipe_w = Unix.pipe () in
      (match Unix.fork () with
       | 0 -> let x = Some (f arg) in
              let oc = Unix.out_channel_of_descr pipe_w in
              Marshal.to_channel oc x [];
              close_out oc;
              exit 0
       | pid0 -> 
          (match Unix.fork () with
           | 0 -> Unix.sleep time;
                  kill pid0 Sys.sigkill;
                  let oc = Unix.out_channel_of_descr pipe_w in
                  Marshal.to_channel oc default [];
                  close_out oc;
                  exit 0
           | pid1 -> let ic = Unix.in_channel_of_descr pipe_r in
                     let result = (Marshal.from_channel ic : 'b option) in
                     result ));;
    

    I think I might be creating two zombie processes with this, though. But it is the only solution that works on let rec loop x = loop x when compiled with ocamlopt. (The solution using Unix.alarm given here works when compiled with ocamlc but not when compiled with ocamlopt.)
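On the zombie concern, here is a sketch of a variant that reaps its child. It is not the code above: it swaps the second (timer) fork for Unix.select with a timeout on the read end of the pipe, and calls Unix.waitpid before returning. The name timeout_fork is hypothetical, the timeout is a float number of seconds rather than an int, and the result of f must be plain marshalable data (no closures).

```ocaml
(* Needs the unix library, e.g.
   ocamlfind ocamlopt -package unix -linkpkg timeout_fork.ml *)

let timeout_fork (f : 'a -> 'b) (arg : 'a) (secs : float) : 'b option =
  (* Flush first so the child does not re-emit the parent's buffered output. *)
  flush_all ();
  let pipe_r, pipe_w = Unix.pipe () in
  match Unix.fork () with
  | 0 ->
    (* Child: run the computation and marshal its result into the pipe. *)
    Unix.close pipe_r;
    let oc = Unix.out_channel_of_descr pipe_w in
    Marshal.to_channel oc (f arg) [];
    close_out oc;
    exit 0
  | pid ->
    (* Parent: keep only the read end, then wait up to [secs] for data. *)
    Unix.close pipe_w;
    let ic = Unix.in_channel_of_descr pipe_r in
    let ready, _, _ = Unix.select [ pipe_r ] [] [] secs in
    let result =
      match ready with
      | [] ->
        (* Nothing arrived in time: kill the child. *)
        (try Unix.kill pid Sys.sigkill with Unix.Unix_error _ -> ());
        None
      | _ ->
        (* Data or EOF; EOF means the child died without a result. *)
        (try Some (Marshal.from_channel ic : 'b) with End_of_file -> None)
    in
    close_in ic;
    (* Reap the child so it does not linger as a zombie. *)
    ignore (Unix.waitpid [] pid);
    result

let () =
  let rec loop x = loop x in
  match timeout_fork loop () 0.5 with
  | Some () -> print_endline "finished"
  | None -> print_endline "timed out"
```

Because the child is killed with SIGKILL from outside, this does not depend on the looping code reaching an allocation or IO point, so it should behave the same under ocamlc and ocamlopt. The timeout here only covers the wait for the first bytes of the result, which seemed an acceptable simplification for a sketch.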