Here I have a function which accepts a TCP connection and runs two Lwt threads, handle_connection and send_message. Each time a connection is terminated I am notified in the handle_connection thread, so I can terminate its loop. But I then want to terminate the whole joined <&> thread so that the next recursive call to serve can move on to another connection.

let create_server sock =
  let rec serve () =
    Lwt_unix.accept sock
    >>= (fun (fd, _) ->
          connection := true;
          let ic = Lwt_io.of_fd ~mode:Lwt_io.Input fd in
          let oc = Lwt_io.of_fd ~mode:Lwt_io.Output fd in
          handle_connection ic oc <&> send_message oc)
    >>= serve
  in
  serve ()

The question is: how can I force the send_message thread to terminate each time handle_connection terminates?

let handle_connection ic oc =
  Lwt.on_failure (handle_message ic oc "client") (fun e ->
      Logs.err (fun m -> m "%s" (Printexc.to_string e)));
  Logs_lwt.info (fun m -> m "New connection")


let rec send_message oc =
  let* s = read_console () in
  Lwt_io.write_line oc s >>= fun () -> send_message oc

I have already tried Lwt.choose instead of Lwt.join. It moves on to the next connection when the client disconnects, but the send_message thread keeps running on the terminated connection.

  • Incidentally, I'm not familiar with `Lwt`, but if `>>=` and `return` are the usual monad operators, `>>= return` should be useless, by monad laws. – jthulhu Nov 06 '22 at 20:39
  • @BlackBeans I use that to return a value of the Lwt monad type. Lwt.return : 'a -> 'a Lwt.t creates a promise which is already fulfilled with the given value – Edwin Ansari Nov 06 '22 at 21:33
  • Have you tried simply removing it? `(>>=) : 'a Lwt.t -> ('a -> 'b Lwt.t) -> 'b Lwt.t`, so it means you already had a promise (it should still typecheck). – jthulhu Nov 07 '22 at 06:31
  • That's right, thank you. I will update the expression. – Edwin Ansari Nov 09 '22 at 15:38

1 Answer

I hesitate to comment on this as I suspect that you already know this, but as a matter of general principle the most fundamental way in which you conditionally wait on a promise in Lwt is to construct the promise using Lwt.wait, and then bind on the promise using operator let* or operator >>= until the promise's resolver either fulfils the promise by means of Lwt.wakeup_later or (in your special case) rejects it using Lwt.wakeup_later_exn. Alternatively in the latter case you could construct the promise using Lwt.task and reject the promise directly by cancellation with Lwt.cancel, but I believe cancellation is now deprecated or at least discouraged.
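As a minimal sketch of the mechanism described above (not the asker's code): a promise is created with Lwt.wait, a continuation is bound to it with let*, and the resolver then rejects it with Lwt.wakeup_later_exn. The exception name Stopped and the function demo are hypothetical, introduced only for illustration; running it assumes the lwt and lwt.unix libraries are available.

```ocaml
(* Requires the lwt and lwt.unix libraries (Lwt_main lives in lwt.unix). *)
open Lwt.Syntax

(* Hypothetical exception used to reject the promise. *)
exception Stopped

let demo () =
  (* [waiter] stays pending until [resolver] fulfils or rejects it. *)
  let (waiter : string Lwt.t), resolver = Lwt.wait () in
  let consumer =
    Lwt.catch
      (fun () ->
        (* This continuation only runs if [waiter] is fulfilled. *)
        let* s = waiter in
        Lwt.return ("got " ^ s))
      (function
        | Stopped -> Lwt.return "stopped"   (* the rejection is handled here *)
        | e -> Lwt.fail e)
  in
  (* Reject instead of fulfilling: the continuation above is skipped. *)
  Lwt.wakeup_later_exn resolver Stopped;
  consumer

let () = print_endline (Lwt_main.run (demo ()))
```

Replacing the Lwt.wakeup_later_exn call with Lwt.wakeup_later resolver "x" would take the fulfilment path instead.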

There is a Lwt.pick function which, on a promise being fulfilled, will cancel any others which are bound by the pick, but that is the inverse of what you want. What this means is that I think you are going to have to restructure your code to expose the conditional promise.
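One possible restructuring, sketched under assumptions: a fresh "connection closed" promise is created with Lwt.wait for each connection, and the sending loop races each console read against it. The names send_until, next_line, send and demo are hypothetical; next_line stands in for read_console and send for Lwt_io.write_line oc, and a Lwt_stream plays the console so the sketch runs without a socket. It uses Lwt.pick, so the pending read is cancelled when the connection closes; if the real read_console promise is not cancelable it will simply stay pending.

```ocaml
(* Requires the lwt and lwt.unix libraries. *)
open Lwt.Syntax

(* Loop that sends lines until [closed] is fulfilled.
   [next_line] and [send] are hypothetical stand-ins for the question's
   read_console and Lwt_io.write_line oc. *)
let rec send_until closed ~next_line ~send =
  let* r =
    Lwt.pick
      [ Lwt.map (fun s -> Some s) (next_line ());
        Lwt.map (fun () -> None) closed ]
  in
  match r with
  | None -> Lwt.return_unit            (* connection closed: leave the loop *)
  | Some s ->
    let* () = send s in
    send_until closed ~next_line ~send

let demo () =
  (* One promise/resolver pair per connection. *)
  let closed, notify_closed = Lwt.wait () in
  let lines, push = Lwt_stream.create () in
  let sent = ref [] in
  let sender =
    send_until closed
      ~next_line:(fun () -> Lwt_stream.next lines)
      ~send:(fun s -> sent := s :: !sent; Lwt.return_unit)
  in
  push (Some "hello");
  (* Yield once so the sender has handled the pushed line. *)
  let* () = Lwt.pause () in
  (* What handle_connection would do when it detects the disconnect. *)
  Lwt.wakeup_later notify_closed ();
  let* () = sender in
  Lwt.return (List.rev !sent)

let () = List.iter print_endline (Lwt_main.run (demo ()))
```

In the question's serve loop, the pair from Lwt.wait would be created right after Lwt_unix.accept, with the resolver handed to handle_connection and the promise to the sending loop.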

Chris Vine
  • Yes, I've already tried to use `task` and `pick`, but my problem is that I want a promise that keeps running until it is cancelled by some event (in another function), and then runs again on some other event. With `task`, I can only `cancel` or `wakeup` it once, if I understood the concept correctly. Could you show me how to achieve this with a piece of code, please? – Edwin Ansari Nov 09 '22 at 15:33
  • Yes, you can only fulfil or reject any one promise once. To do what you want you will have to apply `Lwt.wait` or `Lwt.task` on each iteration. – Chris Vine Nov 10 '22 at 11:01
  • On a different point, send_message is waiting for the promise returned by read_console to be resolved. You don't give the implementation of the latter function, but presumably that is the promise that you need to reject or cancel. – Chris Vine Nov 10 '22 at 11:05
  • Note also that a "thread" in Lwt is just a series of dependent promises: it is not like an OS thread. By rejecting or canceling the promise whose continuation is currently bound by operator `let*` or operator `>>=`, that "thread" goes away. – Chris Vine Nov 10 '22 at 11:12
  • If, instead of having a global `task`, I create a `task` on each iteration in one function, then I cannot give it to another function to cancel it later. How can we resolve this problem? – Edwin Ansari Nov 10 '22 at 21:42
  • You could use a global reference. But could you not pass the promise or its resolver as a function argument? – Chris Vine Nov 11 '22 at 17:41
  • The functions don't call each other, so I cannot pass the promise as an argument! – Edwin Ansari Nov 11 '22 at 19:40
  • I still feel that I haven't fully grasped your use case. You should treat cancelling a promise representing a chain of dependent promises, or rejecting it via its resolver, in a similar way to the way you would if applying pthread_cancel to the thread ID of a native OS thread. If that requires a global thread ID (or here a global reference) so be it: but I suspect your code could be restructured to avoid that. You would of course have to handle the rejection, if necessary by doing nothing, but that is straightforward with `Lwt.catch` or the `try%lwt` macro. – Chris Vine Nov 12 '22 at 10:59
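The global-reference idea from the comments could look roughly like the sketch below, which creates a new promise on each iteration and keeps only its resolver in a global slot. The names current_stop, new_stop_promise and signal_stop are hypothetical and not part of Lwt; only Lwt.wait, Lwt.wakeup_later and Lwt.state are library calls.

```ocaml
(* Requires the lwt library. *)

(* Hypothetical global slot holding the resolver for the current iteration. *)
let current_stop : unit Lwt.u option ref = ref None

(* Called at the start of each iteration (e.g. per accepted connection):
   creates a fresh promise and remembers its resolver. *)
let new_stop_promise () =
  let p, r = Lwt.wait () in
  current_stop := Some r;
  p

(* Called from unrelated code when the stopping event occurs. Each resolver
   is used at most once, because a promise can only be resolved once. *)
let signal_stop () =
  match !current_stop with
  | None -> ()
  | Some r ->
    current_stop := None;
    Lwt.wakeup_later r ()

let () =
  let p1 = new_stop_promise () in
  signal_stop ();
  let p2 = new_stop_promise () in
  (* p1 has been fulfilled; p2 belongs to the next iteration and is pending. *)
  assert (Lwt.state p1 = Lwt.Return ());
  assert (Lwt.state p2 = Lwt.Sleep)
```

Passing the resolver as an argument avoids the global state where the call structure allows it.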