perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Batch local appview index record loading

alice 635a4cc9 0948d93d

+153 -48
+38 -30
lib/ATProto/PDS/ServiceProxy/Threads.pm
··· 226 226 # Local appview reads can hit this repeatedly across requests, so keep the 227 227 # expensive scan isolated behind an event-seq keyed cache. 228 228 sub _build_local_post_index ($self, $c) { 229 + my @collections = qw( 230 + app.bsky.feed.post 231 + app.bsky.feed.like 232 + app.bsky.feed.repost 233 + ); 234 + my $rows = $c->store->list_records_by_collections(\@collections); 235 + my %did_seen = map { $_->{did} => 1 } grep { defined $_->{did} && length $_->{did} } @$rows; 236 + my %accounts_by_did = map { $_->{did} => $_ } 237 + @{ $c->store->get_accounts_by_dids([ sort keys %did_seen ]) }; 229 238 my $index = { 230 239 replies => {}, 231 240 stats => {}, 232 241 viewer => {}, 233 242 }; 234 243 235 - for my $account (@{ $c->store->list_accounts }) { 236 - for my $row (@{ $c->store->all_records_for_did($account->{did}) }) { 237 - my $value = $row->{value}; 238 - next unless ref($value) eq 'HASH'; 244 + for my $row (@$rows) { 245 + my $account = $accounts_by_did{ $row->{did} } or next; 246 + my $value = $row->{value}; 247 + next unless ref($value) eq 'HASH'; 239 248 240 - if (($row->{collection} // q()) eq 'app.bsky.feed.post') { 241 - my $reply = $value->{reply}; 242 - if (ref($reply) eq 'HASH') { 243 - my $parent_uri = $reply->{parent}{uri} // q(); 244 - if (length $parent_uri) { 245 - push @{ $index->{replies}{$parent_uri} }, [ $account, $row ]; 246 - _local_post_stats($index, $parent_uri)->{replyCount}++; 247 - } 249 + if (($row->{collection} // q()) eq 'app.bsky.feed.post') { 250 + my $reply = $value->{reply}; 251 + if (ref($reply) eq 'HASH') { 252 + my $parent_uri = $reply->{parent}{uri} // q(); 253 + if (length $parent_uri) { 254 + push @{ $index->{replies}{$parent_uri} }, [ $account, $row ]; 255 + _local_post_stats($index, $parent_uri)->{replyCount}++; 248 256 } 249 - 250 - my $quoted_uri = $self->_quoted_uri($value) // q(); 251 - _local_post_stats($index, $quoted_uri)->{quoteCount}++ 252 - if length $quoted_uri; 253 - next; 254 257 } 255 258 256 - if (($row->{collection} // q()) eq 'app.bsky.feed.like') { 257 - my $subject_uri = $value->{subject}{uri} // q(); 258 - next unless length $subject_uri; 259 - _local_post_stats($index, $subject_uri)->{likeCount}++; 260 - $index->{viewer}{$subject_uri}{like}{$account->{did}} = $self->_post_uri($account, $row); 261 - next; 262 - } 259 + my $quoted_uri = $self->_quoted_uri($value) // q(); 260 + _local_post_stats($index, $quoted_uri)->{quoteCount}++ 261 + if length $quoted_uri; 262 + next; 263 + } 263 264 264 - if (($row->{collection} // q()) eq 'app.bsky.feed.repost') { 265 - my $subject_uri = $value->{subject}{uri} // q(); 266 - next unless length $subject_uri; 267 - _local_post_stats($index, $subject_uri)->{repostCount}++; 268 - $index->{viewer}{$subject_uri}{repost}{$account->{did}} = $self->_post_uri($account, $row); 269 - } 265 + if (($row->{collection} // q()) eq 'app.bsky.feed.like') { 266 + my $subject_uri = $value->{subject}{uri} // q(); 267 + next unless length $subject_uri; 268 + _local_post_stats($index, $subject_uri)->{likeCount}++; 269 + $index->{viewer}{$subject_uri}{like}{$account->{did}} = $self->_post_uri($account, $row); 270 + next; 271 + } 272 + 273 + if (($row->{collection} // q()) eq 'app.bsky.feed.repost') { 274 + my $subject_uri = $value->{subject}{uri} // q(); 275 + next unless length $subject_uri; 276 + _local_post_stats($index, $subject_uri)->{repostCount}++; 277 + $index->{viewer}{$subject_uri}{repost}{$account->{did}} = $self->_post_uri($account, $row); 270 278 } 271 279 } 272 280
+16
lib/ATProto/PDS/Store/SQLite.pm
··· 783 783 return [ map { _row_to_record($_) } @$rows ]; 784 784 } 785 785 786 + sub list_records_by_collections ($self, $collections) { 787 + return observe_store_operation($self->{metrics}, 'list_records_by_collections', sub { 788 + my @collections = grep { defined($_) && length($_) } @{ $collections // [] }; 789 + return [] unless @collections; 790 + 791 + my $placeholders = join(', ', ('?') x @collections); 792 + my $rows = $self->dbh->selectall_arrayref( 793 + "SELECT * FROM records WHERE collection IN ($placeholders) ORDER BY did, collection, rkey", 794 + { Slice => {} }, 795 + @collections, 796 + ); 797 + return [ map { _row_to_record($_) } @$rows ]; 798 + }); 799 + } 800 + 786 801 sub list_collections_for_did ($self, $did) { 787 802 my $rows = $self->dbh->selectall_arrayref( 788 803 q{SELECT DISTINCT collection FROM records WHERE did = ? ORDER BY collection}, ··· 1217 1232 ) 1218 1233 }, 1219 1234 q{CREATE INDEX IF NOT EXISTS records_by_collection ON records(did, collection, rkey)}, 1235 + q{CREATE INDEX IF NOT EXISTS records_by_collection_did_rkey ON records(collection, did, rkey)}, 1220 1236 q{ 1221 1237 CREATE TABLE IF NOT EXISTS blocks ( 1222 1238 cid TEXT PRIMARY KEY,
+51 -18
t/service-proxy-local.t
··· 42 42 return $self->{list_accounts} // []; 43 43 } 44 44 45 + sub get_accounts_by_dids { 46 + my ($self, $dids) = @_; 47 + $self->{get_accounts_by_dids_calls}++; 48 + $self->{get_accounts_by_dids_args} = [ @$dids ]; 49 + return [ 50 + map { $self->{accounts_by_did}{$_} } 51 + grep { defined $self->{accounts_by_did}{$_} } @$dids 52 + ]; 53 + } 54 + 45 55 sub all_records_for_did { 46 56 my ($self, $did) = @_; 47 57 $self->{all_records_for_did_calls}{$did}++; 48 58 return $self->{all_records_for_did}{$did} // []; 59 + } 60 + 61 + sub list_records_by_collections { 62 + my ($self, $collections) = @_; 63 + $self->{list_records_by_collections_calls}++; 64 + $self->{list_records_by_collections_args} = [ @$collections ]; 65 + my %wanted = map { $_ => 1 } @$collections; 66 + return [ 67 + grep { $wanted{ $_->{collection} // q() } } 68 + @{ $self->{list_records_by_collections} // [] } 69 + ]; 49 70 } 50 71 51 72 sub latest_event_seq { ··· 146 167 147 168 my $cache_store = LocalTestStore->new( 148 169 latest_event_seq => 1, 149 - list_accounts => [ 150 - { 170 + accounts_by_did => { 171 + $did => { 151 172 did => $did, 152 173 handle => 'alice.test', 153 174 }, 154 - ], 155 - all_records_for_did => { 156 - $did => [ 157 - { 158 - collection => 'app.bsky.feed.post', 159 - rkey => 'cached-post', 160 - cid => 'bafyreicached', 161 - value => { text => 'cached' }, 162 - }, 163 - ], 164 175 }, 176 + list_records_by_collections => [ 177 + { 178 + did => $did, 179 + collection => 'app.bsky.feed.post', 180 + rkey => 'cached-post', 181 + cid => 'bafyreicached', 182 + value => { text => 'cached' }, 183 + }, 184 + { 185 + did => $did, 186 + collection => 'app.bsky.actor.profile', 187 + rkey => 'self', 188 + cid => 'bafyreiprofile', 189 + value => { displayName => 'Ignored by local post index' }, 190 + }, 191 + ], 165 192 ); 166 193 167 194 my $cached_proxy = ATProto::PDS::ServiceProxy->new; ··· 170 197 my $third_cache_context = LocalTestContext->new($cache_store); 171 198 172 199 my $first_index = $cached_proxy->_local_post_index($first_cache_context); 173 - is($cache_store->{list_accounts_calls}, 1, 'first local post index build scans accounts once'); 174 - is($cache_store->{all_records_for_did_calls}{$did}, 1, 'first local post index build scans records once'); 200 + is($cache_store->{list_records_by_collections_calls}, 1, 'first local post index build scans relevant records once'); 201 + is($cache_store->{get_accounts_by_dids_calls}, 1, 'first local post index build fetches account metadata once'); 202 + is_deeply( 203 + $cache_store->{list_records_by_collections_args}, 204 + [qw(app.bsky.feed.post app.bsky.feed.like app.bsky.feed.repost)], 205 + 'local post index only requests feed-relevant collections', 206 + ); 207 + is_deeply($cache_store->{get_accounts_by_dids_args}, [$did], 'local post index batches account lookup by DID'); 175 208 176 209 my $second_index = $cached_proxy->_local_post_index($second_cache_context); 177 210 is($cache_store->{latest_event_seq_calls}, 2, 'subsequent requests still check the latest event seq'); 178 - is($cache_store->{list_accounts_calls}, 1, 'unchanged event seq reuses the cached global post index'); 179 - is($cache_store->{all_records_for_did_calls}{$did}, 1, 'unchanged event seq avoids rescanning records'); 211 + is($cache_store->{list_records_by_collections_calls}, 1, 'unchanged event seq reuses the cached global post index'); 212 + is($cache_store->{get_accounts_by_dids_calls}, 1, 'unchanged event seq avoids refetching account metadata'); 180 213 is($second_index, $first_index, 'unchanged event seq returns the cached index reference'); 181 214 182 215 $cache_store->{latest_event_seq} = 2; 183 216 my $third_index = $cached_proxy->_local_post_index($third_cache_context); 184 - is($cache_store->{list_accounts_calls}, 2, 'new events invalidate the cached global post index'); 185 - is($cache_store->{all_records_for_did_calls}{$did}, 2, 'new events trigger a rebuild scan'); 217 + is($cache_store->{list_records_by_collections_calls}, 2, 'new events invalidate the cached global post index'); 218 + is($cache_store->{get_accounts_by_dids_calls}, 2, 'new events trigger a fresh account batch lookup'); 186 219 isnt($third_index, $first_index, 'new events rebuild the cached index'); 187 220 188 221 done_testing;
+48
t/store-sqlite.t
··· 119 119 'repo head falls back to account metadata when repo_heads row is missing', 120 120 ); 121 121 122 + $store->put_record( 123 + did => $account->{did}, 124 + collection => 'app.bsky.feed.post', 125 + rkey => 'post-1', 126 + cid => 'bafypost1', 127 + record_bytes => q(), 128 + value => { 129 + '$type' => 'app.bsky.feed.post', 130 + text => 'hello', 131 + createdAt => '2026-03-11T19:00:00Z', 132 + }, 133 + ); 134 + $store->put_record( 135 + did => $account->{did}, 136 + collection => 'app.bsky.actor.profile', 137 + rkey => 'self', 138 + cid => 'bafyprofile1', 139 + record_bytes => q(), 140 + value => { displayName => 'Alice' }, 141 + ); 142 + $store->put_record( 143 + did => $second_account->{did}, 144 + collection => 'app.bsky.feed.like', 145 + rkey => 'like-1', 146 + cid => 'bafylike1', 147 + record_bytes => q(), 148 + value => { 149 + '$type' => 'app.bsky.feed.like', 150 + subject => { uri => 'at://did:web:pds.example.com:users:alice/app.bsky.feed.post/post-1' }, 151 + createdAt => '2026-03-11T19:01:00Z', 152 + }, 153 + ); 154 + 155 + my $feed_records = $store->list_records_by_collections([ 156 + 'app.bsky.feed.post', 157 + 'app.bsky.feed.like', 158 + ]); 159 + is( 160 + [ map { $_->{collection} } @$feed_records ], 161 + ['app.bsky.feed.post', 'app.bsky.feed.like'], 162 + 'collection-scoped record listings only return the requested feed collections', 163 + ); 164 + is( 165 + [ map { $_->{did} } @$feed_records ], 166 + [$account->{did}, $second_account->{did}], 167 + 'collection-scoped record listings preserve did ordering for batched account lookup', 168 + ); 169 + 122 170 $store->revoke_session('sess-1', revoked_at => 123); 123 171 is($store->get_session('sess-1')->{revoked_at}, 123, 'sessions can be revoked'); 124 172