I am currently learning Lwt. I am interested in using asynchronous processes to replace some shell routines with OCaml routines.

Here is a simplified first attempt, in which a filter is built by chaining two Lwt threads that each run `cat`:

```ocaml
let filter_cat () =
  Lwt_process.pmap_lines ("cat", [| "cat" |])

let filter_t () =
  Lwt_io.stdin
  |> Lwt_io.read_lines
  |> filter_cat ()
  |> filter_cat ()
  |> Lwt_io.write_lines Lwt_io.stdout

let () =
  filter_t ()
  |> Lwt_main.run
```

This filter mostly works, but when its standard input is closed it hangs instead of exiting. If I remove one of the `filter_cat` stages, it works as expected.

My guess is that I am not composing these filters correctly and therefore cannot join the two threads I start. What is the correct way to compose them so that the program terminates after it reads EOF on stdin?


You can find this program, together with a BSD Owl Makefile, in a GitHub gist.

Michaël Le Barbier
  • No idea why it's not working; sorry I can't help with that, maybe try the ocsigen mailing list. On the stylistic side, I find the definition of `filter_cat` very confusing. Writing `let filter_cat strm = ...` and then `|> filter_cat` would be clearer. – Martin Jambon Mar 15 '15 at 19:21
  • Also, `Lwt_main.run (filter_t ())` would be a lot clearer than `filter_t () |> Lwt_main.run` because `Lwt_main.run` is not a filter by any reasonable definition of the term. – Martin Jambon Mar 15 '15 at 19:23
  • @MartinJambon Hi Martin, thanks for your hints and tips! I will ask on the mailing list. – Michaël Le Barbier Mar 15 '15 at 22:18
  • @MartinJambon For the record I opened an [issue describing that behaviour](https://github.com/ocsigen/lwt/issues/137). – Michaël Le Barbier Apr 27 '15 at 15:10
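Following the two style suggestions in the comments, the question's program might be written as below. This is only a restyling, a sketch assuming the `lwt` and `lwt.unix` libraries are linked: `filter_cat` now takes its input stream as an explicit argument, but the pipeline itself is unchanged, so it is expected to hang in exactly the same way.

```ocaml
(* Restyled version of the question's program; behaviour is unchanged.
   Needs the lwt and lwt.unix libraries. *)

(* Pipe a stream of lines through an external [cat] process. *)
let filter_cat strm =
  Lwt_process.pmap_lines ("cat", [| "cat" |]) strm

(* stdin -> cat -> cat -> stdout *)
let filter_t () =
  Lwt_io.stdin
  |> Lwt_io.read_lines
  |> filter_cat
  |> filter_cat
  |> Lwt_io.write_lines Lwt_io.stdout
```

The entry point then becomes `let () = Lwt_main.run (filter_t ())` rather than `filter_t () |> Lwt_main.run`.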


The cause is a small bug in Lwt. An internal function, `monitor`, performs the piping:

```ocaml
(* Monitor the thread [sender] in the stream [st] so write errors are
   reported. *)
let monitor sender st =
  let sender = sender >|= fun () -> None in
  let state = ref Init in
  Lwt_stream.from
    (fun () ->
       match !state with
         | Init ->
             let getter = Lwt.apply Lwt_stream.get st in
             let result _ =
               match Lwt.state sender with
                 | Lwt.Sleep ->
                     (* The sender is still sleeping, behave as the
                        getter. *)
                     getter
                 | Lwt.Return _ ->
                     (* The sender terminated successfully, we are
                        done monitoring it. *)
                     state := Done;
                     getter
                 | Lwt.Fail _ ->
                     (* The sender failed, behave as the sender for
                        this element and save current getter. *)
                     state := Save getter;
                     sender
             in
             Lwt.try_bind (fun () -> Lwt.choose [sender; getter]) result result
         | Save t ->
             state := Done;
             t
         | Done ->
             Lwt_stream.get st)
```

The problem lies in this definition:

```ocaml
let getter = Lwt.apply Lwt_stream.get st
```

When `getter` reaches the end of the stream while `sender` is still sleeping, `monitor` simply passes the getter's `None` through, so the output stream ends and `sender` is lost; this seems to be what prevents completion. It can be fixed by refining the definition of `getter` so that, once the end of the stream has been reached, it behaves as `sender`.
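A minimal sketch of that fix, assuming the `monitor` excerpt above is representative of the Lwt source. `monitor_fixed` and `monitor_state` are hypothetical names: the code is a standalone copy of `monitor` with its private state type re-declared locally, and it is not necessarily the patch that was eventually applied to Lwt. Only `getter` changes: when `Lwt_stream.get` returns `None`, it waits for `sender` (which itself resolves to `None`) instead of ending the stream immediately.

```ocaml
(* Hypothetical standalone copy of Lwt's internal [monitor], with the
   fix to [getter] applied. Needs the lwt library. *)
open Lwt.Infix

(* Local re-declaration of monitor's private state type. *)
type 'a monitor_state =
  | Init
  | Save of 'a option Lwt.t
  | Done

let monitor_fixed (sender : unit Lwt.t) (st : 'a Lwt_stream.t) : 'a Lwt_stream.t =
  let sender = sender >|= fun () -> None in
  let state = ref Init in
  Lwt_stream.from (fun () ->
      match !state with
      | Init ->
          (* The fix: at end of stream, behave as the sender, so the
             output stream only ends once [sender] has terminated. *)
          let getter =
            Lwt.apply Lwt_stream.get st >>= function
            | None -> sender
            | Some _ as x -> Lwt.return x
          in
          let result _ =
            match Lwt.state sender with
            | Lwt.Sleep ->
                (* Sender still running: behave as the getter. *)
                getter
            | Lwt.Return _ ->
                (* Sender finished: stop monitoring it. *)
                state := Done;
                getter
            | Lwt.Fail _ ->
                (* Sender failed: report its error for this element
                   and keep the getter for the next one. *)
                state := Save getter;
                sender
          in
          Lwt.try_bind (fun () -> Lwt.choose [sender; getter]) result result
      | Save t ->
          state := Done;
          t
      | Done -> Lwt_stream.get st)
```

With an empty input stream and a sender that is still pending, the original `monitor` ends the output stream at once, whereas this version keeps it open until `sender` resolves.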

Michaël Le Barbier