I'm trying to write a function that tries to evaluate a function, but stops after a specific timeout.
I tried to use Deferred.any
, which returns a deferred that is fulfilled when one of the underlying deferred is fulfilled.
type 'a output = OK of 'a | Exn of exn
let fun_test msg f eq (inp,ans) =
let outp = wait_for (Deferred.any
[ return (try OK (f inp) with e -> Exn e)
; (after (Core.Std.sec 0.0) >>| (fun () -> Exn TIMEOUT))])
in {msg = msg;inp = inp;outp = outp;ans = ans;pass = eq outp ans}
I was not sure how to extract a value from the deferred monad, so I wrote a function 'wait_for' which just spins until the underlying value is determined.
let rec wait_for x =
match Deferred.peek x with
| None -> wait_for x
| Some done -> done;;
This did not work. After reading through the Async chapter of Real World OCaml, I realized I needed to start the scheduler. However I'm not sure where I would call Schedule.go
in my code. I do not see where the type go : ?raise_unhandled_exn:bool -> unit -> Core.Std.never_returns
would fit into code where you actually want your asynchronous code to return. The documentation for go
says "Async programs do not exit until shutdown
is called."
I was beginning to doubt I had taken the entirely wrong approach to the problem until I found a very similar solution to that same problem on this Cornell website
let timeout (thunk:unit -> 'a Deferred.t) (n:float) : ('a option) Deferred.t
= Deferred.any
[ after (sec n) >>| (fun () -> None) ;
thunk () >>= (fun x -> Some x) ]
Anyway, I'm not quite sure my use of wait_for
is correct. Is there a canonical way to extract a value from the deferred monad? Also how do I start the scheduler?
Update:
I tried writing a timeout function using only Core.Std.Thread
and Core.Std.Mutex
.
let rec wait_for lck ptr =
Core.Std.Thread.delay 0.25;
Core.Std.Mutex.lock lck;
(match !ptr with
| None -> Core.Std.Mutex.unlock lck; wait_for lck ptr
| Some x -> Core.Std.Mutex.unlock lck; x);;
let timeout t f =
let lck = Core.Std.Mutex.create () in
let ptr = ref None in
let _ = Core.Std.Thread.create
(fun () -> Core.Std.Thread.delay t;
Core.Std.Mutex.lock lck;
(match !ptr with
| None -> ptr := Some (Exn TIMEOUT)
| Some _ -> ());
Core.Std.Mutex.unlock lck;) () in
let _ = Core.Std.Thread.create
(fun () -> let x = f () in
Core.Std.Mutex.lock lck;
(match !ptr with
| None -> ptr := Some x
| Some _ -> ());
Core.Std.Mutex.unlock lck;) () in
wait_for lck ptr
I think this is pretty close to working. It works on computations like let rec loop x = print_string ".\n"; loop x
, but it does not work on computations like let rec loop x = loop x
. I believe the problem right now is that if the computation f ()
loops infinitely, then its thread is never preempted, so none of other threads can notice the timeout has expired. If the thread does IO like printing a string, then the thread does get preempted. Also I don't know how to kill a thread, I couldn't find such a function in the documentation for Core.Std.Thread
The solution I came up with is
let kill pid sign =
try Unix.kill pid sign with
| Unix.Unix_error (e,f,p) -> debug_print ((Unix.error_message e)^"|"^f^"|"^p)
| e -> raise e;;
let timeout f arg time default =
let pipe_r,pipe_w = Unix.pipe () in
(match Unix.fork () with
| 0 -> let x = Some (f arg) in
let oc = Unix.out_channel_of_descr pipe_w in
Marshal.to_channel oc x [];
close_out oc;
exit 0
| pid0 ->
(match Unix.fork () with
| 0 -> Unix.sleep time;
kill pid0 Sys.sigkill;
let oc = Unix.out_channel_of_descr pipe_w in
Marshal.to_channel oc default [];
close_out oc;
exit 0
| pid1 -> let ic = Unix.in_channel_of_descr pipe_r in
let result = (Marshal.from_channel ic : 'b option) in
result ));;
I think I might be creating two zombie processes with this though. But it is the only solution that works on let rec loop x = loop x
when compiled using ocamlopt
(The solution using Unix.alarm
given here works when compiled with ocamlc
but not when compiled with ocamlopt
).