objective categorical abstract machine language personal data server
65
fork

Configure Feed

Select the types of activity you want to include in your feed.

Attempt at fixing deadlock when subscribeRepos consumer disconnects

futurGH b0056cb4 f2f31f70

+26 -3
+26 -3
pegasus/lib/api/sync/subscribeRepos.ml
··· 8 8 | None -> 9 9 0 10 10 in 11 + let closed = ref false in 11 12 let send (bytes : bytes) = 12 - Dream.send ~text_or_binary:`Binary ws (Bytes.unsafe_to_string bytes) 13 + if !closed then Lwt.fail Exit 14 + else 15 + Lwt.catch 16 + (fun () -> 17 + Lwt_unix.with_timeout 30.0 (fun () -> 18 + Dream.send ~text_or_binary:`Binary ws 19 + (Bytes.unsafe_to_string bytes) ) ) 20 + (fun _ -> 21 + closed := true ; 22 + Lwt.fail Exit ) 23 + in 24 + let stream () = 25 + Sequencer.Live.stream_with_backfill ~conn:db ~cursor ~send 26 + in 27 + let wait_for_close () = 28 + let rec loop () = 29 + match%lwt Dream.receive ws with 30 + | Some _ -> 31 + loop () 32 + | None -> 33 + closed := true ; 34 + Lwt.fail Exit 35 + in 36 + loop () 13 37 in 14 38 Lwt.catch 15 - (fun () -> 16 - Sequencer.Live.stream_with_backfill ~conn:db ~cursor ~send ) 39 + (fun () -> Lwt.pick [stream (); wait_for_close ()]) 17 40 (fun _exn -> Lwt.return_unit) ) )