9

I'm trying to use the OBus library with Lwt_react. This uses "functional reactive programming" for properties and signals.

The problem (as noted in the React documentation) is that OCaml may garbage collect your callback while you're still using it. There's a keep function, which keeps the handler forever, but I don't want that. I do want to free it eventually, just not while I still need it.

So, I thought I'd attach the handler to a switch:

let keep ~switch handler =
  Lwt_switch.add_hook (Some switch) (fun () ->
    ignore handler;
    Lwt.return ()
  )

But my event handler gets garbage-collected anyway (which makes sense, since the code to turn off the switch is called when the signal arrives, so it's only the signal handler keeping the switch alive in the first place).
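The situation can be reproduced in miniature with the standard library's `Weak` module. This is only a sketch: `source`, `roots`, and `subscribe` are hypothetical names, and `source` merely stands in for an event that, like React's, points to its dependents weakly. A handler reachable only through the weak slot may be collected, while one that is also reachable from a global root survives.

```ocaml
(* Stdlib-only sketch; all names here are hypothetical. *)

(* A one-slot weak array: plays the role of the event source, which
   only holds its handler weakly. *)
let source : (unit -> unit) Weak.t = Weak.create 1

(* A global, strongly reachable list of values to keep alive. *)
let roots : (unit -> unit) list ref = ref []

(* Build a heap-allocated closure and register it with the source.
   With [keep_alive], also store it in the global root list. *)
let subscribe ~keep_alive =
  let calls = ref 0 in
  let handler () = incr calls in
  Weak.set source 0 (Some handler);
  if keep_alive then roots := handler :: !roots

let () =
  subscribe ~keep_alive:true;
  Gc.full_major ();
  (* The handler is reachable from [roots], so the weak slot is still full. *)
  assert (Weak.check source 0);
  (* Dropping the strong reference makes the handler collectable again;
     whether the next major GC empties the slot is up to the runtime. *)
  roots := [];
  Gc.full_major ();
  Printf.printf "handler still registered: %b\n" (Weak.check source 0)
```

The switch hook in the snippet above does not act as such a root, because the switch itself is only reachable from the handler.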

Here's a simplified (stand-alone) version of my code:

(* ocamlfind ocamlopt -package react,lwt,lwt.react,lwt.unix -linkpkg -o test test.ml *)

let finished_event, fire_finished = React.E.create ()

let setup () =
  let switch = Lwt_switch.create () in

  let finished, waker = Lwt.wait () in
  let handler () = Lwt.wakeup waker () in
  let dont_gc_me = Lwt_react.E.map handler finished_event in
  ignore dont_gc_me;  (* What goes here? *)

  print_endline "Waiting for signal...";
  Lwt.bind finished (fun () -> Lwt_switch.turn_off switch)

let () =
  let finished = Lwt.protected (setup ()) in

  Gc.full_major ();  (* Force GC, to demonstrate problem *)
  fire_finished ();  (* Simulate send *)

  Lwt_main.run finished;
  print_endline "Done";

Without the Gc.full_major line, this normally prints Done. With it, it just hangs at Waiting for signal....

Edit: I've split setup (the real code) from the test driver and added a Lwt.protected wrapper, so that Lwt's cancellation support doesn't accidentally mask the problem.

Thomas Leonard

4 Answers

7

Here is a snippet taken from a project of mine, adapted to work around this weak-reference issue (thanks!). The first part keeps a global root pointing to your object. The second part ties the lifetime of a signal or event to that of an Lwt thread.

Please note that the reactive entity is cloned and explicitly stopped, which may not exactly match your expectations.

module Keep : sig 
  type t
  val this : 'a -> t
  val release : t -> unit
end = struct
  type t = {mutable prev: t; mutable next: t; mutable keep: (unit -> unit)}
  let rec root = {next = root; prev = root; keep = ignore}

  let release item =
    item.next.prev <- item.prev;
    item.prev.next <- item.next;
    item.prev <- item;
    item.next <- item;
    (* In case user code keeps a reference to item *)
    item.keep <- ignore

  let attach keep =
    let item = {next = root.next; prev = root; keep} in
    root.next.prev <- item;
    root.next <- item;
    item

  let this a = attach (fun () -> ignore a)
end

open React  (* brings the signal/event types and the S and E modules into scope *)

module React_utils : sig
  val with_signal : 'a signal -> ('a signal -> 'b Lwt.t) -> 'b Lwt.t
  val with_event  : 'a event -> ('a event -> 'b Lwt.t) -> 'b Lwt.t
end = struct
  let with_signal s f =
    let clone = S.map (fun x -> x) s in
    let kept = Keep.this clone in
    Lwt.finalize (fun () -> f clone)
                 (fun () -> S.stop clone; Keep.release kept; Lwt.return_unit)
  let with_event e f =
    let clone = E.map (fun x -> x) e in
    let kept = Keep.this clone in
    Lwt.finalize (fun () -> f clone)
                 (fun () -> E.stop clone; Keep.release kept; Lwt.return_unit)
end

Solving your example with this:

let run () =
  let switch = Lwt_switch.create () in

  let finished, waker = Lwt.wait () in
  let handler () = Lwt.wakeup waker () in
  (* We use [Lwt.async] because we are not interested in knowing exactly when the reference will be released *)
  Lwt.async (fun () ->
    (React_utils.with_event (Lwt_react.E.map handler finished_event)
      (fun _dont_gc_me -> finished)));
  print_endline "Waiting for signal...";

  Gc.full_major ();  (* Force GC, to demonstrate problem *)
  fire_finished ();  (* Simulate send *)

  Lwt.bind finished (fun () -> Lwt_switch.turn_off switch)
Def
  • I think this is solving the opposite problem. You want to stop receiving events once your function is done. I want to continue receiving events until it's done. I've updated the question now with some proper example code to demonstrate the problem. I can't see a way to fix it using your module. Thanks, – Thomas Leonard Nov 14 '13 at 11:29
  • I disagree. I want to receive events until a thread is done, and stop receiving them once it is done. As I understand the problem, the first part is exactly what you are aiming for. – Def Nov 14 '13 at 19:30
  • OK, that does work! But for a very non-obvious reason. I eventually figured it out by looking at the Lwt source code. The Lwt.bind call creates an extra reference from the final task back to "finished" because it's a "cancellable" task (cancelling the result of "run" will cancel "finished"). I worry this is a bit fragile, though. For example, adding `let finished = Lwt.protected finished in` before the `Gc` line breaks it again. – Thomas Leonard Nov 14 '13 at 23:06
  • You are right, it doesn't work, but there must be a way to fix that… Why it seems to work at first is quite obvious to me: the Lwt thread keeps a reference to the reactive graph, which therefore can't be garbage collected until the Lwt thread created by React_utils is finished. – Def Nov 14 '13 at 23:33
  • Why it doesn't work, however, is interesting and subtle imho… I think it's because all references to finished are weak: "waker" is referenced inside the React event, which is weak. "finished" is referenced by local scope, but after the Lwt.protected call, the original finished is no longer reachable from local scope. Since the only two possible references to the finished thread are weak, it is garbage collected, along with all the plumbing done by React_utils on it. What we would need is a way to make a thread "strong", which means keeping a strong reference to it until it is determined. – Def Nov 14 '13 at 23:48
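One possible reading of the "strong thread" idea from the last comment is sketched below. `strong`, `pinned`, and `any` are hypothetical names and not part of Lwt; the sketch only assumes that `Lwt.on_termination` runs its callback once the thread has resolved or failed.

```ocaml
(* Hypothetical helper, not part of Lwt: pin a thread in a global table
   until it terminates, so it stays strongly reachable in the meantime. *)

(* Existential wrapper so threads of any result type share one table. *)
type any = Any : 'a -> any

let pinned : (int, any) Hashtbl.t = Hashtbl.create 10
let next_id = ref 0

let strong (t : 'a Lwt.t) : 'a Lwt.t =
  let id = !next_id in
  incr next_id;
  Hashtbl.add pinned id (Any t);
  (* Drop the strong reference once the thread has resolved or failed. *)
  Lwt.on_termination t (fun () -> Hashtbl.remove pinned id);
  t
```

This only helps if the pinned thread, or one of its callbacks, actually references the reactive values that need to stay alive.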
1

Here's my current (hacky) workaround. Each handler gets added to a global hashtable, and then removed again when the switch is turned off:

let keep =
  let kept = Hashtbl.create 10 in
  let next = ref 0 in
  fun ~switch value ->
    let ticket = !next in
    incr next;
    Hashtbl.add kept ticket value;
    Lwt_switch.add_hook (Some switch) (fun () ->
      Hashtbl.remove kept ticket;
      Lwt.return ()
    )

It's used like this:

Lwt_react.E.map handler event |> keep ~switch;
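For reference, here is a sketch of how this workaround might slot into the test program from the question. It is an illustration rather than the exact code in use: the `any` wrapper and the top-level `kept` table are assumptions added so that values of different types can share one table and so that the final check can inspect it.

```ocaml
(* ocamlfind ocamlopt -package react,lwt,lwt.react,lwt.unix -linkpkg -o test test.ml *)

(* Assumption: an existential wrapper, so any value can be stored. *)
type any = Any : 'a -> any

(* Global table: everything in it is strongly reachable. *)
let kept : (int, any) Hashtbl.t = Hashtbl.create 10
let next_ticket = ref 0

(* Keep [value] alive until [switch] is turned off. *)
let keep ~switch value =
  let ticket = !next_ticket in
  incr next_ticket;
  Hashtbl.add kept ticket (Any value);
  Lwt_switch.add_hook (Some switch) (fun () ->
    Hashtbl.remove kept ticket;
    Lwt.return_unit)

let finished_event, fire_finished = React.E.create ()

let setup () =
  let switch = Lwt_switch.create () in
  let finished, waker = Lwt.wait () in
  let handler () = Lwt.wakeup waker () in
  (* The mapped event is now reachable from [kept], not just weakly. *)
  Lwt_react.E.map handler finished_event |> keep ~switch;
  print_endline "Waiting for signal...";
  Lwt.bind finished (fun () -> Lwt_switch.turn_off switch)

let () =
  let finished = Lwt.protected (setup ()) in
  Gc.full_major ();  (* Force GC, as in the question *)
  fire_finished ();  (* Simulate send *)
  Lwt_main.run finished;
  (* The hook should have released the handler again. *)
  assert (Hashtbl.length kept = 0);
  print_endline "Done"
```

If the workaround behaves as described, this version should reach the final `print_endline` even with the `Gc.full_major` call in place, rather than hanging.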
Thomas Leonard
1

One easy way to deal with this is to keep a reference to your event and call React.E.stop when you don't want it anymore:

(* ocamlfind ocamlopt -package react,lwt,lwt.react,lwt.unix -linkpkg -o test test.ml *)

let finished_event, fire_finished = React.E.create ()

let run () =
  let switch = Lwt_switch.create () in

  let finished, waker = Lwt.wait () in
  let handler () = Lwt.wakeup waker () in
  let ev = Lwt_react.E.map handler finished_event in
  print_endline "Waiting for signal...";

  Gc.full_major ();  (* Force GC, to demonstrate problem *)
  fire_finished ();  (* Simulate send *)

  React.E.stop ev;

  Lwt.bind finished (fun () -> Lwt_switch.turn_off switch)

let () =
  Lwt_main.run (run ());
  print_endline "Done";
dim
  • This is the fault of my poor test case (mixing the real code with the test driver code). I've replaced it with a better version. In the real code, you can't call `stop` right after `fire_finished` because `fire_finished` happens at the event source, not at the receiver. If you put the `stop` inside the `bind` function, you have the same problem as @Def's answer; it may get garbage collected. – Thomas Leonard Nov 15 '13 at 12:06
  • I see. Well this is expected then. – dim Nov 18 '13 at 10:18
0

Note that if Lwt didn't support cancellation, you would observe the same behavior by replacing Lwt.protected (setup ()) with Lwt.bind (setup ()) Lwt.return.

Basically what you have is:

finished_event --weak--> SETUP --> finished

where SETUP is a cycle between an event and a Lwt thread. Removing the Lwt.protected just squashes the last pointer, so it happens to do what you want.

Lwt has only forward pointers (except to support cancellation) and React has only backward pointers (the forward ones are weak). So the way to make this work properly is to return the event instead of the thread.

dim
  • OK, but if I return an event then I can't pass that to Lwt_main.run, or anything else that needs a task, right? (this is all happening in a small sub-routine of a larger program - specifically, 0install doing a PackageKit transaction) – Thomas Leonard Nov 18 '13 at 11:05