I am currently learning Lwt. I am interested in using asynchronous processes to replace some shell routines with OCaml routines.
Let us take a look at a simplified first attempt, where a filter is created by chaining two processes running `cat`:
```ocaml
let filter_cat () =
  Lwt_process.pmap_lines ("cat", [| "cat" |])

let filter_t () =
  Lwt_io.stdin
  |> Lwt_io.read_lines
  |> filter_cat ()
  |> filter_cat ()
  |> Lwt_io.write_lines Lwt_io.stdout

let () =
  filter_t ()
  |> Lwt_main.run
```
This filter more or less works, but when its standard input is closed it hangs instead of exiting. If I remove one of the `filter_cat` stages, it works as expected.

I am guessing that I do not compose these filters correctly and therefore cannot join the two threads I am starting. What is the correct way to compose these filters so that the program terminates after it reads EOF on `stdin`?
You can find this program, together with a BSD Owl Makefile, in a GitHub gist.
The answer is that there is a small bug in Lwt. The piping is performed by an internal function, `monitor`:
```ocaml
(* Monitor the thread [sender] in the stream [st] so write errors are
   reported. *)
let monitor sender st =
  let sender = sender >|= fun () -> None in
  let state = ref Init in
  Lwt_stream.from
    (fun () ->
       match !state with
       | Init ->
           let getter = Lwt.apply Lwt_stream.get st in
           let result _ =
             match Lwt.state sender with
             | Lwt.Sleep ->
                 (* The sender is still sleeping, behave as the
                    getter. *)
                 getter
             | Lwt.Return _ ->
                 (* The sender terminated successfully, we are
                    done monitoring it. *)
                 state := Done;
                 getter
             | Lwt.Fail _ ->
                 (* The sender failed, behave as the sender for
                    this element and save current getter. *)
                 state := Save getter;
                 sender
           in
           Lwt.try_bind (fun () -> Lwt.choose [sender; getter]) result result
       | Save t ->
           state := Done;
           t
       | Done ->
           Lwt_stream.get st)
```
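To see the failure mode in isolation, the sketch below recreates the end-of-stream step of `monitor` by hand. An already exhausted stream plays the role of `st`, and a promise that never resolves plays the role of an unfinished `sender`; the helper `end_of_stream_step` is hypothetical and not part of `Lwt_process`. It assumes a Lwt version that provides `Lwt.Infix`.

```ocaml
open Lwt.Infix

(* Hypothetical helper: rebuild the [Init] step of [monitor] when the
   monitored stream is already at EOF and the sender is still running. *)
let end_of_stream_step () =
  (* A sender that has not finished yet: this promise is never resolved. *)
  let pending, _resolver = Lwt.wait () in
  let sender = pending >|= fun () -> None in
  (* The monitored stream is exhausted, as after EOF on the process output. *)
  let getter = Lwt.apply Lwt_stream.get (Lwt_stream.of_list ([] : string list)) in
  let chosen = Lwt.choose [ sender; getter ] in
  (chosen, sender)

let () =
  let chosen, sender = end_of_stream_step () in
  (* [choose] resolves with the getter's [None]... *)
  assert (Lwt.state chosen = Lwt.Return None);
  (* ...while the sender is still sleeping. *)
  assert (Lwt.state sender = Lwt.Sleep)
```

Since `sender` is still in the `Lwt.Sleep` state when `result` runs, `monitor` returns the getter's `None` and the monitored stream ends without ever waiting for the sender.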
The problem is in the definition

```ocaml
let getter = Lwt.apply Lwt_stream.get st
```

When the `getter` reaches the end of the stream while the `sender` is still running, `Lwt.choose` resolves with the getter's `None`, `result` takes the `Lwt.Sleep` branch, and the monitored stream ends. The `sender` is then lost, which seems to prevent completion. This can be fixed by improving the definition of `getter` so that it behaves as the `sender` once the end of the stream has been reached.
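A minimal sketch of that change, assuming the rest of `monitor` stays exactly as quoted: only `getter` differs, waiting for `sender` when `Lwt_stream.get` returns `None`. The type `monitor_state` is a hypothetical local stand-in for Lwt's internal state type (its constructors `Init`, `Save` and `Done` are taken from the quoted code), added so the snippet compiles on its own. This illustrates the idea; it is not necessarily the patch that was applied upstream.

```ocaml
open Lwt.Infix

(* Hypothetical local stand-in for Lwt's internal state type; the
   constructor names come from the quoted [monitor]. *)
type 'a monitor_state =
  | Init
  | Save of 'a option Lwt.t
  | Done

(* [monitor] as quoted, except for [getter]: when [st] is exhausted, the
   getter now waits for [sender] (which yields [None] on success and
   fails if the sender failed) instead of returning [None] at once. *)
let monitor (sender : unit Lwt.t) (st : 'a Lwt_stream.t) : 'a Lwt_stream.t =
  let sender = sender >|= fun () -> None in
  let state = ref Init in
  Lwt_stream.from (fun () ->
      match !state with
      | Init ->
          let getter =
            Lwt.apply Lwt_stream.get st >>= function
            | None -> sender (* end of stream: behave as the sender *)
            | Some _ as x -> Lwt.return x
          in
          let result _ =
            match Lwt.state sender with
            | Lwt.Sleep -> getter
            | Lwt.Return _ ->
                state := Done;
                getter
            | Lwt.Fail _ ->
                state := Save getter;
                sender
          in
          Lwt.try_bind (fun () -> Lwt.choose [ sender; getter ]) result result
      | Save t ->
          state := Done;
          t
      | Done -> Lwt_stream.get st)

let () =
  (* The stream now ends only after the pending sender has finished. *)
  let pending, resolver = Lwt.wait () in
  let lines = Lwt_stream.to_list (monitor pending (Lwt_stream.of_list [ "a" ])) in
  assert (Lwt.state lines = Lwt.Sleep);
  Lwt.wakeup resolver ();
  assert (Lwt.state lines = Lwt.Return [ "a" ])
```

With the original `getter`, the first assertion would fail: the stream would end as soon as `"a"` had been read, while the sender was still pending.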