···7878sub subscription_start_seq ($c, %args) {
7979 my $cursor_param = $args{cursor_param};
8080 my $latest = $args{latest} // $c->store->latest_event_seq;
8181- my $oldest = $args{oldest} // $c->store->oldest_event_seq;
8281 my $future_limit = $args{future_limit} // $latest;
8382 my $future_message = $args{future_message} // 'Cursor is ahead of the local event stream';
8483 my $outdated_message = $args{outdated_message} // 'Cursor predates the oldest locally retained event';
8484+ my $backfill_window = $args{backfill_window_seconds}
8585+ // $c->config_value('subscription_backfill_window_seconds', 3600);
8686+ my $backfill_cutoff = $args{backfill_cutoff} // (time - $backfill_window);
85878688 if (!defined $cursor_param || $cursor_param eq q()) {
8789 return $latest + 1;
···9799 return undef;
98100 }
99101100100- if ($oldest && $cursor && $cursor < $oldest) {
102102+ my $next_event = $args{next_event} // $c->store->next_event_after_seq($cursor);
103103+ if ($next_event && ($next_event->{created_at} // 0) < $backfill_cutoff) {
101104 $c->subscription_send(
102105 binary => encode_info_frame('OutdatedCursor', $outdated_message),
103106 frame_type => 'info',
104107 );
105105- return $oldest;
108108+ my $resume_seq = $args{backfill_start} // $c->store->earliest_event_seq_after_time($backfill_cutoff);
109109+ return defined($resume_seq) ? $resume_seq : ($latest + 1);
106110 }
107111108112 return $cursor + 1;
+19-4
lib/ATProto/PDS/Store/SQLite.pm
···974974 });
975975}
976976977977+sub next_event_after_seq ($self, $cursor) {
978978+ return observe_store_operation($self->{metrics}, 'next_event_after_seq', sub {
979979+ return _row_from_blob_columns(_row_from_json_columns(
980980+ $self->dbh->selectrow_hashref(
981981+ q{SELECT * FROM events WHERE seq > ? ORDER BY seq LIMIT 1},
982982+ undef,
983983+ $cursor // 0,
984984+ ),
985985+ qw(payload_json),
986986+ ), qw(car_bytes));
987987+ });
988988+}
989989+977990sub latest_event_seq ($self) {
978991 return observe_store_operation($self->{metrics}, 'latest_event_seq', sub {
979992 return $self->dbh->selectrow_array(
···982995 });
983996}
984997985985-sub oldest_event_seq ($self) {
986986- return observe_store_operation($self->{metrics}, 'oldest_event_seq', sub {
998998+sub earliest_event_seq_after_time ($self, $created_at) {
999999+ return observe_store_operation($self->{metrics}, 'earliest_event_seq_after_time', sub {
9871000 my $value = $self->dbh->selectrow_array(
988988- q{SELECT MIN(seq) FROM events},
10011001+ q{SELECT MIN(seq) FROM events WHERE created_at >= ?},
10021002+ undef,
10031003+ $created_at,
9891004 );
990990- return defined $value ? $value : 0;
10051005+ return $value;
9911006 });
9921007}
9931008
+5-4
t/firehose.t
···533533$skip_unknown->finish_ok;
534534535535my $outdated_floor = $app->store->latest_event_seq;
536536-$app->store->dbh->do(q{DELETE FROM events WHERE seq <= ?}, undef, $outdated_floor);
536536+my $stale_event_time = time - 3660;
537537+$app->store->dbh->do(q{UPDATE events SET created_at = ? WHERE seq <= ?}, undef, $stale_event_time, $outdated_floor);
537538538539$t->post_ok('/xrpc/com.atproto.repo.createRecord' => {
539540 Authorization => "Bearer $access",
···557558is($outdated_info->{header}{t}, '#info', 'stale repo cursor yields an info frame');
558559is($outdated_info->{body}{name}, 'OutdatedCursor', 'stale repo cursor is reported as OutdatedCursor');
559560560560-$outdated->message_ok('stale repo cursor then resumes from the oldest retained event');
561561+$outdated->message_ok('stale repo cursor then resumes from the earliest event still inside the backfill window');
561562my $outdated_commit = decode_frame($outdated->message->[1]);
562562-is($outdated_commit->{header}{t}, '#commit', 'repo stream resumes with the retained commit event');
563563-is($outdated_commit->{body}{ops}[0]{path}, 'app.bsky.feed.post/firehose-fourth', 'repo replay resumes at the retained commit');
563563+is($outdated_commit->{header}{t}, '#commit', 'repo stream resumes with the first event still inside the backfill window');
564564+is($outdated_commit->{body}{ops}[0]{path}, 'app.bsky.feed.post/firehose-fourth', 'repo replay resumes at the first in-window commit');
564565$outdated->finish_ok;
565566566567done_testing;
+5-4
t/labels.t
···359359ok($repo_neg_label, 'repo query includes the negated repo label itself');
360360is($repo_neg_label->{src}, $service_did, 'negated repo label keeps the local source');
361361362362-$app->store->dbh->do(q{DELETE FROM events WHERE seq <= ?}, undef, $app->store->latest_event_seq);
362362+my $stale_label_time = time - 3660;
363363+$app->store->dbh->do(q{UPDATE events SET created_at = ? WHERE seq <= ?}, undef, $stale_label_time, $app->store->latest_event_seq);
363364364365$t->post_ok('/xrpc/com.atproto.admin.updateSubjectStatus' => {
365366 Authorization => $admin_auth,
···376377is($outdated_info->{header}{t}, '#info', 'stale label cursor yields an info frame');
377378is($outdated_info->{body}{name}, 'OutdatedCursor', 'stale label cursor is reported as OutdatedCursor');
378379379379-$outdated->message_ok('stale label cursor then resumes from the oldest retained label event');
380380+$outdated->message_ok('stale label cursor then resumes from the earliest in-window label event');
380381my $outdated_label = decode_frame($outdated->message->[1]);
381382is($outdated_label->{header}{t}, '#labels', 'label stream resumes with a labels frame');
382382-is($outdated_label->{body}{labels}[0]{uri}, "at://$did", 'stale label replay resumes at the retained label event');
383383-is($outdated_label->{body}{labels}[0]{val}, '!hide', 'retained label replay carries the expected moderation label');
383383+is($outdated_label->{body}{labels}[0]{uri}, "at://$did", 'stale label replay resumes at the first in-window label event');
384384+is($outdated_label->{body}{labels}[0]{val}, '!hide', 'first in-window label replay carries the expected moderation label');
384385$outdated->finish_ok;
385386386387$ws->finish_ok;