objective categorical abstract machine language personal data server
65
fork

Configure Feed

Select the types of activity you want to include in your feed.

Cut off slow subscribeRepos consumers

futurGH b96419d1 5d180b39

+76 -29
+76 -29
pegasus/lib/sequencer.ml
··· 451 451 452 452 let ring_size = 2048 453 453 454 + let queue_max = 1000 455 + 454 456 let ring : item array = Array.make ring_size {seq= 0; bytes= Bytes.empty} 455 457 456 458 let head_seq = ref 0 ··· 458 460 let count = ref 0 459 461 460 462 type subscriber = 461 - {id: int; q: item Queue.t; cond: unit Lwt_condition.t; mutable closed: bool} 463 + { id: int 464 + ; q: item Queue.t 465 + ; cond: unit Lwt_condition.t 466 + ; mutable closed: bool 467 + ; mutable close_reason: string option } 462 468 463 469 let subs : (int, subscriber) Hashtbl.t = Hashtbl.create 64 464 470 ··· 475 481 (fun _ s -> 476 482 if not s.closed then ( 477 483 Queue.push it s.q ; 478 - Lwt_condition.broadcast s.cond () ) ) 484 + if Queue.length s.q > queue_max then ( 485 + s.closed <- true ; 486 + s.close_reason <- Some "ConsumerTooSlow" ; 487 + Hashtbl.remove subs s.id ; 488 + Lwt_condition.broadcast s.cond () ) 489 + else Lwt_condition.broadcast s.cond () ) ) 479 490 subs ; 480 491 Lwt.return_unit ) 481 492 ··· 489 500 { id= !id 490 501 ; q= Queue.create () 491 502 ; cond= Lwt_condition.create () 492 - ; closed= false } 503 + ; closed= false 504 + ; close_reason= None } 493 505 in 494 506 Hashtbl.add subs !id s ; Lwt.return s ) 495 507 ··· 562 574 let stream_with_backfill ~(conn : Data_store.t) ~(cursor : int) 563 575 ~(send : bytes -> unit Lwt.t) : unit Lwt.t = 564 576 let%lwt sub = Bus.subscribe () in 577 + let send_consumer_too_slow () = 578 + let err = 579 + { error= "ConsumerTooSlow" 580 + ; message= 581 + Some 582 + "you're not consuming messages fast enough! maybe \ 583 + com.atproto.sync.getRepo is more your speed?" } 584 + in 585 + send (Frame.encode_error err) 586 + in 565 587 Lwt.finalize 566 588 (fun () -> 567 589 let%lwt head_db = DB.latest_seq conn in ··· 592 614 (Frame.encode_message ~seq:ev.seq ~time:ev.time payload) ) 593 615 events ) 594 616 >>= fun () -> 595 - (* live tail *) 596 - let rec loop last = 597 - let%lwt it = Bus.wait_next sub in 598 - if it.seq <= last then loop last 599 - else if it.seq > last + 1 then 600 - let%lwt gap = 601 - DB.request_seq_range ~earliest_seq:last ~latest_seq:it.seq 602 - ~limit:1000 conn 603 - in 604 - let%lwt () = 605 - Lwt_list.iter_s 606 - (fun ev -> 607 - if ev.seq <= last then Lwt.return_unit 608 - else 609 - send 610 - ( match ev.kind with 611 - | Message (m, _) -> 612 - Frame.encode_message ~seq:ev.seq ~time:ev.time m 613 - | Error e -> 614 - Frame.encode_error e ) ) 615 - gap 616 - in 617 - send it.bytes >>= fun () -> loop it.seq 618 - else send it.bytes >>= fun () -> loop it.seq 619 - in 620 - loop cutoff ) 617 + (* bail if consumer too slow *) 618 + if sub.Bus.closed then 619 + match sub.Bus.close_reason with 620 + | Some "ConsumerTooSlow" -> 621 + send_consumer_too_slow () 622 + | _ -> 623 + Lwt.return_unit 624 + else 625 + (* live tail *) 626 + let rec loop last = 627 + if sub.Bus.closed then 628 + match sub.Bus.close_reason with 629 + | Some "ConsumerTooSlow" -> 630 + send_consumer_too_slow () 631 + | _ -> 632 + Lwt.return_unit 633 + else 634 + Lwt.catch 635 + (fun () -> 636 + let%lwt it = Bus.wait_next sub in 637 + if it.seq <= last then loop last 638 + else if it.seq > last + 1 then 639 + let%lwt gap = 640 + DB.request_seq_range ~earliest_seq:last ~latest_seq:it.seq 641 + ~limit:1000 conn 642 + in 643 + let%lwt () = 644 + Lwt_list.iter_s 645 + (fun ev -> 646 + if ev.seq <= last then Lwt.return_unit 647 + else 648 + send 649 + ( match ev.kind with 650 + | Message (m, _) -> 651 + Frame.encode_message ~seq:ev.seq ~time:ev.time 652 + m 653 + | Error e -> 654 + Frame.encode_error e ) ) 655 + gap 656 + in 657 + send it.bytes >>= fun () -> loop it.seq 658 + else send it.bytes >>= fun () -> loop it.seq ) 659 + (fun _exn -> 660 + (* check if any failure was due to slow consumer *) 661 + match sub.Bus.close_reason with 662 + | Some "ConsumerTooSlow" -> 663 + send_consumer_too_slow () 664 + | _ -> 665 + Lwt.return_unit ) 666 + in 667 + loop cutoff ) 621 668 (fun () -> Bus.unsubscribe sub) 622 669 end 623 670